diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index 6b83901e7050..352d5726a7dc 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -244,6 +244,17 @@ default void overwriteFileUtf8(Path path, String content) throws IOException { } } + /** + * Read file to UTF_8 decoding, then write content to one file atomically, initially writes to + * temp hidden file and only renames to the target file once temp file is closed. + * + * @return false if targetPath file exists + */ + default boolean copyFileUtf8(Path sourcePath, Path targetPath) throws IOException { + String content = readFileUtf8(sourcePath); + return writeFileUtf8(targetPath, content); + } + /** Read file from {@link #overwriteFileUtf8} file. */ default Optional readOverwrittenFileUtf8(Path path) throws IOException { int retryNumber = 0; diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index dba7f8e14a5d..5300084d053d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -64,6 +64,7 @@ import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX; import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE; +import static org.apache.paimon.utils.BranchManager.getBranchPath; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; import static org.apache.paimon.utils.Preconditions.checkState; @@ -462,6 +463,14 @@ public TableSchema schema(long id) { } } + public static TableSchema fromPath(FileIO fileIO, Path path) { + try { + return JsonSerdeUtil.fromJson(fileIO.readFileUtf8(path), TableSchema.class); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + private Path schemaDirectory() { return new Path(tableRoot + "/schema"); } @@ -471,6 +480,11 @@ public Path toSchemaPath(long id) { return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id); } + public Path branchSchemaPath(String branchName, long schemaId) { + return new Path( + getBranchPath(tableRoot, branchName) + "/schema/" + SCHEMA_PREFIX + schemaId); + } + /** * Delete schema with specific id. * diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 0bef36e558dd..3185de4c3177 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -50,6 +50,7 @@ import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl; import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner; import org.apache.paimon.tag.TagPreview; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -379,6 +380,16 @@ public void deleteTag(String tagName) { tagManager().deleteTag(tagName, store().newTagDeletion(), snapshotManager()); } + @Override + public void createBranch(String branchName, String tagName) { + branchManager().createBranch(branchName, tagName); + } + + @Override + public void deleteBranch(String branchName) { + branchManager().deleteBranch(branchName); + } + @Override public void rollbackTo(String tagName) { TagManager tagManager = tagManager(); @@ -409,6 +420,11 @@ public TagManager tagManager() { return new TagManager(fileIO, path); } + @Override + public BranchManager branchManager() { + return new BranchManager(fileIO, path, snapshotManager(), tagManager(), schemaManager()); + } + private RollbackHelper rollbackHelper() { return new RollbackHelper( snapshotManager(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java index 1a66edc5ed94..b5bebe2a72d4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java @@ -22,6 +22,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.table.source.snapshot.SnapshotReader; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -36,6 +37,8 @@ public interface DataTable extends InnerTable { TagManager tagManager(); + BranchManager branchManager(); + Path location(); FileIO fileIO(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index ca2dce0bd9cf..3e3bcb2d731e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -126,4 +126,20 @@ default void rollbackTo(String tagName) { "Readonly Table %s does not support rollbackTo tag.", this.getClass().getSimpleName())); } + + @Override + default void createBranch(String branchName, String tagName) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support createBranch.", + this.getClass().getSimpleName())); + } + + @Override + default void deleteBranch(String branchName) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support deleteBranch.", + this.getClass().getSimpleName())); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 3e6261b06f55..4f713e992e76 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -83,6 +83,14 @@ public interface Table extends Serializable { @Experimental void rollbackTo(String tagName); + /** Create a branch from given tag. */ + @Experimental + void createBranch(String branchName, String tagName); + + /** Delete a branch by branchName. */ + @Experimental + void deleteBranch(String branchName); + // =============== Read & Write Operations ================== /** Returns a new read builder. */ diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index 37e5a38a9f31..fead5068b2be 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -50,6 +50,7 @@ import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.SnapshotManager; @@ -160,6 +161,11 @@ public TagManager tagManager() { return dataTable.tagManager(); } + @Override + public BranchManager branchManager() { + return dataTable.branchManager(); + } + @Override public InnerTableRead newRead() { return new AuditLogRead(dataTable.newRead()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java index 062a440ce8f6..4c9b9a6015f7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java @@ -44,6 +44,7 @@ import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.IteratorRecordReader; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -117,6 +118,11 @@ public TagManager tagManager() { return wrapped.tagManager(); } + @Override + public BranchManager branchManager() { + return wrapped.branchManager(); + } + @Override public String name() { return "__internal_buckets_" + wrapped.location().getName(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java index a43c5da5af39..7825b93e4fad 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java @@ -44,6 +44,7 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.IteratorRecordReader; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -104,6 +105,11 @@ public TagManager tagManager() { return wrapped.tagManager(); } + @Override + public BranchManager branchManager() { + return wrapped.branchManager(); + } + @Override public String name() { return "__internal_file_monitor_" + wrapped.location().getName(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java index 13d617cf8166..55542373458b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java @@ -34,6 +34,7 @@ import org.apache.paimon.table.source.InnerTableScanImpl; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -136,6 +137,11 @@ public TagManager tagManager() { return dataTable.tagManager(); } + @Override + public BranchManager branchManager() { + return dataTable.branchManager(); + } + @Override public InnerTableRead newRead() { return dataTable.newRead(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java new file mode 100644 index 000000000000..5ed647ef4b8c --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.schema.SchemaManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** Manager for {@code Branch}. */ +public class BranchManager { + + private static final Logger LOG = LoggerFactory.getLogger(BranchManager.class); + + public static final String BRANCH_PREFIX = "branch-"; + + private final FileIO fileIO; + private final Path tablePath; + private final SnapshotManager snapshotManager; + private final TagManager tagManager; + private final SchemaManager schemaManager; + + public BranchManager( + FileIO fileIO, + Path path, + SnapshotManager snapshotManager, + TagManager tagManager, + SchemaManager schemaManager) { + this.fileIO = fileIO; + this.tablePath = path; + this.snapshotManager = snapshotManager; + this.tagManager = tagManager; + this.schemaManager = schemaManager; + } + + /** Return the root Directory of branch. */ + public Path branchDirectory() { + return new Path(tablePath + "/branch"); + } + + /** Return the path string of a branch. */ + public static String getBranchPath(Path tablePath, String branchName) { + return tablePath.toString() + "/branch/" + BRANCH_PREFIX + branchName; + } + + /** Return the path of a branch. */ + public Path branchPath(String branchName) { + return new Path(getBranchPath(tablePath, branchName)); + } + + public void createBranch(String branchName, String tagName) { + checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName); + checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName); + checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not exists.", tagName); + checkArgument( + !branchName.chars().allMatch(Character::isDigit), + "Branch name cannot be pure numeric string but is '%s'.", + branchName); + + Snapshot snapshot = tagManager.taggedSnapshot(tagName); + + try { + // Copy the corresponding tag, snapshot and schema files into the branch directory + fileIO.copyFileUtf8( + tagManager.tagPath(tagName), tagManager.branchTagPath(branchName, tagName)); + fileIO.copyFileUtf8( + snapshotManager.snapshotPath(snapshot.id()), + snapshotManager.branchSnapshotPath(branchName, snapshot.id())); + fileIO.copyFileUtf8( + schemaManager.toSchemaPath(snapshot.schemaId()), + schemaManager.branchSchemaPath(branchName, snapshot.schemaId())); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Exception occurs when create branch '%s' (directory in %s).", + branchName, getBranchPath(tablePath, branchName)), + e); + } + } + + public void deleteBranch(String branchName) { + checkArgument(branchExists(branchName), "Branch name '%s' doesn't exist.", branchName); + try { + // Delete branch directory + fileIO.delete(branchPath(branchName), true); + } catch (IOException e) { + LOG.info( + String.format( + "Deleting the branch failed due to an exception in deleting the directory %s. Please try again.", + getBranchPath(tablePath, branchName)), + e); + } + } + + /** Check if path exists. */ + public boolean fileExists(Path path) { + try { + if (fileIO.exists(path)) { + return true; + } + return false; + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to determine if path '%s' exists.", path), e); + } + } + + /** Check if a branch exists. */ + public boolean branchExists(String branchName) { + Path branchPath = branchPath(branchName); + return fileExists(branchPath); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 96d45d456862..c6965385588c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -43,6 +43,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.apache.paimon.utils.BranchManager.getBranchPath; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; /** Manager for {@link Snapshot}, providing utility methods related to paths and snapshot hints. */ @@ -80,6 +81,11 @@ public Path snapshotPath(long snapshotId) { return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); } + public Path branchSnapshotPath(String branchName, long snapshotId) { + return new Path( + getBranchPath(tablePath, branchName) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); + } + public Snapshot snapshot(long snapshotId) { return Snapshot.fromPath(fileIO, snapshotPath(snapshotId)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index f011328b49c1..bebde6ce3d13 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -38,6 +38,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.apache.paimon.utils.BranchManager.getBranchPath; import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -66,6 +67,11 @@ public Path tagPath(String tagName) { return new Path(tablePath + "/tag/" + TAG_PREFIX + tagName); } + /** Return the path of a tag in branch. */ + public Path branchTagPath(String branchName, String tagName) { + return new Path(getBranchPath(tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName); + } + /** Create a tag from given snapshot and save it in the storage. */ public void createTag(Snapshot snapshot, String tagName, List callbacks) { checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index f8e864e8ce26..f8f44881405c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -46,6 +46,7 @@ import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.table.sink.InnerTableCommit; @@ -64,6 +65,7 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; import org.apache.paimon.utils.TraceableFileIO; @@ -912,6 +914,136 @@ public void testCreateTag() throws Exception { assertThat(tagged.equals(snapshot2)).isTrue(); } + @Test + public void testCreateBranch() throws Exception { + FileStoreTable table = createFileStoreTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + // snapshot 1 + write.write(rowData(1, 10, 100L)); + commit.commit(0, write.prepareCommit(false, 1)); + // snapshot 2 + write.write(rowData(2, 20, 200L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + table.createTag("test-tag", 2); + + // verify that tag file exist + TagManager tagManager = new TagManager(new TraceableFileIO(), tablePath); + assertThat(tagManager.tagExists("test-tag")).isTrue(); + + // verify that test-tag is equal to snapshot 2 + Snapshot tagged = tagManager.taggedSnapshot("test-tag"); + Snapshot snapshot2 = table.snapshotManager().snapshot(2); + assertThat(tagged.equals(snapshot2)).isTrue(); + + table.createBranch("test-branch", "test-tag"); + + // verify that branch file exist + TraceableFileIO fileIO = new TraceableFileIO(); + BranchManager branchManager = + new BranchManager( + fileIO, + tablePath, + new SnapshotManager(fileIO, tablePath), + new TagManager(fileIO, tablePath), + new SchemaManager(fileIO, tablePath)); + assertThat(branchManager.branchExists("test-branch")).isTrue(); + + // verify test-tag in test-branch is equal to snapshot 2 + Snapshot branchTag = + Snapshot.fromPath( + new TraceableFileIO(), tagManager.branchTagPath("test-branch", "test-tag")); + assertThat(branchTag.equals(snapshot2)).isTrue(); + + // verify snapshot in test-branch is equal to snapshot 2 + SnapshotManager snapshotManager = new SnapshotManager(new TraceableFileIO(), tablePath); + Snapshot branchSnapshot = + Snapshot.fromPath( + new TraceableFileIO(), + snapshotManager.branchSnapshotPath("test-branch", 2)); + assertThat(branchSnapshot.equals(snapshot2)).isTrue(); + + // verify schema in test-branch is equal to schema 0 + SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath); + TableSchema branchSchema = + SchemaManager.fromPath( + new TraceableFileIO(), schemaManager.branchSchemaPath("test-branch", 0)); + TableSchema schema0 = schemaManager.schema(0); + assertThat(branchSchema.equals(schema0)).isTrue(); + } + + @Test + public void testUnsupportedBranchName() throws Exception { + FileStoreTable table = createFileStoreTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("test-tag", 1); + table.createBranch("branch0", "test-tag"); + + assertThatThrownBy(() -> table.createBranch("branch-1", "tag1")) + .satisfies( + AssertionUtils.anyCauseMatches( + IllegalArgumentException.class, "Tag name 'tag1' not exists.")); + + assertThatThrownBy(() -> table.createBranch("branch0", "test-tag")) + .satisfies( + AssertionUtils.anyCauseMatches( + IllegalArgumentException.class, + "Branch name 'branch0' already exists.")); + + assertThatThrownBy(() -> table.createBranch("", "test-tag")) + .satisfies( + AssertionUtils.anyCauseMatches( + IllegalArgumentException.class, + String.format("Branch name '%s' is blank", ""))); + + assertThatThrownBy(() -> table.createBranch("10", "test-tag")) + .satisfies( + AssertionUtils.anyCauseMatches( + IllegalArgumentException.class, + "Branch name cannot be pure numeric string but is '10'.")); + } + + @Test + public void testDeleteBranch() throws Exception { + FileStoreTable table = createFileStoreTable(); + + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(1, 10, 100L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + table.createTag("tag1", 1); + table.createBranch("branch1", "tag1"); + table.deleteBranch("branch1"); + + // verify that branch file not exist + TraceableFileIO fileIO = new TraceableFileIO(); + BranchManager branchManager = + new BranchManager( + fileIO, + tablePath, + new SnapshotManager(fileIO, tablePath), + new TagManager(fileIO, tablePath), + new SchemaManager(fileIO, tablePath)); + assertThat(branchManager.branchExists("branch1")).isFalse(); + + assertThatThrownBy(() -> table.deleteBranch("branch1")) + .satisfies( + AssertionUtils.anyCauseMatches( + IllegalArgumentException.class, + "Branch name 'branch1' doesn't exist.")); + } + @Test public void testUnsupportedTagName() throws Exception { FileStoreTable table = createFileStoreTable();