diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 0b4ebba19cb4..896b53794e62 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -53,6 +53,8 @@ import java.util.Comparator; import java.util.List; +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; + /** * Base {@link FileStore} implementation. * @@ -169,6 +171,10 @@ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) { @Override public FileStoreCommitImpl newCommit(String commitUser) { + return newCommit(commitUser, DEFAULT_MAIN_BRANCH); + } + + public FileStoreCommitImpl newCommit(String commitUser, String branchName) { return new FileStoreCommitImpl( fileIO, schemaManager, @@ -186,6 +192,7 @@ public FileStoreCommitImpl newCommit(String commitUser) { options.manifestMergeMinCount(), partitionType.getFieldCount() > 0 && options.dynamicPartitionOverwrite(), newKeyComparator(), + branchName, newStatsFileHandler()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index ec1e7cb58fb4..8be8f817841d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -38,6 +38,7 @@ import static org.apache.paimon.predicate.PredicateBuilder.and; import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping; import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; /** {@link FileStore} for reading and writing {@link InternalRow}. */ public class AppendOnlyFileStore extends AbstractFileStore { @@ -69,7 +70,11 @@ public BucketMode bucketMode() { @Override public AppendOnlyFileStoreScan newScan() { - return newScan(false); + return newScan(DEFAULT_MAIN_BRANCH); + } + + public AppendOnlyFileStoreScan newScan(String branchName) { + return newScan(false, branchName); } @Override @@ -99,12 +104,12 @@ public AppendOnlyFileStoreWrite newWrite( rowType, pathFactory(), snapshotManager(), - newScan(true).withManifestCacheFilter(manifestFilter), + newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter), options, tableName); } - private AppendOnlyFileStoreScan newScan(boolean forWrite) { + private AppendOnlyFileStoreScan newScan(boolean forWrite, String branchName) { ScanBucketFilter bucketFilter = new ScanBucketFilter(bucketKeyType) { @Override @@ -138,7 +143,8 @@ public void pushdown(Predicate predicate) { manifestListFactory(forWrite), options.bucket(), forWrite, - options.scanManifestParallelism()); + options.scanManifestParallelism(), + branchName); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index b8346f986a77..cd38d20611fc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -63,6 +63,8 @@ public interface FileStore extends Serializable { FileStoreScan newScan(); + FileStoreScan newScan(String branchName); + ManifestList.Factory manifestListFactory(); ManifestFile.Factory manifestFileFactory(); @@ -79,6 +81,8 @@ public interface FileStore extends Serializable { FileStoreCommit newCommit(String commitUser); + FileStoreCommit newCommit(String commitUser, String branchName); + SnapshotDeletion newSnapshotDeletion(); TagManager newTagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index 710b015855e2..373bce35c6a4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -52,6 +52,7 @@ import static org.apache.paimon.predicate.PredicateBuilder.and; import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping; import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.Preconditions.checkArgument; /** {@link FileStore} for querying and updating {@link KeyValue}s. */ @@ -107,7 +108,11 @@ public BucketMode bucketMode() { @Override public KeyValueFileStoreScan newScan() { - return newScan(false); + return newScan(DEFAULT_MAIN_BRANCH); + } + + public KeyValueFileStoreScan newScan(String branchName) { + return newScan(false, branchName); } @Override @@ -159,7 +164,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma pathFactory(), format2PathFactory(), snapshotManager(), - newScan(true).withManifestCacheFilter(manifestFilter), + newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter), indexFactory, options, keyValueFieldsExtractor, @@ -182,7 +187,7 @@ private Map format2PathFactory() { return pathFactoryMap; } - private KeyValueFileStoreScan newScan(boolean forWrite) { + private KeyValueFileStoreScan newScan(boolean forWrite, String branchName) { ScanBucketFilter bucketFilter = new ScanBucketFilter(bucketKeyType) { @Override @@ -212,7 +217,8 @@ public void pushdown(Predicate keyFilter) { manifestListFactory(forWrite), options.bucket(), forWrite, - options.scanManifestParallelism()); + options.scanManifestParallelism(), + branchName); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 65857daed57b..adcab419112d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -84,6 +84,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private final Integer scanManifestParallelism; private ScanMetrics scanMetrics = null; + private String branchName; public AbstractFileStoreScan( RowType partitionType, @@ -94,7 +95,8 @@ public AbstractFileStoreScan( ManifestList.Factory manifestListFactory, int numOfBuckets, boolean checkNumOfBuckets, - Integer scanManifestParallelism) { + Integer scanManifestParallelism, + String branchName) { this.partitionType = partitionType; this.bucketKeyFilter = bucketKeyFilter; this.snapshotManager = snapshotManager; @@ -105,6 +107,7 @@ public AbstractFileStoreScan( this.checkNumOfBuckets = checkNumOfBuckets; this.tableSchemas = new ConcurrentHashMap<>(); this.scanManifestParallelism = scanManifestParallelism; + this.branchName = branchName; } @Override @@ -245,7 +248,7 @@ private Pair> doPlan( if (manifests == null) { snapshot = specifiedSnapshot == null - ? snapshotManager.latestSnapshot() + ? snapshotManager.latestSnapshot(branchName) : specifiedSnapshot; if (snapshot == null) { manifests = Collections.emptyList(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index 1cd7db0d2b9b..90aa988b6806 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -49,7 +49,8 @@ public AppendOnlyFileStoreScan( ManifestList.Factory manifestListFactory, int numOfBuckets, boolean checkNumOfBuckets, - Integer scanManifestParallelism) { + Integer scanManifestParallelism, + String branchName) { super( partitionType, bucketFilter, @@ -59,7 +60,8 @@ public AppendOnlyFileStoreScan( manifestListFactory, numOfBuckets, checkNumOfBuckets, - scanManifestParallelism); + scanManifestParallelism, + branchName); this.fieldStatsConverters = new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schemaId); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 044eb7f6eb9f..0264f1e45a3b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -71,6 +71,8 @@ import java.util.concurrent.Callable; import java.util.stream.Collectors; +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; + /** * Default implementation of {@link FileStoreCommit}. * @@ -112,6 +114,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { private final int manifestMergeMinCount; private final boolean dynamicPartitionOverwrite; @Nullable private final Comparator keyComparator; + private final String branchName; @Nullable private Lock lock; private boolean ignoreEmptyCommit; @@ -137,6 +140,7 @@ public FileStoreCommitImpl( int manifestMergeMinCount, boolean dynamicPartitionOverwrite, @Nullable Comparator keyComparator, + String branchName, StatsFileHandler statsFileHandler) { this.fileIO = fileIO; this.schemaManager = schemaManager; @@ -155,6 +159,8 @@ public FileStoreCommitImpl( this.manifestMergeMinCount = manifestMergeMinCount; this.dynamicPartitionOverwrite = dynamicPartitionOverwrite; this.keyComparator = keyComparator; + this.branchName = branchName; + this.lock = null; this.ignoreEmptyCommit = true; this.commitMetrics = null; @@ -233,7 +239,7 @@ public void commit(ManifestCommittable committable, Map properti // we can skip conflict checking in tryCommit method. // This optimization is mainly used to decrease the number of times we read from // files. - latestSnapshot = snapshotManager.latestSnapshot(); + latestSnapshot = snapshotManager.latestSnapshot(branchName); if (latestSnapshot != null) { // it is possible that some partitions only have compact changes, // so we need to contain all changes @@ -254,6 +260,7 @@ public void commit(ManifestCommittable committable, Map properti committable.logOffsets(), Snapshot.CommitKind.APPEND, safeLatestSnapshotId, + branchName, null); generatedSnapshot += 1; } @@ -283,6 +290,7 @@ public void commit(ManifestCommittable committable, Map properti committable.logOffsets(), Snapshot.CommitKind.COMPACT, safeLatestSnapshotId, + branchName, null); generatedSnapshot += 1; } @@ -428,6 +436,7 @@ public void overwrite( committable.logOffsets(), Snapshot.CommitKind.COMPACT, null, + branchName, null); generatedSnapshot += 1; } @@ -523,6 +532,7 @@ public void commitStatistics(Statistics stats, long commitIdentifier) { Collections.emptyMap(), Snapshot.CommitKind.ANALYZE, null, + branchName, statsFileName); } @@ -596,10 +606,11 @@ private int tryCommit( Map logOffsets, Snapshot.CommitKind commitKind, @Nullable Long safeLatestSnapshotId, + String branchName, @Nullable String statsFileName) { int cnt = 0; while (true) { - Snapshot latestSnapshot = snapshotManager.latestSnapshot(); + Snapshot latestSnapshot = snapshotManager.latestSnapshot(branchName); cnt++; if (tryCommitOnce( tableFiles, @@ -611,6 +622,7 @@ private int tryCommit( commitKind, latestSnapshot, safeLatestSnapshotId, + branchName, statsFileName)) { break; } @@ -672,6 +684,7 @@ private int tryOverwrite( Snapshot.CommitKind.OVERWRITE, latestSnapshot, null, + branchName, null)) { break; } @@ -690,10 +703,14 @@ public boolean tryCommitOnce( Snapshot.CommitKind commitKind, @Nullable Snapshot latestSnapshot, @Nullable Long safeLatestSnapshotId, + String branchName, @Nullable String newStatsFileName) { long newSnapshotId = latestSnapshot == null ? Snapshot.FIRST_SNAPSHOT_ID : latestSnapshot.id() + 1; - Path newSnapshotPath = snapshotManager.snapshotPath(newSnapshotId); + Path newSnapshotPath = + branchName.equals(DEFAULT_MAIN_BRANCH) + ? snapshotManager.snapshotPath(newSnapshotId) + : snapshotManager.branchSnapshotPath(branchName, newSnapshotId); if (LOG.isDebugEnabled()) { LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId); @@ -775,7 +792,7 @@ public boolean tryCommitOnce( newIndexManifest = indexManifest; } - long latestSchemaId = schemaManager.latest().get().id(); + long latestSchemaId = schemaManager.latest(branchName).get().id(); // write new stats or inherit from the previous snapshot String statsFileName = null; @@ -840,7 +857,7 @@ public boolean tryCommitOnce( boolean committed = fileIO.writeFileUtf8(newSnapshotPath, newSnapshot.toJson()); if (committed) { - snapshotManager.commitLatestHint(newSnapshotId); + snapshotManager.commitLatestHint(newSnapshotId, branchName); } return committed; }; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index d528e5e7951a..02086ceb3bb3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -53,7 +53,8 @@ public KeyValueFileStoreScan( ManifestList.Factory manifestListFactory, int numOfBuckets, boolean checkNumOfBuckets, - Integer scanManifestParallelism) { + Integer scanManifestParallelism, + String branchName) { super( partitionType, bucketFilter, @@ -63,7 +64,8 @@ public KeyValueFileStoreScan( manifestListFactory, numOfBuckets, checkNumOfBuckets, - scanManifestParallelism); + scanManifestParallelism, + branchName); this.fieldKeyStatsConverters = new FieldStatsConverters( sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), schemaId); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 07dda61966a0..a6d274688aea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -64,6 +64,7 @@ import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX; import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE; +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; import static org.apache.paimon.utils.Preconditions.checkState; @@ -91,8 +92,16 @@ public SchemaManager withLock(@Nullable Lock lock) { /** @return latest schema. */ public Optional latest() { + return latest(DEFAULT_MAIN_BRANCH); + } + + public Optional latest(String branchName) { + Path directoryPath = + branchName.equals(DEFAULT_MAIN_BRANCH) + ? schemaDirectory() + : branchSchemaDirectory(branchName); try { - return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX) + return listVersionedFiles(fileIO, directoryPath, SCHEMA_PREFIX) .reduce(Math::max) .map(this::schema); } catch (IOException e) { @@ -482,6 +491,10 @@ public Path toSchemaPath(long id) { return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id); } + public Path branchSchemaDirectory(String branchName) { + return new Path(getBranchPath(tableRoot, branchName) + "/schema"); + } + public Path branchSchemaPath(String branchName, long schemaId) { return new Path( getBranchPath(tableRoot, branchName) + "/schema/" + SCHEMA_PREFIX + schemaId); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java index 6fe65aca12d0..398aa98d128d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java @@ -285,6 +285,10 @@ public TableSchema copy(Map newOptions) { timeMillis); } + public static TableSchema fromJson(String json) { + return JsonSerdeUtil.fromJson(json, TableSchema.class); + } + @Override public String toString() { return JsonSerdeUtil.toJson(this); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 9900d9ecf0f6..d00ce913d69d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -68,6 +68,7 @@ import java.util.function.BiConsumer; import static org.apache.paimon.CoreOptions.PATH; +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; @@ -134,8 +135,13 @@ public RowKeyExtractor createRowKeyExtractor() { @Override public SnapshotReader newSnapshotReader() { + return newSnapshotReader(DEFAULT_MAIN_BRANCH); + } + + @Override + public SnapshotReader newSnapshotReader(String branchName) { return new SnapshotReaderImpl( - store().newScan(), + store().newScan(branchName), tableSchema, coreOptions(), snapshotManager(), @@ -289,6 +295,11 @@ public ExpireSnapshots newExpireSnapshots() { @Override public TableCommitImpl newCommit(String commitUser) { + // Compatibility with previous design, the main branch is written by default + return newCommit(commitUser, DEFAULT_MAIN_BRANCH); + } + + public TableCommitImpl newCommit(String commitUser, String branchName) { CoreOptions options = coreOptions(); Runnable snapshotExpire = null; if (!options.writeOnly()) { @@ -304,8 +315,9 @@ public TableCommitImpl newCommit(String commitUser) { .olderThanMills(System.currentTimeMillis() - snapshotTimeRetain) .expire(); } + return new TableCommitImpl( - store().newCommit(commitUser), + store().newCommit(commitUser, branchName), createCommitCallbacks(), snapshotExpire, options.writeOnly() ? null : store().newPartitionExpire(commitUser), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java index b5bebe2a72d4..1d892130499c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java @@ -31,6 +31,8 @@ public interface DataTable extends InnerTable { SnapshotReader newSnapshotReader(); + SnapshotReader newSnapshotReader(String branchName); + CoreOptions coreOptions(); SnapshotManager snapshotManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java index b183d6ad63a5..ab1b2e961c96 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java @@ -97,6 +97,8 @@ default Optional comment() { @Override TableCommitImpl newCommit(String commitUser); + TableCommitImpl newCommit(String commitUser, String branchName); + LocalTableQuery newLocalTableQuery(); default BinaryTableStats getSchemaFieldStats(DataFileMeta dataFileMeta) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index 96c2621b6c21..59ceed137a7f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -131,6 +131,11 @@ public SnapshotReader newSnapshotReader() { return new AuditLogDataReader(dataTable.newSnapshotReader()); } + @Override + public SnapshotReader newSnapshotReader(String branchName) { + return new AuditLogDataReader(dataTable.newSnapshotReader(branchName)); + } + @Override public InnerTableScan newScan() { return new AuditLogBatchScan(dataTable.newScan()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java index 4c9b9a6015f7..2ab88a34609e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java @@ -148,6 +148,11 @@ public SnapshotReader newSnapshotReader() { return wrapped.newSnapshotReader(); } + @Override + public SnapshotReader newSnapshotReader(String branchName) { + return wrapped.newSnapshotReader(branchName); + } + @Override public InnerTableScan newScan() { return wrapped.newScan(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java index 7825b93e4fad..bedc19ac3557 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java @@ -135,6 +135,11 @@ public SnapshotReader newSnapshotReader() { return wrapped.newSnapshotReader(); } + @Override + public SnapshotReader newSnapshotReader(String branchName) { + return wrapped.newSnapshotReader(branchName); + } + @Override public InnerTableScan newScan() { return wrapped.newScan(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java index 8daff265fad1..0deac172b1ff 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java @@ -97,6 +97,11 @@ public SnapshotReader newSnapshotReader() { } } + @Override + public SnapshotReader newSnapshotReader(String branchName) { + return dataTable.newSnapshotReader(branchName); + } + @Override public InnerTableScan newScan() { return new InnerTableScanImpl( diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 5ed647ef4b8c..ae56bf362203 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -36,6 +36,7 @@ public class BranchManager { private static final Logger LOG = LoggerFactory.getLogger(BranchManager.class); public static final String BRANCH_PREFIX = "branch-"; + public static final String DEFAULT_MAIN_BRANCH = "main"; private final FileIO fileIO; private final Path tablePath; @@ -72,6 +73,11 @@ public Path branchPath(String branchName) { } public void createBranch(String branchName, String tagName) { + checkArgument( + !branchName.equals(DEFAULT_MAIN_BRANCH), + String.format( + "Branch name '%s' is the default branch and cannot be used.", + DEFAULT_MAIN_BRANCH)); checkArgument(!StringUtils.isBlank(branchName), "Branch name '%s' is blank.", branchName); checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName); checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not exists.", tagName); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index c6965385588c..2f0a1d859091 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -43,6 +43,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; import static org.apache.paimon.utils.FileUtils.listVersionedFiles; @@ -81,13 +82,34 @@ public Path snapshotPath(long snapshotId) { return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); } + public Path branchSnapshotDirectory(String branchName) { + return new Path(getBranchPath(tablePath, branchName) + "/snapshot"); + } + public Path branchSnapshotPath(String branchName, long snapshotId) { return new Path( getBranchPath(tablePath, branchName) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); } + public Path snapshotPathByBranch(String branchName, long snapshotId) { + return branchName.equals(DEFAULT_MAIN_BRANCH) + ? snapshotPath(snapshotId) + : branchSnapshotPath(branchName, snapshotId); + } + + public Path snapshotDirByBranch(String branchName) { + return branchName.equals(DEFAULT_MAIN_BRANCH) + ? snapshotDirectory() + : branchSnapshotDirectory(branchName); + } + public Snapshot snapshot(long snapshotId) { - return Snapshot.fromPath(fileIO, snapshotPath(snapshotId)); + return snapshot(DEFAULT_MAIN_BRANCH, snapshotId); + } + + public Snapshot snapshot(String branchName, long snapshotId) { + Path snapshotPath = snapshotPathByBranch(branchName, snapshotId); + return Snapshot.fromPath(fileIO, snapshotPath); } public boolean snapshotExists(long snapshotId) { @@ -102,13 +124,21 @@ public boolean snapshotExists(long snapshotId) { } public @Nullable Snapshot latestSnapshot() { - Long snapshotId = latestSnapshotId(); - return snapshotId == null ? null : snapshot(snapshotId); + return latestSnapshot(DEFAULT_MAIN_BRANCH); + } + + public @Nullable Snapshot latestSnapshot(String branchName) { + Long snapshotId = latestSnapshotId(branchName); + return snapshotId == null ? null : snapshot(branchName, snapshotId); } public @Nullable Long latestSnapshotId() { + return latestSnapshotId(DEFAULT_MAIN_BRANCH); + } + + public @Nullable Long latestSnapshotId(String branchName) { try { - return findLatest(); + return findLatest(branchName); } catch (IOException e) { throw new RuntimeException("Failed to find latest snapshot id", e); } @@ -120,8 +150,12 @@ public boolean snapshotExists(long snapshotId) { } public @Nullable Long earliestSnapshotId() { + return earliestSnapshotId(DEFAULT_MAIN_BRANCH); + } + + public @Nullable Long earliestSnapshotId(String branchName) { try { - return findEarliest(); + return findEarliest(branchName); } catch (IOException e) { throw new RuntimeException("Failed to find earliest snapshot id", e); } @@ -352,13 +386,13 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { return null; } - private @Nullable Long findLatest() throws IOException { - Path snapshotDir = snapshotDirectory(); + private @Nullable Long findLatest(String branchName) throws IOException { + Path snapshotDir = snapshotDirByBranch(branchName); if (!fileIO.exists(snapshotDir)) { return null; } - Long snapshotId = readHint(LATEST); + Long snapshotId = readHint(LATEST, branchName); if (snapshotId != null) { long nextSnapshot = snapshotId + 1; // it is the latest only there is no next one @@ -367,26 +401,30 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { } } - return findByListFiles(Math::max); + return findByListFiles(Math::max, branchName); } - private @Nullable Long findEarliest() throws IOException { - Path snapshotDir = snapshotDirectory(); + private @Nullable Long findEarliest(String branchName) throws IOException { + Path snapshotDir = snapshotDirByBranch(branchName); if (!fileIO.exists(snapshotDir)) { return null; } - Long snapshotId = readHint(EARLIEST); + Long snapshotId = readHint(EARLIEST, branchName); // null and it is the earliest only it exists if (snapshotId != null && snapshotExists(snapshotId)) { return snapshotId; } - return findByListFiles(Math::min); + return findByListFiles(Math::min, branchName); } public Long readHint(String fileName) { - Path snapshotDir = snapshotDirectory(); + return readHint(fileName, DEFAULT_MAIN_BRANCH); + } + + public Long readHint(String fileName, String branchName) { + Path snapshotDir = snapshotDirByBranch(branchName); Path path = new Path(snapshotDir, fileName); int retryNumber = 0; while (retryNumber++ < READ_HINT_RETRY_NUM) { @@ -404,23 +442,33 @@ public Long readHint(String fileName) { return null; } - private Long findByListFiles(BinaryOperator reducer) throws IOException { - Path snapshotDir = snapshotDirectory(); + private Long findByListFiles(BinaryOperator reducer, String branchName) + throws IOException { + Path snapshotDir = snapshotDirByBranch(branchName); return listVersionedFiles(fileIO, snapshotDir, SNAPSHOT_PREFIX) .reduce(reducer) .orElse(null); } public void commitLatestHint(long snapshotId) throws IOException { - commitHint(snapshotId, LATEST); + commitLatestHint(snapshotId, DEFAULT_MAIN_BRANCH); + } + + public void commitLatestHint(long snapshotId, String branchName) throws IOException { + commitHint(snapshotId, LATEST, branchName); } public void commitEarliestHint(long snapshotId) throws IOException { - commitHint(snapshotId, EARLIEST); + commitEarliestHint(snapshotId, DEFAULT_MAIN_BRANCH); + } + + public void commitEarliestHint(long snapshotId, String branchName) throws IOException { + commitHint(snapshotId, EARLIEST, branchName); } - private void commitHint(long snapshotId, String fileName) throws IOException { - Path snapshotDir = snapshotDirectory(); + private void commitHint(long snapshotId, String fileName, String branchName) + throws IOException { + Path snapshotDir = snapshotDirByBranch(branchName); Path hintFile = new Path(snapshotDir, fileName); fileIO.delete(hintFile, false); fileIO.writeFileUtf8(hintFile, String.valueOf(snapshotId)); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index a29a3e151c76..51fe4739c01c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -69,6 +69,11 @@ public Path tagPath(String tagName) { return new Path(tablePath + "/tag/" + TAG_PREFIX + tagName); } + /** Return the path of tag directory in branch. */ + public Path branchTagDirectory(String branchName) { + return new Path(getBranchPath(tablePath, branchName) + "/tag"); + } + /** Return the path of a tag in branch. */ public Path branchTagPath(String branchName, String tagName) { return new Path(getBranchPath(tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index c41741c08c17..49e55ee4e1f6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -63,6 +63,7 @@ import static org.apache.paimon.operation.FileStoreTestUtils.assertPathNotExists; import static org.apache.paimon.operation.FileStoreTestUtils.commitData; import static org.apache.paimon.operation.FileStoreTestUtils.partitionedData; +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.assertj.core.api.Assertions.assertThat; /** @@ -723,6 +724,7 @@ private void cleanBucket(TestFileStore store, BinaryRow partition, int bucket) { Snapshot.CommitKind.APPEND, store.snapshotManager().latestSnapshot(), null, + DEFAULT_MAIN_BRANCH, null); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java index d91e0a320865..9e0c6d2ab4cb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java @@ -85,6 +85,30 @@ public void testBatchReadWrite() throws Exception { "2|21|201|binary|varbinary|mapKey:mapVal|multiset")); } + @Test + public void testBranchBatchReadWrite() throws Exception { + FileStoreTable table = createFileStoreTable(); + generateBranch(table); + writeBranchData(table); + List splits = toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()); + TableRead read = table.newRead(); + assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)) + .hasSameElementsAs( + Arrays.asList( + "1|10|100|binary|varbinary|mapKey:mapVal|multiset", + "1|11|101|binary|varbinary|mapKey:mapVal|multiset", + "1|12|102|binary|varbinary|mapKey:mapVal|multiset", + "1|11|101|binary|varbinary|mapKey:mapVal|multiset", + "1|12|102|binary|varbinary|mapKey:mapVal|multiset")); + assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING)) + .hasSameElementsAs( + Arrays.asList( + "2|20|200|binary|varbinary|mapKey:mapVal|multiset", + "2|21|201|binary|varbinary|mapKey:mapVal|multiset", + "2|22|202|binary|varbinary|mapKey:mapVal|multiset", + "2|21|201|binary|varbinary|mapKey:mapVal|multiset")); + } + @Test public void testBatchProjection() throws Exception { writeData(); @@ -241,6 +265,31 @@ public void testBatchSplitOrderByPartition() throws Exception { assertThat(partitions).containsExactly(1, 2, 3); } + @Test + public void testBranchStreamingReadWrite() throws Exception { + FileStoreTable table = createFileStoreTable(); + generateBranch(table); + writeBranchData(table); + + List splits = + toSplits( + table.newSnapshotReader(BRANCH_NAME) + .withMode(ScanMode.DELTA) + .read() + .dataSplits()); + TableRead read = table.newRead(); + + assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING)) + .isEqualTo( + Arrays.asList( + "+1|11|101|binary|varbinary|mapKey:mapVal|multiset", + "+1|12|102|binary|varbinary|mapKey:mapVal|multiset")); + assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING)) + .isEqualTo( + Collections.singletonList( + "+2|21|201|binary|varbinary|mapKey:mapVal|multiset")); + } + @Test public void testStreamingSplitInUnawareBucketMode() throws Exception { // in unaware-bucket mode, we split files into splits all the time @@ -451,6 +500,29 @@ private void writeData() throws Exception { commit.close(); } + private void writeBranchData(FileStoreTable table) throws Exception { + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser, BRANCH_NAME); + + write.write(rowData(1, 10, 100L)); + write.write(rowData(2, 20, 200L)); + write.write(rowData(1, 11, 101L)); + commit.commit(0, write.prepareCommit(true, 0)); + + write.write(rowData(1, 12, 102L)); + write.write(rowData(2, 21, 201L)); + write.write(rowData(2, 22, 202L)); + commit.commit(1, write.prepareCommit(true, 1)); + + write.write(rowData(1, 11, 101L)); + write.write(rowData(2, 21, 201L)); + write.write(rowData(1, 12, 102L)); + commit.commit(2, write.prepareCommit(true, 2)); + + write.close(); + commit.close(); + } + @Override protected FileStoreTable createFileStoreTable(Consumer configure) throws Exception { Options conf = new Options(); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index c07fc202976b..b9b117182ec8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -117,6 +117,8 @@ /** Base test class for {@link FileStoreTable}. */ public abstract class FileStoreTableTestBase { + protected static final String BRANCH_NAME = "branch1"; + protected static final RowType ROW_TYPE = RowType.of( new DataType[] { @@ -941,14 +943,7 @@ public void testCreateBranch() throws Exception { table.createBranch("test-branch", "test-tag"); // verify that branch file exist - TraceableFileIO fileIO = new TraceableFileIO(); - BranchManager branchManager = - new BranchManager( - fileIO, - tablePath, - new SnapshotManager(fileIO, tablePath), - new TagManager(fileIO, tablePath), - new SchemaManager(fileIO, tablePath)); + BranchManager branchManager = table.branchManager(); assertThat(branchManager.branchExists("test-branch")).isTrue(); // verify test-tag in test-branch is equal to snapshot 2 @@ -987,6 +982,12 @@ public void testUnsupportedBranchName() throws Exception { table.createTag("test-tag", 1); table.createBranch("branch0", "test-tag"); + assertThatThrownBy(() -> table.createBranch("main", "tag1")) + .satisfies( + AssertionUtils.anyCauseMatches( + IllegalArgumentException.class, + "Branch name 'main' is the default branch and cannot be used.")); + assertThatThrownBy(() -> table.createBranch("branch-1", "tag1")) .satisfies( AssertionUtils.anyCauseMatches( @@ -1026,14 +1027,7 @@ public void testDeleteBranch() throws Exception { table.deleteBranch("branch1"); // verify that branch file not exist - TraceableFileIO fileIO = new TraceableFileIO(); - BranchManager branchManager = - new BranchManager( - fileIO, - tablePath, - new SnapshotManager(fileIO, tablePath), - new TagManager(fileIO, tablePath), - new SchemaManager(fileIO, tablePath)); + BranchManager branchManager = table.branchManager(); assertThat(branchManager.branchExists("branch1")).isFalse(); assertThatThrownBy(() -> table.deleteBranch("branch1")) @@ -1240,6 +1234,66 @@ public void testSchemaPathOption() throws Exception { assertThat(schemaPath).isEqualTo(tablePath); } + @Test + public void testBranchWriteAndRead() throws Exception { + FileStoreTable table = createFileStoreTable(); + + generateBranch(table); + + // Write data to branch1 + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser, BRANCH_NAME)) { + write.write(rowData(2, 20, 200L)); + commit.commit(1, write.prepareCommit(false, 2)); + } + + // Validate data in main branch + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset"); + + // Validate data in branch1 + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); + + // Write two rows data to branch1 again + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser, BRANCH_NAME)) { + write.write(rowData(3, 30, 300L)); + write.write(rowData(4, 40, 400L)); + commit.commit(2, write.prepareCommit(false, 3)); + } + + // Validate data in main branch + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset"); + + // Verify data in branch1 + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder( + "0|0|0|binary|varbinary|mapKey:mapVal|multiset", + "2|20|200|binary|varbinary|mapKey:mapVal|multiset", + "3|30|300|binary|varbinary|mapKey:mapVal|multiset", + "4|40|400|binary|varbinary|mapKey:mapVal|multiset"); + } + protected List getResult( TableRead read, List splits, @@ -1420,4 +1474,36 @@ private static List overwriteTestData() { protected List toSplits(List dataSplits) { return new ArrayList<>(dataSplits); } + + // create a branch which named branch1 + protected void generateBranch(FileStoreTable table) throws Exception { + try (StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser)) { + write.write(rowData(0, 0, 0L)); + commit.commit(0, write.prepareCommit(false, 1)); + } + + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader().read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset"); + + table.createTag("tag1", 1); + table.createBranch(BRANCH_NAME, "tag1"); + + // verify that branch1 file exist + TraceableFileIO fileIO = new TraceableFileIO(); + BranchManager branchManager = table.branchManager(); + assertThat(branchManager.branchExists(BRANCH_NAME)).isTrue(); + + // Verify branch1 and the main branch have the same data + assertThat( + getResult( + table.newRead(), + toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()), + BATCH_ROW_TO_STRING)) + .containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset"); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index 92172ae4f5fd..2412e4b7c7f8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -261,6 +261,24 @@ public void testBatchReadWrite() throws Exception { "2|22|202|binary|varbinary|mapKey:mapVal|multiset")); } + @Test + public void testBranchBatchReadWrite() throws Exception { + FileStoreTable table = createFileStoreTable(); + generateBranch(table); + writeBranchData(table); + List splits = toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()); + TableRead read = table.newRead(); + assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)) + .isEqualTo( + Collections.singletonList( + "1|10|1000|binary|varbinary|mapKey:mapVal|multiset")); + assertThat(getResult(read, splits, binaryRow(2), 0, BATCH_ROW_TO_STRING)) + .isEqualTo( + Arrays.asList( + "2|21|20001|binary|varbinary|mapKey:mapVal|multiset", + "2|22|202|binary|varbinary|mapKey:mapVal|multiset")); + } + @Test public void testBatchProjection() throws Exception { writeData(); @@ -314,6 +332,31 @@ public void testStreamingReadWrite() throws Exception { "+2|22|202|binary|varbinary|mapKey:mapVal|multiset")); } + @Test + public void testBranchStreamingReadWrite() throws Exception { + FileStoreTable table = createFileStoreTable(); + generateBranch(table); + writeBranchData(table); + + List splits = + toSplits( + table.newSnapshotReader(BRANCH_NAME) + .withMode(ScanMode.DELTA) + .read() + .dataSplits()); + TableRead read = table.newRead(); + assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING)) + .isEqualTo( + Collections.singletonList( + "-1|11|1001|binary|varbinary|mapKey:mapVal|multiset")); + assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING)) + .isEqualTo( + Arrays.asList( + "-2|20|200|binary|varbinary|mapKey:mapVal|multiset", + "+2|21|20001|binary|varbinary|mapKey:mapVal|multiset", + "+2|22|202|binary|varbinary|mapKey:mapVal|multiset")); + } + @Test public void testStreamingProjection() throws Exception { writeData(); @@ -611,6 +654,31 @@ private void writeData() throws Exception { commit.close(); } + private void writeBranchData(FileStoreTable table) throws Exception { + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser, BRANCH_NAME); + + write.write(rowData(1, 10, 100L)); + write.write(rowData(2, 20, 200L)); + write.write(rowData(1, 11, 101L)); + commit.commit(0, write.prepareCommit(true, 0)); + + write.write(rowData(1, 10, 1000L)); + write.write(rowData(2, 21, 201L)); + write.write(rowData(2, 21, 2001L)); + commit.commit(1, write.prepareCommit(true, 1)); + + write.write(rowData(1, 11, 1001L)); + write.write(rowData(2, 21, 20001L)); + write.write(rowData(2, 22, 202L)); + write.write(rowDataWithKind(RowKind.DELETE, 1, 11, 1001L)); + write.write(rowDataWithKind(RowKind.DELETE, 2, 20, 200L)); + commit.commit(2, write.prepareCommit(true, 2)); + + write.close(); + commit.close(); + } + @Override @Test public void testReadFilter() throws Exception { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java index f5874bed7eac..a55bda9118a5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java @@ -502,6 +502,12 @@ public Optional latest() { .orElseThrow(IllegalStateException::new))); } + @Override + public Optional latest(String branchName) { + // for compatibility test + return latest(); + } + @Override public List listAll() { return new ArrayList<>(tableSchemas.values());