From e683082b1dec651a05b78b300147ecd93d17f3bb Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Thu, 7 Mar 2024 16:41:44 +0800 Subject: [PATCH] Manage main branch for paimon --- .../privilege/PrivilegedFileStoreTable.java | 23 +++++++++ .../paimon/table/AbstractFileStoreTable.java | 14 ++++++ .../apache/paimon/table/ReadonlyTable.java | 23 +++++++++ .../java/org/apache/paimon/table/Table.java | 10 ++++ .../apache/paimon/utils/BranchManager.java | 31 +++++++++++- .../procedure/CleanMainBranchProcedure.java | 50 +++++++++++++++++++ .../procedure/ReplaceMainBranchProcedure.java | 50 +++++++++++++++++++ .../org.apache.paimon.factories.Factory | 2 + .../flink/action/BranchActionITCase.java | 48 ++++++++++++++++++ 9 files changed, 249 insertions(+), 2 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CleanMainBranchProcedure.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceMainBranchProcedure.java diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index e4b09df3893b6..635587e22e40f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -204,6 +204,29 @@ public void deleteBranch(String branchName) { wrapped.deleteBranch(branchName); } + /** + * Replace main branch. + * + * @param branchName + */ + @Override + public void replaceMainBranch(String branchName) { + privilegeChecker.assertCanInsert(identifier); + wrapped.replaceMainBranch(branchName); + } + + @Override + public void cleanMainBranchFile() { + privilegeChecker.assertCanInsert(identifier); + wrapped.cleanMainBranchFile(); + } + + @Override + public void mainBranch() { + privilegeChecker.assertCanInsert(identifier); + wrapped.mainBranch(); + } + @Override public void replaceBranch(String fromBranch) { privilegeChecker.assertCanInsert(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 82cc47ad5a474..57b0e5588cc20 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -530,6 +530,20 @@ public void replaceBranch(String fromBranch) { branchManager().replaceBranch(fromBranch); } + public void cleanMainBranchFile() { + branchManager().cleanMainBranchFile(); + } + + @Override + public void replaceMainBranch(String branchName) { + branchManager().commitMainBranch(branchName); + } + + @Override + public void mainBranch() { + branchManager().mainBranch(); + } + @Override public void rollbackTo(String tagName) { TagManager tagManager = tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index dcb62dfcbea62..3ace8160e3eda 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -190,6 +190,29 @@ default void replaceBranch(String fromBranch) { this.getClass().getSimpleName())); } + default void mainBranch() { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support mainBranch.", + this.getClass().getSimpleName())); + } + + @Override + default void cleanMainBranchFile() { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support cleanMainBranchFile.", + this.getClass().getSimpleName())); + } + + @Override + default void replaceMainBranch(String branchName) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support setMainBranch.", + this.getClass().getSimpleName())); + } + @Override default ExpireSnapshots newExpireSnapshots() { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index d01ecc95cdb27..16d50891c6376 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -114,6 +114,16 @@ public interface Table extends Serializable { @Experimental void replaceBranch(String fromBranch); + /** Replace main branch. */ + @Experimental + void replaceMainBranch(String branchName); + + @Experimental + void cleanMainBranchFile(); + + @Experimental + void mainBranch(); + /** Manually expire snapshots, parameters can be controlled independently of table options. */ @Experimental ExpireSnapshots newExpireSnapshots(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 9742d63ac57d6..27282d51d2789 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -74,9 +74,13 @@ public BranchManager( } /** Commit specify branch to main. */ - public void commitMainBranch(String branchName) throws IOException { + public void commitMainBranch(String branchName) { Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE); - fileIO.overwriteFileUtf8(mainBranchFile, branchName); + try { + fileIO.overwriteFileUtf8(mainBranchFile, branchName); + } catch (IOException e) { + throw new RuntimeException(e); + } } /** Return the root Directory of branch. */ @@ -124,12 +128,35 @@ public String defaultMainBranch() { throw new RuntimeException(e); } } + /** Get main branch. */ + public String mainBranch() { + Path path = new Path(tablePath, MAIN_BRANCH_FILE); + try { + if (fileIO.exists(path)) { + return fileIO.readFileUtf8(path); + } else { + return DEFAULT_MAIN_BRANCH; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } /** Return the path of a branch. */ public Path branchPath(String branchName) { return new Path(getBranchPath(fileIO, tablePath, branchName)); } + /** Clean the main branch file and use default. */ + public void cleanMainBranchFile() { + Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE); + try { + fileIO.delete(mainBranchFile, false); + } catch (IOException e) { + throw new RuntimeException("Exception occurs when clean main branch file.", e); + } + } + /** Create empty branch. */ public void createBranch(String branchName) { checkArgument( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CleanMainBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CleanMainBranchProcedure.java new file mode 100644 index 0000000000000..bae7820d10124 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CleanMainBranchProcedure.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.Table; + +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Clean main branch procedure. Usage: + * + *

+ *  CALL sys.clean_main_branch('tableId')
+ * 
+ */ +public class CleanMainBranchProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "clean_main_branch"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + public String[] call(ProcedureContext procedureContext, String tableId) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + table.cleanMainBranchFile(); + + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceMainBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceMainBranchProcedure.java new file mode 100644 index 0000000000000..33fdd99142569 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceMainBranchProcedure.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.Table; + +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Replace main branch procedure. Usage: + * + *

+ *  CALL sys.replace_main_branch('tableId', 'branchName')
+ * 
+ */ +public class ReplaceMainBranchProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "replace_main_branch"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + public String[] call(ProcedureContext procedureContext, String tableId, String branchName) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + table.replaceMainBranch(branchName); + + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 33a43009d69b4..cedf4850d9c15 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -52,3 +52,5 @@ org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure org.apache.paimon.flink.procedure.RepairProcedure org.apache.paimon.flink.procedure.ReplaceBranchProcedure +org.apache.paimon.flink.procedure.ReplaceMainBranchProcedure +org.apache.paimon.flink.procedure.CleanMainBranchProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java index 209b0d2e7bda3..ce284a38933d1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java @@ -255,4 +255,52 @@ void testReplaceBranch() throws Exception { String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName)); assertThat(tagManager.tagExists("tag3")).isTrue(); } + + @Test + void testReplaceMainBranchAndCleanMainBranch() throws Exception { + + init(warehouse); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}, + new String[] {"k", "v"}); + FileStoreTable table = + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("k"), + Collections.emptyList(), + Collections.emptyMap()); + + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + + // 3 snapshots + writeData(rowData(1L, BinaryString.fromString("Hi"))); + writeData(rowData(2L, BinaryString.fromString("Hello"))); + writeData(rowData(3L, BinaryString.fromString("Paimon"))); + + // Create tag2 + TagManager tagManager = new TagManager(table.fileIO(), table.location()); + callProcedure( + String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); + assertThat(tagManager.tagExists("tag2")).isTrue(); + + BranchManager branchManager = table.branchManager(); + callProcedure( + String.format( + "CALL sys.create_branch('%s.%s', 'branch_name', 'tag2')", + database, tableName)); + assertThat(branchManager.branchExists("branch_name")).isTrue(); + + callProcedure( + String.format( + "CALL sys.replace_main_branch('%s.%s', 'branch_name')", + database, tableName)); + assertThat(branchManager.mainBranch()).isEqualTo("branch_name"); + + callProcedure(String.format("CALL sys.clean_main_branch('%s.%s')", database, tableName)); + assertThat(branchManager.mainBranch()).isEqualTo("main"); + } }