Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Manage main branch for paimon #2972

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,29 @@ public void deleteBranch(String branchName) {
wrapped.deleteBranch(branchName);
}

/**
* Replace main branch.
*
* @param branchName
*/
@Override
public void replaceMainBranch(String branchName) {
privilegeChecker.assertCanInsert(identifier);
wrapped.replaceMainBranch(branchName);
}

@Override
public void cleanMainBranchFile() {
privilegeChecker.assertCanInsert(identifier);
wrapped.cleanMainBranchFile();
}

@Override
public void mainBranch() {
privilegeChecker.assertCanInsert(identifier);
wrapped.mainBranch();
}

@Override
public void replaceBranch(String fromBranch) {
privilegeChecker.assertCanInsert(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,20 @@ public void replaceBranch(String fromBranch) {
branchManager().replaceBranch(fromBranch);
}

public void cleanMainBranchFile() {
branchManager().cleanMainBranchFile();
}

@Override
public void replaceMainBranch(String branchName) {
branchManager().commitMainBranch(branchName);
}

@Override
public void mainBranch() {
branchManager().mainBranch();
}

@Override
public void rollbackTo(String tagName) {
TagManager tagManager = tagManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,29 @@ default void replaceBranch(String fromBranch) {
this.getClass().getSimpleName()));
}

default void mainBranch() {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support mainBranch.",
this.getClass().getSimpleName()));
}

@Override
default void cleanMainBranchFile() {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support cleanMainBranchFile.",
this.getClass().getSimpleName()));
}

@Override
default void replaceMainBranch(String branchName) {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support setMainBranch.",
this.getClass().getSimpleName()));
}

@Override
default ExpireSnapshots newExpireSnapshots() {
throw new UnsupportedOperationException(
Expand Down
10 changes: 10 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ public interface Table extends Serializable {
@Experimental
void replaceBranch(String fromBranch);

/** Replace main branch. */
@Experimental
void replaceMainBranch(String branchName);

@Experimental
void cleanMainBranchFile();

@Experimental
void mainBranch();

/** Manually expire snapshots, parameters can be controlled independently of table options. */
@Experimental
ExpireSnapshots newExpireSnapshots();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,13 @@ public BranchManager(
}

/** Commit specify branch to main. */
public void commitMainBranch(String branchName) throws IOException {
public void commitMainBranch(String branchName) {
Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE);
fileIO.overwriteFileUtf8(mainBranchFile, branchName);
try {
fileIO.overwriteFileUtf8(mainBranchFile, branchName);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/** Return the root Directory of branch. */
Expand Down Expand Up @@ -125,11 +129,35 @@ public String defaultMainBranch() {
}
}

/** Get main branch. */
public String mainBranch() {
Path path = new Path(tablePath, MAIN_BRANCH_FILE);
try {
if (fileIO.exists(path)) {
return fileIO.readFileUtf8(path);
} else {
return DEFAULT_MAIN_BRANCH;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/** Return the path of a branch. */
public Path branchPath(String branchName) {
return new Path(getBranchPath(fileIO, tablePath, branchName));
}

/** Clean the main branch file and use default. */
public void cleanMainBranchFile() {
Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE);
try {
fileIO.delete(mainBranchFile, false);
} catch (IOException e) {
throw new RuntimeException("Exception occurs when clean main branch file.", e);
}
}

/** Create empty branch. */
public void createBranch(String branchName) {
checkArgument(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;

import org.apache.flink.table.procedure.ProcedureContext;

/**
* Clean main branch procedure. Usage:
*
* <pre><code>
* CALL sys.clean_main_branch('tableId')
* </code></pre>
*/
public class CleanMainBranchProcedure extends ProcedureBase {

public static final String IDENTIFIER = "clean_main_branch";

@Override
public String identifier() {
return IDENTIFIER;
}

public String[] call(ProcedureContext procedureContext, String tableId)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
table.cleanMainBranchFile();

return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.Table;

import org.apache.flink.table.procedure.ProcedureContext;

/**
* Replace main branch procedure. Usage:
*
* <pre><code>
* CALL sys.replace_main_branch('tableId', 'branchName')
* </code></pre>
*/
public class SetMainBranchProcedure extends ProcedureBase {

public static final String IDENTIFIER = "set_main_branch";

@Override
public String identifier() {
return IDENTIFIER;
}

public String[] call(ProcedureContext procedureContext, String tableId, String branchName)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
table.replaceMainBranch(branchName);

return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,5 @@ org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure
org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
org.apache.paimon.flink.procedure.RepairProcedure
org.apache.paimon.flink.procedure.ReplaceBranchProcedure
org.apache.paimon.flink.procedure.SetMainBranchProcedure
org.apache.paimon.flink.procedure.CleanMainBranchProcedure
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,51 @@ void testReplaceBranch() throws Exception {
String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName));
assertThat(tagManager.tagExists("tag3")).isTrue();
}

@Test
void testReplaceMainBranchAndCleanMainBranch() throws Exception {

init(warehouse);
RowType rowType =
RowType.of(
new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
new String[] {"k", "v"});
FileStoreTable table =
createFileStoreTable(
rowType,
Collections.emptyList(),
Collections.singletonList("k"),
Collections.emptyList(),
Collections.emptyMap());

StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
write = writeBuilder.newWrite();
commit = writeBuilder.newCommit();

// 3 snapshots
writeData(rowData(1L, BinaryString.fromString("Hi")));
writeData(rowData(2L, BinaryString.fromString("Hello")));
writeData(rowData(3L, BinaryString.fromString("Paimon")));

// Create tag2
TagManager tagManager = new TagManager(table.fileIO(), table.location());
callProcedure(
String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName));
assertThat(tagManager.tagExists("tag2")).isTrue();

BranchManager branchManager = table.branchManager();
callProcedure(
String.format(
"CALL sys.create_branch('%s.%s', 'branch_name', 'tag2')",
database, tableName));
assertThat(branchManager.branchExists("branch_name")).isTrue();

callProcedure(
String.format(
"CALL sys.set_main_branch('%s.%s', 'branch_name')", database, tableName));
assertThat(branchManager.mainBranch()).isEqualTo("branch_name");

callProcedure(String.format("CALL sys.clean_main_branch('%s.%s')", database, tableName));
assertThat(branchManager.mainBranch()).isEqualTo("main");
}
}
Loading