diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index e4b09df3893b..635587e22e40 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -204,6 +204,29 @@ public void deleteBranch(String branchName) { wrapped.deleteBranch(branchName); } + /** + * Replace main branch. + * + * @param branchName + */ + @Override + public void replaceMainBranch(String branchName) { + privilegeChecker.assertCanInsert(identifier); + wrapped.replaceMainBranch(branchName); + } + + @Override + public void cleanMainBranchFile() { + privilegeChecker.assertCanInsert(identifier); + wrapped.cleanMainBranchFile(); + } + + @Override + public void mainBranch() { + privilegeChecker.assertCanInsert(identifier); + wrapped.mainBranch(); + } + @Override public void replaceBranch(String fromBranch) { privilegeChecker.assertCanInsert(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 82cc47ad5a47..57b0e5588cc2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -530,6 +530,20 @@ public void replaceBranch(String fromBranch) { branchManager().replaceBranch(fromBranch); } + public void cleanMainBranchFile() { + branchManager().cleanMainBranchFile(); + } + + @Override + public void replaceMainBranch(String branchName) { + branchManager().commitMainBranch(branchName); + } + + @Override + public void mainBranch() { + branchManager().mainBranch(); + } + @Override public void rollbackTo(String tagName) { TagManager tagManager = tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index dcb62dfcbea6..3ace8160e3ed 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -190,6 +190,29 @@ default void replaceBranch(String fromBranch) { this.getClass().getSimpleName())); } + default void mainBranch() { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support mainBranch.", + this.getClass().getSimpleName())); + } + + @Override + default void cleanMainBranchFile() { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support cleanMainBranchFile.", + this.getClass().getSimpleName())); + } + + @Override + default void replaceMainBranch(String branchName) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support setMainBranch.", + this.getClass().getSimpleName())); + } + @Override default ExpireSnapshots newExpireSnapshots() { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index d01ecc95cdb2..16d50891c637 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -114,6 +114,16 @@ public interface Table extends Serializable { @Experimental void replaceBranch(String fromBranch); + /** Replace main branch. */ + @Experimental + void replaceMainBranch(String branchName); + + @Experimental + void cleanMainBranchFile(); + + @Experimental + void mainBranch(); + /** Manually expire snapshots, parameters can be controlled independently of table options. */ @Experimental ExpireSnapshots newExpireSnapshots(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 9742d63ac57d..c665fe9d67a6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -74,9 +74,13 @@ public BranchManager( } /** Commit specify branch to main. */ - public void commitMainBranch(String branchName) throws IOException { + public void commitMainBranch(String branchName) { Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE); - fileIO.overwriteFileUtf8(mainBranchFile, branchName); + try { + fileIO.overwriteFileUtf8(mainBranchFile, branchName); + } catch (IOException e) { + throw new RuntimeException(e); + } } /** Return the root Directory of branch. */ @@ -125,11 +129,35 @@ public String defaultMainBranch() { } } + /** Get main branch. */ + public String mainBranch() { + Path path = new Path(tablePath, MAIN_BRANCH_FILE); + try { + if (fileIO.exists(path)) { + return fileIO.readFileUtf8(path); + } else { + return DEFAULT_MAIN_BRANCH; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + /** Return the path of a branch. */ public Path branchPath(String branchName) { return new Path(getBranchPath(fileIO, tablePath, branchName)); } + /** Clean the main branch file and use default. */ + public void cleanMainBranchFile() { + Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE); + try { + fileIO.delete(mainBranchFile, false); + } catch (IOException e) { + throw new RuntimeException("Exception occurs when clean main branch file.", e); + } + } + /** Create empty branch. */ public void createBranch(String branchName) { checkArgument( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CleanMainBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CleanMainBranchProcedure.java new file mode 100644 index 000000000000..bae7820d1012 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CleanMainBranchProcedure.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.Table; + +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Clean main branch procedure. Usage: + * + *
+ * CALL sys.clean_main_branch('tableId')
+ *
+ */
+public class CleanMainBranchProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "clean_main_branch";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ public String[] call(ProcedureContext procedureContext, String tableId)
+ throws Catalog.TableNotExistException {
+ Table table = catalog.getTable(Identifier.fromString(tableId));
+ table.cleanMainBranchFile();
+
+ return new String[] {"Success"};
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/SetMainBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/SetMainBranchProcedure.java
new file mode 100644
index 000000000000..ac8dc3b48da8
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/SetMainBranchProcedure.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Replace main branch procedure. Usage:
+ *
+ *
+ * CALL sys.replace_main_branch('tableId', 'branchName')
+ *
+ */
+public class SetMainBranchProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "set_main_branch";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ public String[] call(ProcedureContext procedureContext, String tableId, String branchName)
+ throws Catalog.TableNotExistException {
+ Table table = catalog.getTable(Identifier.fromString(tableId));
+ table.replaceMainBranch(branchName);
+
+ return new String[] {"Success"};
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 33a43009d69b..732be3d432ef 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -52,3 +52,5 @@ org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure
org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
org.apache.paimon.flink.procedure.RepairProcedure
org.apache.paimon.flink.procedure.ReplaceBranchProcedure
+org.apache.paimon.flink.procedure.SetMainBranchProcedure
+org.apache.paimon.flink.procedure.CleanMainBranchProcedure
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
index 209b0d2e7bda..3ada0a20039d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java
@@ -255,4 +255,51 @@ void testReplaceBranch() throws Exception {
String.format("CALL sys.create_tag('%s.%s', 'tag3', 3)", database, tableName));
assertThat(tagManager.tagExists("tag3")).isTrue();
}
+
+ @Test
+ void testReplaceMainBranchAndCleanMainBranch() throws Exception {
+
+ init(warehouse);
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
+ new String[] {"k", "v"});
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ Collections.emptyList(),
+ Collections.emptyMap());
+
+ StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = writeBuilder.newWrite();
+ commit = writeBuilder.newCommit();
+
+ // 3 snapshots
+ writeData(rowData(1L, BinaryString.fromString("Hi")));
+ writeData(rowData(2L, BinaryString.fromString("Hello")));
+ writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+ // Create tag2
+ TagManager tagManager = new TagManager(table.fileIO(), table.location());
+ callProcedure(
+ String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName));
+ assertThat(tagManager.tagExists("tag2")).isTrue();
+
+ BranchManager branchManager = table.branchManager();
+ callProcedure(
+ String.format(
+ "CALL sys.create_branch('%s.%s', 'branch_name', 'tag2')",
+ database, tableName));
+ assertThat(branchManager.branchExists("branch_name")).isTrue();
+
+ callProcedure(
+ String.format(
+ "CALL sys.set_main_branch('%s.%s', 'branch_name')", database, tableName));
+ assertThat(branchManager.mainBranch()).isEqualTo("branch_name");
+
+ callProcedure(String.format("CALL sys.clean_main_branch('%s.%s')", database, tableName));
+ assertThat(branchManager.mainBranch()).isEqualTo("main");
+ }
}