From 8e4cb760717700d24ee9434a6a2ac2df9d384788 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Thu, 7 Mar 2024 16:41:44 +0800 Subject: [PATCH] Manage main branch for paimon --- .../apache/paimon/schema/SchemaManager.java | 20 ++--- .../paimon/table/AbstractFileStoreTable.java | 15 ++++ .../apache/paimon/table/ReadonlyTable.java | 24 ++++++ .../java/org/apache/paimon/table/Table.java | 8 ++ .../apache/paimon/utils/BranchManager.java | 80 ++++++++++++++++--- .../apache/paimon/utils/SnapshotManager.java | 7 +- .../org/apache/paimon/utils/TagManager.java | 12 ++- .../paimon/table/FileStoreTableTestBase.java | 4 +- .../procedure/CleanMainBranchProcedure.java | 50 ++++++++++++ .../procedure/ReplaceMainBranchProcedure.java | 50 ++++++++++++ .../org.apache.paimon.factories.Factory | 2 + .../flink/action/BranchActionITCase.java | 47 +++++++++++ 12 files changed, 290 insertions(+), 29 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CleanMainBranchProcedure.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceMainBranchProcedure.java diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index d94da91ef4c56..74a871c54669c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -97,10 +97,7 @@ public Optional latest() { } public Optional latest(String branchName) { - Path directoryPath = - branchName.equals(DEFAULT_MAIN_BRANCH) - ? schemaDirectory() - : branchSchemaDirectory(branchName); + Path directoryPath = schemaDirectory(branchName); try { return listVersionedFiles(fileIO, directoryPath, SCHEMA_PREFIX) .reduce(Math::max) @@ -498,21 +495,24 @@ public static TableSchema fromPath(FileIO fileIO, Path path) { } private Path schemaDirectory() { - return new Path(tableRoot + "/schema"); + return schemaDirectory(DEFAULT_MAIN_BRANCH); } @VisibleForTesting public Path toSchemaPath(long id) { - return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id); + return toSchemaPath(DEFAULT_MAIN_BRANCH, id); } - public Path branchSchemaDirectory(String branchName) { - return new Path(getBranchPath(tableRoot, branchName) + "/schema"); + public Path schemaDirectory(String branchName) { + return new Path(getBranchPath(fileIO, tableRoot, branchName) + "/schema"); } - public Path branchSchemaPath(String branchName, long schemaId) { + public Path toSchemaPath(String branchName, long schemaId) { return new Path( - getBranchPath(tableRoot, branchName) + "/schema/" + SCHEMA_PREFIX + schemaId); + getBranchPath(fileIO, tableRoot, branchName) + + "/schema/" + + SCHEMA_PREFIX + + schemaId); } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 7d4a4a7c27dc9..cf3f37450bbeb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -544,6 +544,21 @@ public void deleteBranch(String branchName) { branchManager().deleteBranch(branchName); } + @Override + public void cleanMainBranchFile() { + branchManager().cleanMainBranchFile(); + } + + @Override + public void replaceMainBranch(String branchName) { + branchManager().commitMainBranch(branchName); + } + + @Override + public void mainBranch() { + branchManager().mainBranch(); + } + @Override public void rollbackTo(String tagName) { TagManager tagManager = tagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index 42bea3f6813eb..2577ab2bcc486 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -182,6 +182,30 @@ default void deleteBranch(String branchName) { this.getClass().getSimpleName())); } + @Override + default void mainBranch() { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support mainBranch.", + this.getClass().getSimpleName())); + } + + @Override + default void cleanMainBranchFile() { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support cleanMainBranchFile.", + this.getClass().getSimpleName())); + } + + @Override + default void replaceMainBranch(String branchName) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support setMainBranch.", + this.getClass().getSimpleName())); + } + @Override default ExpireSnapshots newExpireSnapshots() { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 876908394f2dc..3ecc00205b866 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -111,6 +111,14 @@ public interface Table extends Serializable { @Experimental void deleteBranch(String branchName); + /** Replace main branch. */ + @Experimental + void replaceMainBranch(String branchName); + + void cleanMainBranchFile(); + + void mainBranch(); + /** Manually expire snapshots, parameters can be controlled independently of table options. */ @Experimental ExpireSnapshots newExpireSnapshots(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index f3f06f89208a4..9c44fb410b124 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -49,6 +49,7 @@ public class BranchManager { public static final String BRANCH_PREFIX = "branch-"; public static final String DEFAULT_MAIN_BRANCH = "main"; + public static final String MAIN_BRANCH_FILE = "MAIN-BRANCH"; private final FileIO fileIO; private final Path tablePath; @@ -75,13 +76,69 @@ public Path branchDirectory() { } /** Return the path string of a branch. */ - public static String getBranchPath(Path tablePath, String branchName) { + public static String getBranchPath(FileIO fileIO, Path tablePath, String branchName) { + if (branchName.equals(DEFAULT_MAIN_BRANCH)) { + Path path = new Path(tablePath, MAIN_BRANCH_FILE); + try { + if (fileIO.exists(path)) { + String data = fileIO.readFileUtf8(path); + if (StringUtils.isBlank(data)) { + return tablePath.toString(); + } else { + return tablePath.toString() + "/branch/" + BRANCH_PREFIX + data; + } + } else { + return tablePath.toString(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branchName; } + /** Get main branch. */ + public String mainBranch() { + Path path = new Path(tablePath, MAIN_BRANCH_FILE); + try { + if (fileIO.exists(path)) { + return fileIO.readFileUtf8(path); + } else { + return DEFAULT_MAIN_BRANCH; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + /** Return the path of a branch. */ public Path branchPath(String branchName) { - return new Path(getBranchPath(tablePath, branchName)); + return new Path(getBranchPath(fileIO, tablePath, branchName)); + } + + /** Replace main by specify branch. */ + public void commitMainBranch(String branchName) { + Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE); + try { + fileIO.delete(mainBranchFile, false); + fileIO.overwriteFileUtf8(mainBranchFile, branchName); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Exception occurs when set main branch '%s' (directory in %s).", + branchName, tablePath.toString()), + e); + } + } + + /** Clean the main branch file and use default. */ + public void cleanMainBranchFile() { + Path mainBranchFile = new Path(tablePath, MAIN_BRANCH_FILE); + try { + fileIO.delete(mainBranchFile, false); + } catch (IOException e) { + throw new RuntimeException("Exception occurs when clean main branch file.", e); + } } /** Create empty branch. */ @@ -101,12 +158,12 @@ public void createBranch(String branchName) { TableSchema latestSchema = schemaManager.latest().get(); fileIO.copyFileUtf8( schemaManager.toSchemaPath(latestSchema.id()), - schemaManager.branchSchemaPath(branchName, latestSchema.id())); + schemaManager.toSchemaPath(branchName, latestSchema.id())); } catch (IOException e) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(tablePath, branchName)), + branchName, getBranchPath(fileIO, tablePath, branchName)), e); } } @@ -133,12 +190,12 @@ public void createBranch(String branchName, long snapshotId) { snapshotManager.branchSnapshotPath(branchName, snapshot.id())); fileIO.copyFileUtf8( schemaManager.toSchemaPath(snapshot.schemaId()), - schemaManager.branchSchemaPath(branchName, snapshot.schemaId())); + schemaManager.toSchemaPath(branchName, snapshot.schemaId())); } catch (IOException e) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(tablePath, branchName)), + branchName, getBranchPath(fileIO, tablePath, branchName)), e); } } @@ -162,18 +219,18 @@ public void createBranch(String branchName, String tagName) { try { // Copy the corresponding tag, snapshot and schema files into the branch directory fileIO.copyFileUtf8( - tagManager.tagPath(tagName), tagManager.branchTagPath(branchName, tagName)); + tagManager.tagPath(tagName), tagManager.tagPath(branchName, tagName)); fileIO.copyFileUtf8( snapshotManager.snapshotPath(snapshot.id()), snapshotManager.branchSnapshotPath(branchName, snapshot.id())); fileIO.copyFileUtf8( schemaManager.toSchemaPath(snapshot.schemaId()), - schemaManager.branchSchemaPath(branchName, snapshot.schemaId())); + schemaManager.toSchemaPath(branchName, snapshot.schemaId())); } catch (IOException e) { throw new RuntimeException( String.format( "Exception occurs when create branch '%s' (directory in %s).", - branchName, getBranchPath(tablePath, branchName)), + branchName, getBranchPath(fileIO, tablePath, branchName)), e); } } @@ -187,7 +244,7 @@ public void deleteBranch(String branchName) { LOG.info( String.format( "Deleting the branch failed due to an exception in deleting the directory %s. Please try again.", - getBranchPath(tablePath, branchName)), + getBranchPath(fileIO, tablePath, branchName)), e); } } @@ -239,8 +296,7 @@ public List branches() { } FileStoreTable branchTable = FileStoreTableFactory.create( - fileIO, new Path(getBranchPath(tablePath, branchName))); - + fileIO, new Path(getBranchPath(fileIO, tablePath, branchName))); SortedMap> snapshotTags = branchTable.tagManager().tags(); Long earliestSnapshotId = branchTable.snapshotManager().earliestSnapshotId(); if (snapshotTags.isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index dbbc8fffdc050..2b79a2897face 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -94,12 +94,15 @@ public Path snapshotPath(long snapshotId) { } public Path branchSnapshotDirectory(String branchName) { - return new Path(getBranchPath(tablePath, branchName) + "/snapshot"); + return new Path(getBranchPath(fileIO, tablePath, branchName) + "/snapshot"); } public Path branchSnapshotPath(String branchName, long snapshotId) { return new Path( - getBranchPath(tablePath, branchName) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); + getBranchPath(fileIO, tablePath, branchName) + + "/snapshot/" + + SNAPSHOT_PREFIX + + snapshotId); } public Path snapshotPathByBranch(String branchName, long snapshotId) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 8b7818fed782e..903a59eb79740 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -45,6 +45,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -66,7 +67,11 @@ public TagManager(FileIO fileIO, Path tablePath) { /** Return the root Directory of tags. */ public Path tagDirectory() { - return new Path(tablePath + "/tag"); + return tagDirectory(DEFAULT_MAIN_BRANCH); + } + + public Path tagDirectory(String branchName) { + return new Path(getBranchPath(fileIO, tablePath, branchName) + "/tag"); } /** Return the path of a tag. */ @@ -75,8 +80,9 @@ public Path tagPath(String tagName) { } /** Return the path of a tag in branch. */ - public Path branchTagPath(String branchName, String tagName) { - return new Path(getBranchPath(tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName); + public Path tagPath(String branchName, String tagName) { + return new Path( + getBranchPath(fileIO, tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName); } /** Create a tag from given snapshot and save it in the storage. */ diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index 778c871f7798a..8f074701a3033 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -1019,7 +1019,7 @@ public void testCreateBranch() throws Exception { // verify test-tag in test-branch is equal to snapshot 2 Snapshot branchTag = Snapshot.fromPath( - new TraceableFileIO(), tagManager.branchTagPath("test-branch", "test-tag")); + new TraceableFileIO(), tagManager.tagPath("test-branch", "test-tag")); assertThat(branchTag.equals(snapshot2)).isTrue(); // verify snapshot in test-branch is equal to snapshot 2 @@ -1034,7 +1034,7 @@ public void testCreateBranch() throws Exception { SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath); TableSchema branchSchema = SchemaManager.fromPath( - new TraceableFileIO(), schemaManager.branchSchemaPath("test-branch", 0)); + new TraceableFileIO(), schemaManager.toSchemaPath("test-branch", 0)); TableSchema schema0 = schemaManager.schema(0); assertThat(branchSchema.equals(schema0)).isTrue(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CleanMainBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CleanMainBranchProcedure.java new file mode 100644 index 0000000000000..bae7820d10124 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CleanMainBranchProcedure.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.Table; + +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Clean main branch procedure. Usage: + * + *

+ *  CALL sys.clean_main_branch('tableId')
+ * 
+ */ +public class CleanMainBranchProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "clean_main_branch"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + public String[] call(ProcedureContext procedureContext, String tableId) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + table.cleanMainBranchFile(); + + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceMainBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceMainBranchProcedure.java new file mode 100644 index 0000000000000..33fdd99142569 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ReplaceMainBranchProcedure.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.Table; + +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Replace main branch procedure. Usage: + * + *

+ *  CALL sys.replace_main_branch('tableId', 'branchName')
+ * 
+ */ +public class ReplaceMainBranchProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "replace_main_branch"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + public String[] call(ProcedureContext procedureContext, String tableId, String branchName) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + table.replaceMainBranch(branchName); + + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 943da3e16e6d6..b9c114656f1d5 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -50,3 +50,5 @@ org.apache.paimon.flink.procedure.privilege.CreatePrivilegedUserProcedure org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure org.apache.paimon.flink.procedure.privilege.GrantPrivilegeToUserProcedure org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure +org.apache.paimon.flink.procedure.ReplaceMainBranchProcedure +org.apache.paimon.flink.procedure.CleanMainBranchProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java index 4f4f314966bbd..e8f9e2c877783 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/BranchActionITCase.java @@ -160,4 +160,51 @@ void testCreateAndDeleteEmptyBranch() throws Exception { database, tableName)); assertThat(branchManager.branchExists("empty_branch_name")).isFalse(); } + + @Test + void testReplaceMainBranchAndCleanMainBranch() throws Exception { + + init(warehouse); + + RowType rowType = + RowType.of( + new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}, + new String[] {"k", "v"}); + FileStoreTable table = + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("k"), + Collections.emptyMap()); + + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + + // 3 snapshots + writeData(rowData(1L, BinaryString.fromString("Hi"))); + writeData(rowData(2L, BinaryString.fromString("Hello"))); + writeData(rowData(3L, BinaryString.fromString("Paimon"))); + + TagManager tagManager = new TagManager(table.fileIO(), table.location()); + callProcedure( + String.format("CALL sys.create_tag('%s.%s', 'tag2', 2)", database, tableName)); + assertThat(tagManager.tagExists("tag2")).isTrue(); + + BranchManager branchManager = table.branchManager(); + callProcedure( + String.format( + "CALL sys.create_branch('%s.%s', 'branch_name', 'tag2')", + database, tableName)); + assertThat(branchManager.branchExists("branch_name")).isTrue(); + + callProcedure( + String.format( + "CALL sys.replace_main_branch('%s.%s', 'branch_name')", + database, tableName)); + assertThat(branchManager.mainBranch()).isEqualTo("branch_name"); + + callProcedure(String.format("CALL sys.clean_main_branch('%s.%s')", database, tableName)); + assertThat(branchManager.mainBranch()).isEqualTo("main"); + } }