diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index 8be8f817841db..eaa42963d520a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -96,6 +96,12 @@ public AppendOnlyFileStoreWrite newWrite(String commitUser) { @Override public AppendOnlyFileStoreWrite newWrite( String commitUser, ManifestCacheFilter manifestFilter) { + return newWrite(commitUser, manifestFilter, DEFAULT_MAIN_BRANCH); + } + + @Override + public AppendOnlyFileStoreWrite newWrite( + String commitUser, ManifestCacheFilter manifestFilter, String branchName) { return new AppendOnlyFileStoreWrite( fileIO, newRead(), @@ -104,9 +110,10 @@ public AppendOnlyFileStoreWrite newWrite( rowType, pathFactory(), snapshotManager(), - newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter), + newScan(true, branchName).withManifestCacheFilter(manifestFilter), options, - tableName); + tableName, + branchName); } private AppendOnlyFileStoreScan newScan(boolean forWrite, String branchName) { diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index cd38d20611fcf..daa4d9400d14c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -79,6 +79,9 @@ public interface FileStore extends Serializable { FileStoreWrite newWrite(String commitUser, ManifestCacheFilter manifestFilter); + FileStoreWrite newWrite( + String commitUser, ManifestCacheFilter manifestFilter, String branchName); + FileStoreCommit newCommit(String commitUser); FileStoreCommit newCommit(String commitUser, String branchName); diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 373bce35c6a4a..3ed1ce7df5195 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -147,6 +147,12 @@ public KeyValueFileStoreWrite newWrite(String commitUser) { @Override public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter manifestFilter) { + return newWrite(commitUser, manifestFilter, DEFAULT_MAIN_BRANCH); + } + + @Override + public KeyValueFileStoreWrite newWrite( + String commitUser, ManifestCacheFilter manifestFilter, String branchName) { IndexMaintainer.Factory indexFactory = null; if (bucketMode() == BucketMode.DYNAMIC) { indexFactory = new HashIndexMaintainer.Factory(newIndexFileHandler()); @@ -164,11 +170,12 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma pathFactory(), format2PathFactory(), snapshotManager(), - newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter), + newScan(true, branchName).withManifestCacheFilter(manifestFilter), indexFactory, options, keyValueFieldsExtractor, - tableName); + tableName, + branchName); } private Map format2PathFactory() { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index af78e34632e87..c121e14b61645 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -77,6 +77,7 @@ public abstract class AbstractFileStoreWrite implements FileStoreWrite { protected CompactionMetrics compactionMetrics = null; protected final String tableName; + private final String branchName; protected AbstractFileStoreWrite( String commitUser, @@ -84,7 +85,8 @@ protected AbstractFileStoreWrite( FileStoreScan scan, @Nullable IndexMaintainer.Factory indexFactory, String tableName, - int writerNumberMax) { + int writerNumberMax, + String branchName) { this.commitUser = commitUser; this.snapshotManager = snapshotManager; this.scan = scan; @@ -93,6 +95,7 @@ protected AbstractFileStoreWrite( this.writers = new HashMap<>(); this.tableName = tableName; this.writerNumberMax = writerNumberMax; + this.branchName = branchName; } @Override @@ -350,7 +353,7 @@ public WriterContainer createWriterContainer( } } - Long latestSnapshotId = snapshotManager.latestSnapshotId(); + Long latestSnapshotId = snapshotManager.latestSnapshotId(branchName); List restoreFiles = new ArrayList<>(); if (!ignorePreviousFiles && latestSnapshotId != null) { restoreFiles = scanExistingFileMetas(latestSnapshotId, partition, bucket); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index 283df7b07583b..f7ec2be76417a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -84,8 +84,9 @@ public AppendOnlyFileStoreWrite( SnapshotManager snapshotManager, FileStoreScan scan, CoreOptions options, - String tableName) { - super(commitUser, snapshotManager, scan, options, null, tableName); + String tableName, + String branchName) { + super(commitUser, snapshotManager, scan, options, null, tableName, branchName); this.fileIO = fileIO; this.read = read; this.schemaId = schemaId; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 269ff2dc5dc2f..9285a2f21676b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -109,8 +109,9 @@ public KeyValueFileStoreWrite( @Nullable IndexMaintainer.Factory indexFactory, CoreOptions options, KeyValueFieldsExtractor extractor, - String tableName) { - super(commitUser, snapshotManager, scan, options, indexFactory, tableName); + String tableName, + String branchName) { + super(commitUser, snapshotManager, scan, options, indexFactory, tableName, branchName); this.fileIO = fileIO; this.keyType = keyType; this.valueType = valueType; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java index 9159965d9f9c2..eefde36a3290a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java @@ -60,14 +60,16 @@ public MemoryFileStoreWrite( FileStoreScan scan, CoreOptions options, @Nullable IndexMaintainer.Factory indexFactory, - String tableName) { + String tableName, + String branchName) { super( commitUser, snapshotManager, scan, indexFactory, tableName, - options.writeMaxWritersToSpill()); + options.writeMaxWritersToSpill(), + branchName); this.options = options; this.cacheManager = new CacheManager(options.lookupCacheMaxMemory()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 9c97d406cb7e6..58cee6d892e2a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -46,6 +46,8 @@ import java.io.IOException; import java.util.function.BiConsumer; +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; + /** {@link FileStoreTable} for append table. */ class AppendOnlyFileStoreTable extends AbstractFileStoreTable { @@ -135,9 +137,16 @@ public TableWriteImpl newWrite(String commitUser) { @Override public TableWriteImpl newWrite( String commitUser, ManifestCacheFilter manifestFilter) { + return newWrite(commitUser, manifestFilter, DEFAULT_MAIN_BRANCH); + } + + @Override + public TableWriteImpl newWrite( + String commitUser, ManifestCacheFilter manifestFilter, String branchName) { // if this table is unaware-bucket table, we skip compaction and restored files searching AppendOnlyFileStoreWrite writer = - store().newWrite(commitUser, manifestFilter).withBucketMode(bucketMode()); + store().newWrite(commitUser, manifestFilter, branchName) + .withBucketMode(bucketMode()); return new TableWriteImpl<>( writer, createRowKeyExtractor(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java index 2134d97cac98d..c6f5a89ac0d42 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java @@ -96,6 +96,9 @@ default Optional comment() { TableWriteImpl newWrite(String commitUser, ManifestCacheFilter manifestFilter); + TableWriteImpl newWrite( + String commitUser, ManifestCacheFilter manifestFilter, String branchName); + @Override TableCommitImpl newCommit(String commitUser); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index 11e557f176490..f26169b271168 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -53,6 +53,7 @@ import static org.apache.paimon.predicate.PredicateBuilder.and; import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping; import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; /** {@link FileStoreTable} for primary key table. */ class PrimaryKeyFileStoreTable extends AbstractFileStoreTable { @@ -184,12 +185,18 @@ public TableWriteImpl newWrite(String commitUser) { @Override public TableWriteImpl newWrite( String commitUser, ManifestCacheFilter manifestFilter) { + return newWrite(commitUser, manifestFilter, DEFAULT_MAIN_BRANCH); + } + + @Override + public TableWriteImpl newWrite( + String commitUser, ManifestCacheFilter manifestFilter, String branchName) { TableSchema schema = schema(); CoreOptions options = store().options(); RowKindGenerator rowKindGenerator = RowKindGenerator.create(schema, options); KeyValue kv = new KeyValue(); return new TableWriteImpl<>( - store().newWrite(commitUser, manifestFilter), + store().newWrite(commitUser, manifestFilter, branchName), createRowKeyExtractor(), record -> { InternalRow row = record.row(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java index c70f6038ec962..663e1c71f4a63 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java @@ -143,7 +143,8 @@ private TableWriteImpl newTableWrite(FileStoreTable table) { table.newWrite( commitUser, (part, bucket) -> - state.stateValueFilter().filter(table.name(), part, bucket)) + state.stateValueFilter().filter(table.name(), part, bucket), + table.coreOptions().branchName()) .withIOManager(paimonIOManager) .withIgnorePreviousFiles(ignorePreviousFiles) .withExecutionMode(isStreamingMode) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java index 55a67187945bf..02aef00e90d50 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java @@ -185,7 +185,8 @@ public RecordWriter createMergeTreeWriter(BinaryRow partition, int buc null, options, EXTRACTOR, - tablePath.getName()) + tablePath.getName(), + "main") .createWriterContainer(partition, bucket, true) .writer; ((MemoryOwner) writer)