From 541c591cedd1264d87b6a3c11a8dd5112ce58656 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Tue, 5 Mar 2024 19:54:54 +0800 Subject: [PATCH] support flink write branch --- .../generated/core_configuration.html | 6 + .../java/org/apache/paimon/CoreOptions.java | 14 + .../org/apache/paimon/AbstractFileStore.java | 2 +- .../apache/paimon/AppendOnlyFileStore.java | 12 +- .../java/org/apache/paimon/FileStore.java | 2 - .../org/apache/paimon/KeyValueFileStore.java | 12 +- .../paimon/catalog/AbstractCatalog.java | 11 +- .../paimon/catalog/FileSystemCatalog.java | 4 +- .../org/apache/paimon/jdbc/JdbcCatalog.java | 6 +- .../operation/AbstractFileStoreScan.java | 7 +- .../operation/AppendOnlyFileStoreScan.java | 4 +- .../paimon/operation/FileStoreCommitImpl.java | 8 +- .../operation/KeyValueFileStoreScan.java | 4 +- .../apache/paimon/schema/SchemaManager.java | 126 +++++-- .../org/apache/paimon/schema/TableSchema.java | 3 +- .../paimon/table/AbstractFileStoreTable.java | 14 +- .../paimon/table/FileStoreTableFactory.java | 3 +- .../apache/paimon/utils/BranchManager.java | 6 +- .../apache/paimon/utils/SnapshotManager.java | 276 +++++++++----- .../org/apache/paimon/utils/TagManager.java | 137 +++++-- .../java/org/apache/paimon/TestFileStore.java | 64 +++- .../KeyValueFileStoreScanBranchTest.java | 347 ++++++++++++++++++ .../operation/MergeFileSplitReadTest.java | 5 +- .../table/AppendOnlyFileStoreTableTest.java | 37 +- .../paimon/table/FileStoreTableTestBase.java | 43 ++- .../table/PrimaryKeyFileStoreTableTest.java | 43 ++- .../table/SchemaEvolutionTableTestBase.java | 6 - .../paimon/table/WritePreemptMemoryTest.java | 25 ++ .../paimon/flink/FlinkTableFactory.java | 5 +- .../paimon/flink/sink/CompactorSink.java | 4 +- .../apache/paimon/flink/sink/FlinkSink.java | 3 + .../paimon/flink/sink/FlinkWriteSink.java | 3 +- .../sink/UnawareBucketCompactionSink.java | 5 +- .../paimon/flink/AppendOnlyTableITCase.java | 16 + .../paimon/flink/CatalogTableITCase.java | 3 +- .../apache/paimon/flink/FileStoreITCase.java | 13 +- .../flink/FileStoreWithBranchITCase.java | 43 +++ .../flink/PrimaryKeyFileStoreTableITCase.java | 55 +++ .../paimon/flink/action/ActionITCaseBase.java | 13 +- .../org/apache/paimon/hive/HiveCatalog.java | 6 +- 40 files changed, 1127 insertions(+), 269 deletions(-) create mode 100644 paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanBranchTest.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreWithBranchITCase.java diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 48822af1bd8a2..48e955ca74c3b 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -32,6 +32,12 @@ Boolean Whether to create underlying storage when reading and writing the table. + +
+            <td><h5>branch</h5></td>
+            <td style="word-wrap: break-word;">"main"</td>
+            <td>String</td>
+            <td>Specify branch name.</td>
+        </tr>
         <tr>
             <td><h5>bucket</h5></td>
-1 diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 2440bdcc60909..28004f69f7511 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -112,6 +112,9 @@ public class CoreOptions implements Serializable { .noDefaultValue() .withDescription("The file path of this table in the filesystem."); + public static final ConfigOption BRANCH = + key("branch").stringType().defaultValue("main").withDescription("Specify branch name."); + public static final ConfigOption FILE_FORMAT = key("file.format") .enumType(FileFormatType.class) @@ -1149,6 +1152,17 @@ public Path path() { return path(options.toMap()); } + public String branch() { + return branch(options.toMap()); + } + + public static String branch(Map options) { + if (options.containsKey(BRANCH.key())) { + return options.get(BRANCH.key()); + } + return BRANCH.defaultValue(); + } + public static Path path(Map options) { return new Path(options.get(PATH.key())); } diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 07a73d3de2aa9..c31a85fbb3eb8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -104,7 +104,7 @@ public FileStorePathFactory pathFactory() { @Override public SnapshotManager snapshotManager() { - return new SnapshotManager(fileIO, options.path()); + return new SnapshotManager(fileIO, options.path(), options.branch()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index 99efac540e802..0a18ae36141d6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -39,7 +39,6 @@ import static org.apache.paimon.predicate.PredicateBuilder.and; import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping; import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; -import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; /** {@link FileStore} for reading and writing {@link InternalRow}. 
*/ public class AppendOnlyFileStore extends AbstractFileStore { @@ -71,11 +70,7 @@ public BucketMode bucketMode() { @Override public AppendOnlyFileStoreScan newScan() { - return newScan(DEFAULT_MAIN_BRANCH); - } - - public AppendOnlyFileStoreScan newScan(String branchName) { - return newScan(false, branchName); + return newScan(false); } @Override @@ -106,12 +101,12 @@ public AppendOnlyFileStoreWrite newWrite( rowType, pathFactory(), snapshotManager(), - newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter), + newScan(true).withManifestCacheFilter(manifestFilter), options, tableName); } - private AppendOnlyFileStoreScan newScan(boolean forWrite, String branchName) { + private AppendOnlyFileStoreScan newScan(boolean forWrite) { ScanBucketFilter bucketFilter = new ScanBucketFilter(bucketKeyType) { @Override @@ -146,7 +141,6 @@ public void pushdown(Predicate predicate) { options.bucket(), forWrite, options.scanManifestParallelism(), - branchName, options.fileIndexReadEnabled()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index 870feffdef685..3643b9e30743d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -63,8 +63,6 @@ public interface FileStore extends Serializable { FileStoreScan newScan(); - FileStoreScan newScan(String branchName); - ManifestList.Factory manifestListFactory(); ManifestFile.Factory manifestFileFactory(); diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index aeb30731dfccb..460d0c38f539b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -56,7 +56,6 @@ import static org.apache.paimon.predicate.PredicateBuilder.and; import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping; import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; -import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.Preconditions.checkArgument; /** {@link FileStore} for querying and updating {@link KeyValue}s. 
*/ @@ -112,11 +111,7 @@ public BucketMode bucketMode() { @Override public KeyValueFileStoreScan newScan() { - return newScan(DEFAULT_MAIN_BRANCH); - } - - public KeyValueFileStoreScan newScan(String branchName) { - return newScan(false, branchName); + return newScan(false); } @Override @@ -185,7 +180,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma pathFactory(), format2PathFactory(), snapshotManager(), - newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter), + newScan(true).withManifestCacheFilter(manifestFilter), indexFactory, deletionVectorsMaintainerFactory, options, @@ -209,7 +204,7 @@ private Map format2PathFactory() { return pathFactoryMap; } - private KeyValueFileStoreScan newScan(boolean forWrite, String branchName) { + private KeyValueFileStoreScan newScan(boolean forWrite) { ScanBucketFilter bucketFilter = new ScanBucketFilter(bucketKeyType) { @Override @@ -240,7 +235,6 @@ public void pushdown(Predicate keyFilter) { options.bucket(), forWrite, options.scanManifestParallelism(), - branchName, options.deletionVectorsEnabled(), options.mergeEngine()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 4a4fb04fd4e2d..9f4ce8bec8c9c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -36,6 +36,7 @@ import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.system.SystemTableLoader; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.StringUtils; import javax.annotation.Nullable; @@ -66,6 +67,7 @@ public abstract class AbstractCatalog implements Catalog { protected final FileIO fileIO; protected final Map tableDefaultOptions; protected final Options catalogOptions; + protected final String branchName; @Nullable protected final LineageMetaFactory lineageMetaFactory; @@ -74,6 +76,7 @@ protected AbstractCatalog(FileIO fileIO) { this.lineageMetaFactory = null; this.tableDefaultOptions = new HashMap<>(); this.catalogOptions = new Options(); + branchName = BranchManager.DEFAULT_MAIN_BRANCH; } protected AbstractCatalog(FileIO fileIO, Options options) { @@ -83,6 +86,7 @@ protected AbstractCatalog(FileIO fileIO, Options options) { this.tableDefaultOptions = convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX); this.catalogOptions = options; + this.branchName = options.get(CoreOptions.BRANCH); } @Override @@ -325,7 +329,12 @@ public Table getTable(Identifier identifier) throws TableNotExistException { } return table; } else { - return getDataTable(identifier); + Table table = getDataTable(identifier); + // Override branch option + if (!branchName.equals(BranchManager.DEFAULT_MAIN_BRANCH)) { + table.options().put(CoreOptions.BRANCH.key(), branchName); + } + return table; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 1e4e5b0ebaaac..e4815a4de42c9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -119,7 +119,7 @@ public boolean tableExists(Identifier identifier) { } private boolean tableExists(Path tablePath) { - return new SchemaManager(fileIO, 
tablePath).listAllIds().size() > 0; + return new SchemaManager(fileIO, tablePath, branchName).listAllIds().size() > 0; } @Override @@ -153,7 +153,7 @@ private SchemaManager schemaManager(Identifier identifier) { new RuntimeException( "No lock context when lock is enabled.")))) .orElse(null); - return new SchemaManager(fileIO, path) + return new SchemaManager(fileIO, path, branchName) .withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 7e7718b5bee96..922d513ebac2b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -288,7 +288,7 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { updateTable(connections, catalogKey, fromTable, toTable); Path fromPath = getDataTableLocation(fromTable); - if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) { + if (new SchemaManager(fileIO, fromPath, branchName).listAllIds().size() > 0) { // Rename the file system's table directory. Maintain consistency between tables in // the file system and tables in the Hive Metastore. Path toPath = getDataTableLocation(toTable); @@ -323,7 +323,7 @@ protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotE throw new TableNotExistException(identifier); } Path tableLocation = getDataTableLocation(identifier); - return new SchemaManager(fileIO, tableLocation) + return new SchemaManager(fileIO, tableLocation, branchName) .latest() .orElseThrow( () -> new RuntimeException("There is no paimon table in " + tableLocation)); @@ -374,7 +374,7 @@ public void close() throws Exception { } private SchemaManager getSchemaManager(Identifier identifier) { - return new SchemaManager(fileIO, getDataTableLocation(identifier)) + return new SchemaManager(fileIO, getDataTableLocation(identifier), branchName) .withLock(lock(identifier)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index a911b084742b4..d90731f31ae74 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -75,7 +75,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private final SchemaManager schemaManager; private final TableSchema schema; protected final ScanBucketFilter bucketKeyFilter; - private final String branchName; private PartitionPredicate partitionFilter; private Snapshot specifiedSnapshot = null; @@ -98,8 +97,7 @@ public AbstractFileStoreScan( ManifestList.Factory manifestListFactory, int numOfBuckets, boolean checkNumOfBuckets, - Integer scanManifestParallelism, - String branchName) { + Integer scanManifestParallelism) { this.partitionType = partitionType; this.bucketKeyFilter = bucketKeyFilter; this.snapshotManager = snapshotManager; @@ -111,7 +109,6 @@ public AbstractFileStoreScan( this.checkNumOfBuckets = checkNumOfBuckets; this.tableSchemas = new ConcurrentHashMap<>(); this.scanManifestParallelism = scanManifestParallelism; - this.branchName = branchName; } @Override @@ -368,7 +365,7 @@ private Pair> readManifests() { if (manifests == null) { snapshot = specifiedSnapshot == null - ? snapshotManager.latestSnapshot(branchName) + ? 
snapshotManager.latestSnapshot() : specifiedSnapshot; if (snapshot == null) { manifests = Collections.emptyList(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index baa2e9f4ab4ac..a716f997755ff 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -62,7 +62,6 @@ public AppendOnlyFileStoreScan( int numOfBuckets, boolean checkNumOfBuckets, Integer scanManifestParallelism, - String branchName, boolean fileIndexReadEnabled) { super( partitionType, @@ -74,8 +73,7 @@ public AppendOnlyFileStoreScan( manifestListFactory, numOfBuckets, checkNumOfBuckets, - scanManifestParallelism, - branchName); + scanManifestParallelism); this.fieldStatsConverters = new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schema.id()); this.fileIndexReadEnabled = fileIndexReadEnabled; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index e389a471c4ec5..61303879b8b93 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -251,7 +251,7 @@ public void commit( // we can skip conflict checking in tryCommit method. // This optimization is mainly used to decrease the number of times we read from // files. - latestSnapshot = snapshotManager.latestSnapshot(branchName); + latestSnapshot = snapshotManager.latestSnapshot(); if (latestSnapshot != null && checkAppendFiles) { // it is possible that some partitions only have compact changes, // so we need to contain all changes @@ -654,7 +654,7 @@ private int tryCommit( @Nullable String statsFileName) { int cnt = 0; while (true) { - Snapshot latestSnapshot = snapshotManager.latestSnapshot(branchName); + Snapshot latestSnapshot = snapshotManager.latestSnapshot(); cnt++; if (tryCommitOnce( tableFiles, @@ -754,7 +754,7 @@ public boolean tryCommitOnce( Path newSnapshotPath = branchName.equals(DEFAULT_MAIN_BRANCH) ? 
snapshotManager.snapshotPath(newSnapshotId) - : snapshotManager.branchSnapshotPath(branchName, newSnapshotId); + : snapshotManager.snapshotPath(branchName, newSnapshotId); if (LOG.isDebugEnabled()) { LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId); @@ -836,7 +836,7 @@ public boolean tryCommitOnce( newIndexManifest = indexManifest; } - long latestSchemaId = schemaManager.latest(branchName).get().id(); + long latestSchemaId = schemaManager.latest().get().id(); // write new stats or inherit from the previous snapshot String statsFileName = null; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index b4c4909aed3f8..83da1221ad9e4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -62,7 +62,6 @@ public KeyValueFileStoreScan( int numOfBuckets, boolean checkNumOfBuckets, Integer scanManifestParallelism, - String branchName, boolean deletionVectorsEnabled, MergeEngine mergeEngine) { super( @@ -75,8 +74,7 @@ public KeyValueFileStoreScan( manifestListFactory, numOfBuckets, checkNumOfBuckets, - scanManifestParallelism, - branchName); + scanManifestParallelism); this.fieldKeyStatsConverters = new FieldStatsConverters( sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index d94da91ef4c56..fa8f0b9e3289b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -44,6 +44,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.StringUtils; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -78,12 +79,18 @@ public class SchemaManager implements Serializable { private final FileIO fileIO; private final Path tableRoot; - @Nullable private transient Lock lock; + private final String branch; public SchemaManager(FileIO fileIO, Path tableRoot) { + this(fileIO, tableRoot, DEFAULT_MAIN_BRANCH); + } + + /** Specify the default branch for data writing. */ + public SchemaManager(FileIO fileIO, Path tableRoot, String branch) { this.fileIO = fileIO; this.tableRoot = tableRoot; + this.branch = StringUtils.isBlank(branch) ? DEFAULT_MAIN_BRANCH : branch; } public SchemaManager withLock(@Nullable Lock lock) { @@ -91,48 +98,56 @@ public SchemaManager withLock(@Nullable Lock lock) { return this; } - /** @return latest schema. */ public Optional latest() { - return latest(DEFAULT_MAIN_BRANCH); + return latest(branch); } - public Optional latest(String branchName) { - Path directoryPath = - branchName.equals(DEFAULT_MAIN_BRANCH) - ? schemaDirectory() - : branchSchemaDirectory(branchName); + public Optional latest(String branch) { try { - return listVersionedFiles(fileIO, directoryPath, SCHEMA_PREFIX) + return listVersionedFiles(fileIO, schemaDirectory(branch), SCHEMA_PREFIX) .reduce(Math::max) - .map(this::schema); + .map(id -> schema(id, branch)); } catch (IOException e) { throw new UncheckedIOException(e); } } - /** List all schema. 
*/ public List listAll() { - return listAllIds().stream().map(this::schema).collect(Collectors.toList()); + return listAll(branch); + } + + public List listAll(String branch) { + return listAllIds(branch).stream() + .map(id -> schema(id, branch)) + .collect(Collectors.toList()); } - /** List all schema IDs. */ public List listAllIds() { + return listAllIds(branch); + } + + /** List all schema IDs. */ + public List listAllIds(String branch) { try { - return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX) + return listVersionedFiles(fileIO, schemaDirectory(branch), SCHEMA_PREFIX) .collect(Collectors.toList()); } catch (IOException e) { throw new UncheckedIOException(e); } } - /** Create a new schema from {@link Schema}. */ public TableSchema createTable(Schema schema) throws Exception { - return createTable(schema, false); + return createTable(branch, schema, false); } public TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws Exception { + return createTable(branch, schema, ignoreIfExistsSame); + } + + public TableSchema createTable(String branch, Schema schema, boolean ignoreIfExistsSame) + throws Exception { while (true) { - Optional latest = latest(); + Optional latest = latest(branch); if (latest.isPresent()) { TableSchema oldSchema = latest.get(); boolean isSame = @@ -166,25 +181,36 @@ public TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws options, schema.comment()); - boolean success = commit(newSchema); + boolean success = commit(branch, newSchema); if (success) { return newSchema; } } } - /** Update {@link SchemaChange}s. */ public TableSchema commitChanges(SchemaChange... changes) throws Exception { - return commitChanges(Arrays.asList(changes)); + return commitChanges(branch, changes); } /** Update {@link SchemaChange}s. */ + public TableSchema commitChanges(String branch, SchemaChange... changes) throws Exception { + return commitChanges(branch, Arrays.asList(changes)); + } + public TableSchema commitChanges(List changes) + throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException, + Catalog.ColumnNotExistException { + return commitChanges(branch, changes); + } + + /** Update {@link SchemaChange}s. 
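+     * Applies the given changes to the latest schema of {@code branch} and commits the result
+     * as a new schema file under that branch. A minimal usage sketch (the branch and column
+     * names are illustrative):
+     *
+     * <pre>{@code
+     * SchemaManager manager = new SchemaManager(fileIO, tableRoot, "branch1");
+     * // add a nullable STRING column to the latest schema of "branch1"
+     * manager.commitChanges("branch1", SchemaChange.addColumn("col1", DataTypes.STRING()));
+     * }</pre>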
*/ + public TableSchema commitChanges(String branch, List changes) throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException { while (true) { TableSchema schema = - latest().orElseThrow( + latest(branch) + .orElseThrow( () -> new Catalog.TableNotExistException( fromPath(tableRoot.toString(), true))); @@ -376,7 +402,7 @@ public TableSchema commitChanges(List changes) newComment); try { - boolean success = commit(newSchema); + boolean success = commit(branch, newSchema); if (success) { return newSchema; } @@ -387,8 +413,13 @@ public TableSchema commitChanges(List changes) } public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) { + return mergeSchema(branch, rowType, allowExplicitCast); + } + + public boolean mergeSchema(String branch, RowType rowType, boolean allowExplicitCast) { TableSchema current = - latest().orElseThrow( + latest(branch) + .orElseThrow( () -> new RuntimeException( "It requires that the current schema to exist when calling 'mergeSchema'")); @@ -397,7 +428,7 @@ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) { return false; } else { try { - return commit(update); + return commit(branch, update); } catch (Exception e) { throw new RuntimeException("Failed to commit the schema.", e); } @@ -470,9 +501,13 @@ private void updateColumn( @VisibleForTesting boolean commit(TableSchema newSchema) throws Exception { - SchemaValidation.validateTableSchema(newSchema); + return commit(branch, newSchema); + } - Path schemaPath = toSchemaPath(newSchema.id()); + @VisibleForTesting + boolean commit(String branchName, TableSchema newSchema) throws Exception { + SchemaValidation.validateTableSchema(newSchema); + Path schemaPath = toSchemaPath(branchName, newSchema.id()); Callable callable = () -> fileIO.writeFileUtf8(schemaPath, newSchema.toString()); if (lock == null) { return callable.call(); @@ -480,10 +515,15 @@ boolean commit(TableSchema newSchema) throws Exception { return lock.runWithLock(callable); } - /** Read schema for schema id. */ public TableSchema schema(long id) { + return schema(id, branch); + } + + /** Read schema for schema id. */ + public TableSchema schema(long id, String branch) { try { - return JsonSerdeUtil.fromJson(fileIO.readFileUtf8(toSchemaPath(id)), TableSchema.class); + return JsonSerdeUtil.fromJson( + fileIO.readFileUtf8(toSchemaPath(branch, id)), TableSchema.class); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -501,18 +541,34 @@ private Path schemaDirectory() { return new Path(tableRoot + "/schema"); } + public Path schemaDirectory(String branchName) { + return branchName.equals(DEFAULT_MAIN_BRANCH) + ? schemaDirectory() + : new Path(getBranchPath(tableRoot, branchName) + "/schema"); + } + @VisibleForTesting public Path toSchemaPath(long id) { return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id); } - public Path branchSchemaDirectory(String branchName) { - return new Path(getBranchPath(tableRoot, branchName) + "/schema"); + public Path toSchemaPath(String branchName, long schemaId) { + return branchName.equals(DEFAULT_MAIN_BRANCH) + ? toSchemaPath(schemaId) + : new Path( + getBranchPath(tableRoot, branchName) + + "/schema/" + + SCHEMA_PREFIX + + schemaId); } - public Path branchSchemaPath(String branchName, long schemaId) { - return new Path( - getBranchPath(tableRoot, branchName) + "/schema/" + SCHEMA_PREFIX + schemaId); + /** + * Delete schema with specific id. + * + * @param schemaId the schema id to delete. 
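+     *     The schema file is removed from this manager's default branch.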
+ */ + public void deleteSchema(long schemaId) { + deleteSchema(branch, schemaId); } /** @@ -520,8 +576,8 @@ public Path branchSchemaPath(String branchName, long schemaId) { * * @param schemaId the schema id to delete. */ - public void deleteSchema(long schemaId) { - fileIO.deleteQuietly(toSchemaPath(schemaId)); + public void deleteSchema(String branch, long schemaId) { + fileIO.deleteQuietly(toSchemaPath(branch, schemaId)); } public static void checkAlterTableOption(String key) { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java index 398aa98d128d7..1438e81de2efb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java @@ -104,7 +104,8 @@ public TableSchema( this.highestFieldId = highestFieldId; this.partitionKeys = partitionKeys; this.primaryKeys = primaryKeys; - this.options = Collections.unmodifiableMap(options); + Objects.requireNonNull(options); + this.options = options; this.comment = comment; this.timeMillis = timeMillis; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 1ca321d563a59..4f2cbd888597d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -71,7 +71,6 @@ import java.util.function.BiConsumer; import static org.apache.paimon.CoreOptions.PATH; -import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; @@ -139,13 +138,13 @@ public RowKeyExtractor createRowKeyExtractor() { @Override public SnapshotReader newSnapshotReader() { - return newSnapshotReader(DEFAULT_MAIN_BRANCH); + return newSnapshotReader(CoreOptions.branch(options())); } @Override public SnapshotReader newSnapshotReader(String branchName) { return new SnapshotReaderImpl( - store().newScan(branchName), + store().newScan(), tableSchema, coreOptions(), snapshotManager(), @@ -246,7 +245,8 @@ private FileStoreTable copyInternal(Map dynamicOptions, boolean @Override public FileStoreTable copyWithLatestSchema() { Map options = tableSchema.options(); - SchemaManager schemaManager = new SchemaManager(fileIO(), location()); + SchemaManager schemaManager = + new SchemaManager(fileIO(), location(), CoreOptions.branch(options())); Optional optionalLatestSchema = schemaManager.latest(); if (optionalLatestSchema.isPresent()) { TableSchema newTableSchema = optionalLatestSchema.get(); @@ -259,7 +259,7 @@ public FileStoreTable copyWithLatestSchema() { } protected SchemaManager schemaManager() { - return new SchemaManager(fileIO(), path); + return new SchemaManager(fileIO(), path, CoreOptions.branch(options())); } @Override @@ -309,7 +309,7 @@ public ExpireSnapshots newExpireChangelog() { @Override public TableCommitImpl newCommit(String commitUser) { // Compatibility with previous design, the main branch is written by default - return newCommit(commitUser, DEFAULT_MAIN_BRANCH); + return newCommit(commitUser, CoreOptions.branch(options())); } public TableCommitImpl newCommit(String commitUser, String branchName) { @@ -561,7 +561,7 @@ public void rollbackTo(String tagName) { @Override public TagManager tagManager() { - return new 
TagManager(fileIO, path); + return new TagManager(fileIO, path, CoreOptions.branch(options())); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index 6382535858676..e941a89e01b3f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -53,8 +53,9 @@ public static FileStoreTable create(FileIO fileIO, Path path) { public static FileStoreTable create(FileIO fileIO, Options options) { Path tablePath = CoreOptions.path(options); + String branchName = CoreOptions.branch(options.toMap()); TableSchema tableSchema = - new SchemaManager(fileIO, tablePath) + new SchemaManager(fileIO, tablePath, branchName) .latest() .orElseThrow( () -> diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 4656deb676710..3f71812e5b86c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -101,13 +101,13 @@ public void createBranch(String branchName, String tagName) { try { // Copy the corresponding tag, snapshot and schema files into the branch directory fileIO.copyFileUtf8( - tagManager.tagPath(tagName), tagManager.branchTagPath(branchName, tagName)); + tagManager.tagPath(tagName), tagManager.tagPath(branchName, tagName)); fileIO.copyFileUtf8( snapshotManager.snapshotPath(snapshot.id()), - snapshotManager.branchSnapshotPath(branchName, snapshot.id())); + snapshotManager.snapshotPath(branchName, snapshot.id())); fileIO.copyFileUtf8( schemaManager.toSchemaPath(snapshot.schemaId()), - schemaManager.branchSchemaPath(branchName, snapshot.schemaId())); + schemaManager.toSchemaPath(branchName, snapshot.schemaId())); } catch (IOException e) { throw new RuntimeException( String.format( diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index dbbc8fffdc050..3287f92a74e9e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -63,10 +63,17 @@ public class SnapshotManager implements Serializable { private final FileIO fileIO; private final Path tablePath; + private final String branch; public SnapshotManager(FileIO fileIO, Path tablePath) { + this(fileIO, tablePath, DEFAULT_MAIN_BRANCH); + } + + /** Specify the default branch for data writing. */ + public SnapshotManager(FileIO fileIO, Path tablePath, String branchName) { this.fileIO = fileIO; this.tablePath = tablePath; + this.branch = StringUtils.isBlank(branchName) ? DEFAULT_MAIN_BRANCH : branchName; } public FileIO fileIO() { @@ -85,55 +92,74 @@ public Path changelogDirectory() { return new Path(tablePath + "/changelog"); } - public Path longLivedChangelogPath(long snapshotId) { - return new Path(tablePath + "/changelog/" + CHANGELOG_PREFIX + snapshotId); + public Path changelogDirectory(String branchName) { + return branchName.equals(DEFAULT_MAIN_BRANCH) + ? 
changelogDirectory() + : new Path(getBranchPath(tablePath, branchName) + "/changelog"); } - public Path snapshotPath(long snapshotId) { - return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); + public Path longLivedChangelogPath(long snapshotId) { + return new Path(tablePath + "/changelog/" + CHANGELOG_PREFIX + snapshotId); } - public Path branchSnapshotDirectory(String branchName) { - return new Path(getBranchPath(tablePath, branchName) + "/snapshot"); + public Path longLivedChangelogPath(String branchName, long snapshotId) { + return branchName.equals(DEFAULT_MAIN_BRANCH) + ? longLivedChangelogPath(snapshotId) + : new Path( + getBranchPath(tablePath, branchName) + + "/changelog/" + + CHANGELOG_PREFIX + + snapshotId); } - public Path branchSnapshotPath(String branchName, long snapshotId) { - return new Path( - getBranchPath(tablePath, branchName) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); + public Path snapshotPath(long snapshotId) { + return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); } - public Path snapshotPathByBranch(String branchName, long snapshotId) { + public Path snapshotPath(String branchName, long snapshotId) { return branchName.equals(DEFAULT_MAIN_BRANCH) ? snapshotPath(snapshotId) - : branchSnapshotPath(branchName, snapshotId); + : new Path( + getBranchPath(tablePath, branchName) + + "/snapshot/" + + SNAPSHOT_PREFIX + + snapshotId); } - public Path snapshotDirByBranch(String branchName) { + public Path snapshotDirectory(String branchName) { return branchName.equals(DEFAULT_MAIN_BRANCH) ? snapshotDirectory() - : branchSnapshotDirectory(branchName); + : new Path(getBranchPath(tablePath, branchName) + "/snapshot"); } public Snapshot snapshot(long snapshotId) { - return snapshot(DEFAULT_MAIN_BRANCH, snapshotId); + return snapshot(snapshotId, branch); } public Changelog changelog(long snapshotId) { - Path changelogPath = longLivedChangelogPath(snapshotId); + return changelog(snapshotId, branch); + } + + public Changelog changelog(long snapshotId, String branch) { + Path changelogPath = longLivedChangelogPath(branch, snapshotId); return Changelog.fromPath(fileIO, changelogPath); } public Changelog longLivedChangelog(long snapshotId) { - return Changelog.fromPath(fileIO, longLivedChangelogPath(snapshotId)); + return Changelog.fromPath(fileIO, longLivedChangelogPath(branch, snapshotId)); } - public Snapshot snapshot(String branchName, long snapshotId) { - Path snapshotPath = snapshotPathByBranch(branchName, snapshotId); + public Snapshot snapshot(long snapshotId, String branch) { + Path snapshotPath = snapshotPath(branch, snapshotId); return Snapshot.fromPath(fileIO, snapshotPath); } public boolean snapshotExists(long snapshotId) { - Path path = snapshotPath(snapshotId); + return snapshotExists(snapshotId, branch); + } + + public boolean snapshotExists(long snapshotId, String branch) { + Path path = snapshotPath(branch, snapshotId); try { return fileIO.exists(path); } catch (IOException e) { @@ -144,7 +170,11 @@ public boolean snapshotExists(long snapshotId) { } public boolean longLivedChangelogExists(long snapshotId) { - Path path = longLivedChangelogPath(snapshotId); + return longLivedChangelogExists(branch, snapshotId); + } + + public boolean longLivedChangelogExists(String branch, long snapshotId) { + Path path = longLivedChangelogPath(branch, snapshotId); try { return fileIO.exists(path); } catch (IOException e) { @@ -155,75 +185,91 @@ public boolean longLivedChangelogExists(long snapshotId) { } public @Nullable Snapshot 
latestSnapshot() { - return latestSnapshot(DEFAULT_MAIN_BRANCH); + return latestSnapshot(branch); } - public @Nullable Snapshot latestSnapshot(String branchName) { - Long snapshotId = latestSnapshotId(branchName); - return snapshotId == null ? null : snapshot(branchName, snapshotId); + public @Nullable Snapshot latestSnapshot(String branch) { + Long snapshotId = latestSnapshotId(branch); + return snapshotId == null ? null : snapshot(snapshotId, branch); } public @Nullable Long latestSnapshotId() { - return latestSnapshotId(DEFAULT_MAIN_BRANCH); + return latestSnapshotId(branch); } - public @Nullable Long latestSnapshotId(String branchName) { + public @Nullable Long latestSnapshotId(String branch) { try { - return findLatest(snapshotDirByBranch(branchName), SNAPSHOT_PREFIX, this::snapshotPath); + return findLatest(snapshotDirectory(branch), SNAPSHOT_PREFIX, this::snapshotPath); } catch (IOException e) { throw new RuntimeException("Failed to find latest snapshot id", e); } } public @Nullable Snapshot earliestSnapshot() { - Long snapshotId = earliestSnapshotId(); - return snapshotId == null ? null : snapshot(snapshotId); + return earliestSnapshot(branch); + } + + public @Nullable Snapshot earliestSnapshot(String branch) { + Long snapshotId = earliestSnapshotId(branch); + return snapshotId == null ? null : snapshot(snapshotId, branch); } public @Nullable Long earliestSnapshotId() { - return earliestSnapshotId(DEFAULT_MAIN_BRANCH); + return earliestSnapshotId(branch); + } + + public @Nullable Long earliestSnapshotId(String branchName) { + try { + return findEarliest(snapshotDirectory(branchName), SNAPSHOT_PREFIX, this::snapshotPath); + } catch (IOException e) { + throw new RuntimeException("Failed to find earliest snapshot id", e); + } } public @Nullable Long earliestLongLivedChangelogId() { + return earliestLongLivedChangelogId(branch); + } + + public @Nullable Long earliestLongLivedChangelogId(String branch) { try { return findEarliest( - changelogDirectory(), CHANGELOG_PREFIX, this::longLivedChangelogPath); + changelogDirectory(branch), CHANGELOG_PREFIX, this::longLivedChangelogPath); } catch (IOException e) { throw new RuntimeException("Failed to find earliest changelog id", e); } } public @Nullable Long latestLongLivedChangelogId() { + return latestLongLivedChangelogId(branch); + } + + public @Nullable Long latestLongLivedChangelogId(String branch) { try { - return findLatest(changelogDirectory(), CHANGELOG_PREFIX, this::longLivedChangelogPath); + return findLatest( + changelogDirectory(branch), CHANGELOG_PREFIX, this::longLivedChangelogPath); } catch (IOException e) { throw new RuntimeException("Failed to find latest changelog id", e); } } public @Nullable Long latestChangelogId() { - return latestSnapshotId(); + return latestSnapshotId(branch); } - public @Nullable Long earliestSnapshotId(String branchName) { - try { - return findEarliest( - snapshotDirByBranch(branchName), SNAPSHOT_PREFIX, this::snapshotPath); - } catch (IOException e) { - throw new RuntimeException("Failed to find earliest snapshot id", e); - } + public @Nullable Long pickOrLatest(Predicate predicate) { + return pickOrLatest(branch, predicate); } - public @Nullable Long pickOrLatest(Predicate predicate) { - Long latestId = latestSnapshotId(); - Long earliestId = earliestSnapshotId(); + public @Nullable Long pickOrLatest(String branch, Predicate predicate) { + Long latestId = latestSnapshotId(branch); + Long earliestId = earliestSnapshotId(branch); if (latestId == null || earliestId == null) { return null; } for (long 
snapshotId = latestId; snapshotId >= earliestId; snapshotId--) {
-            if (snapshotExists(snapshotId)) {
-                Snapshot snapshot = snapshot(snapshotId);
+            if (snapshotExists(snapshotId, branch)) {
+                Snapshot snapshot = snapshot(snapshotId, branch);
                 if (predicate.test(snapshot)) {
                     return snapshot.id();
                 }
@@ -234,39 +280,47 @@ public boolean longLivedChangelogExists(long snapshotId) {
     private Snapshot changelogOrSnapshot(long snapshotId) {
-        if (longLivedChangelogExists(snapshotId)) {
-            return changelog(snapshotId);
+        return changelogOrSnapshot(snapshotId, branch);
+    }
+
+    private Snapshot changelogOrSnapshot(long snapshotId, String branch) {
+        if (longLivedChangelogExists(branch, snapshotId)) {
+            return changelog(snapshotId, branch);
         } else {
-            return snapshot(snapshotId);
+            return snapshot(snapshotId, branch);
         }
     }

+    public @Nullable Long earlierThanTimeMills(long timestampMills, boolean startFromChangelog) {
+        return earlierThanTimeMills(branch, timestampMills, startFromChangelog);
+    }
+
     /**
      * Returns the latest snapshot earlier than the timestamp mills. A non-existent snapshot may be
      * returned if all snapshots are equal to or later than the timestamp mills.
      */
-    public @Nullable Long earlierThanTimeMills(long timestampMills, boolean startFromChangelog) {
-        Long earliestSnapshot = earliestSnapshotId();
+    public @Nullable Long earlierThanTimeMills(
+            String branch, long timestampMills, boolean startFromChangelog) {
+        Long earliestSnapshot = earliestSnapshotId(branch);
         Long earliest;
         if (startFromChangelog) {
-            Long earliestChangelog = earliestLongLivedChangelogId();
+            Long earliestChangelog = earliestLongLivedChangelogId(branch);
             earliest = earliestChangelog == null ? earliestSnapshot : earliestChangelog;
         } else {
             earliest = earliestSnapshot;
         }
-        Long latest = latestSnapshotId();
-
+        Long latest = latestSnapshotId(branch);
         if (earliest == null || latest == null) {
             return null;
         }

-        if (changelogOrSnapshot(earliest).timeMillis() >= timestampMills) {
+        if (changelogOrSnapshot(earliest, branch).timeMillis() >= timestampMills) {
            return earliest - 1;
         }

         while (earliest < latest) {
             long mid = (earliest + latest + 1) / 2;
-            if (changelogOrSnapshot(mid).timeMillis() < timestampMills) {
+            if (changelogOrSnapshot(mid, branch).timeMillis() < timestampMills) {
                 earliest = mid;
             } else {
                 latest = mid - 1;
@@ -275,24 +329,28 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
         return earliest;
     }

+    public @Nullable Snapshot earlierOrEqualTimeMills(long timestampMills) {
+        return earlierOrEqualTimeMills(branch, timestampMills);
+    }
+
     /**
      * Returns a {@link Snapshot} whose commit time is earlier than or equal to the given timestamp
      * mills. If there is no such snapshot, returns null.
*/ - public @Nullable Snapshot earlierOrEqualTimeMills(long timestampMills) { - Long earliest = earliestSnapshotId(); - Long latest = latestSnapshotId(); + public @Nullable Snapshot earlierOrEqualTimeMills(String branch, long timestampMills) { + Long earliest = earliestSnapshotId(branch); + Long latest = latestSnapshotId(branch); if (earliest == null || latest == null) { return null; } - if (snapshot(earliest).timeMillis() > timestampMills) { + if (snapshot(earliest, branch).timeMillis() > timestampMills) { return null; } Snapshot finalSnapshot = null; while (earliest <= latest) { long mid = earliest + (latest - earliest) / 2; // Avoid overflow - Snapshot snapshot = snapshot(mid); + Snapshot snapshot = snapshot(mid, branch); long commitTime = snapshot.timeMillis(); if (commitTime > timestampMills) { latest = mid - 1; // Search in the left half @@ -365,31 +423,47 @@ private Snapshot changelogOrSnapshot(long snapshotId) { } public long snapshotCount() throws IOException { - return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX).count(); + return snapshotCount(branch); + } + + public long snapshotCount(String branch) throws IOException { + return listVersionedFiles(fileIO, snapshotDirectory(branch), SNAPSHOT_PREFIX).count(); } public Iterator snapshots() throws IOException { - return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX) - .map(this::snapshot) + return snapshots(branch); + } + + public Iterator snapshots(String branch) throws IOException { + return listVersionedFiles(fileIO, snapshotDirectory(branch), SNAPSHOT_PREFIX) + .map(id -> snapshot(id, branch)) .sorted(Comparator.comparingLong(Snapshot::id)) .iterator(); } public Iterator changelogs() throws IOException { - return listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX) - .map(this::changelog) + return changelogs(branch); + } + + public Iterator changelogs(String branch) throws IOException { + return listVersionedFiles(fileIO, changelogDirectory(branch), CHANGELOG_PREFIX) + .map(snapshotId -> changelog(snapshotId, branch)) .sorted(Comparator.comparingLong(Changelog::id)) .iterator(); } + public List safelyGetAllSnapshots() throws IOException { + return safelyGetAllSnapshots(branch); + } + /** * If {@link FileNotFoundException} is thrown when reading the snapshot file, this snapshot may * be deleted by other processes, so just skip this snapshot. */ - public List safelyGetAllSnapshots() throws IOException { + public List safelyGetAllSnapshots(String branch) throws IOException { List paths = - listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX) - .map(this::snapshotPath) + listVersionedFiles(fileIO, snapshotDirectory(branch), SNAPSHOT_PREFIX) + .map(id -> snapshotPath(branch, id)) .collect(Collectors.toList()); List snapshots = new ArrayList<>(); @@ -404,9 +478,13 @@ public List safelyGetAllSnapshots() throws IOException { } public List safelyGetAllChangelogs() throws IOException { + return safelyGetAllChangelogs(branch); + } + + public List safelyGetAllChangelogs(String branch) throws IOException { List paths = - listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX) - .map(this::longLivedChangelogPath) + listVersionedFiles(fileIO, changelogDirectory(branch), CHANGELOG_PREFIX) + .map(id -> longLivedChangelogPath(branch, id)) .collect(Collectors.toList()); List changelogs = new ArrayList<>(); @@ -426,7 +504,8 @@ public List safelyGetAllChangelogs() throws IOException { * result. 
*/ public List tryGetNonSnapshotFiles(Predicate fileStatusFilter) { - return listPathWithFilter(snapshotDirectory(), fileStatusFilter, nonSnapshotFileFilter()); + return listPathWithFilter( + snapshotDirectory(branch), fileStatusFilter, nonSnapshotFileFilter()); } public List tryGetNonChangelogFiles(Predicate fileStatusFilter) { @@ -470,18 +549,22 @@ private Predicate nonChangelogFileFilter() { } public Optional latestSnapshotOfUser(String user) { - Long latestId = latestSnapshotId(); + return latestSnapshotOfUser(branch, user); + } + + public Optional latestSnapshotOfUser(String branch, String user) { + Long latestId = latestSnapshotId(branch); if (latestId == null) { return Optional.empty(); } long earliestId = Preconditions.checkNotNull( - earliestSnapshotId(), + earliestSnapshotId(branch), "Latest snapshot id is not null, but earliest snapshot id is null. " + "This is unexpected."); for (long id = latestId; id >= earliestId; id--) { - Snapshot snapshot = snapshot(id); + Snapshot snapshot = snapshot(id, branch); if (user.equals(snapshot.commitUser())) { return Optional.of(snapshot); } @@ -489,19 +572,24 @@ public Optional latestSnapshotOfUser(String user) { return Optional.empty(); } - /** Find the snapshot of the specified identifiers written by the specified user. */ public List findSnapshotsForIdentifiers( @Nonnull String user, List identifiers) { + return findSnapshotsForIdentifiers(user, identifiers, branch); + } + + /** Find the snapshot of the specified identifiers written by the specified user. */ + public List findSnapshotsForIdentifiers( + @Nonnull String user, List identifiers, String branch) { if (identifiers.isEmpty()) { return Collections.emptyList(); } - Long latestId = latestSnapshotId(); + Long latestId = latestSnapshotId(branch); if (latestId == null) { return Collections.emptyList(); } long earliestId = Preconditions.checkNotNull( - earliestSnapshotId(), + earliestSnapshotId(branch), "Latest snapshot id is not null, but earliest snapshot id is null. " + "This is unexpected."); @@ -509,7 +597,7 @@ public List findSnapshotsForIdentifiers( List matchedSnapshots = new ArrayList<>(); Set remainingIdentifiers = new HashSet<>(identifiers); for (long id = latestId; id >= earliestId && !remainingIdentifiers.isEmpty(); id--) { - Snapshot snapshot = snapshot(id); + Snapshot snapshot = snapshot(id, branch); if (user.equals(snapshot.commitUser())) { if (remainingIdentifiers.remove(snapshot.commitIdentifier())) { matchedSnapshots.add(snapshot); @@ -523,7 +611,12 @@ public List findSnapshotsForIdentifiers( } public void commitChangelog(Changelog changelog, long id) throws IOException { - fileIO.writeFileUtf8(longLivedChangelogPath(id), changelog.toJson()); + fileIO.writeFileUtf8(longLivedChangelogPath(branch, id), changelog.toJson()); + } + + @Nullable + public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { + return traversalSnapshotsFromLatestSafely(checker, branch); } /** @@ -532,12 +625,12 @@ public void commitChangelog(Changelog changelog, long id) throws IOException { * unreadable snapshots. 
*/ @Nullable - public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { - Long latestId = latestSnapshotId(); + public Snapshot traversalSnapshotsFromLatestSafely(Filter checker, String branch) { + Long latestId = latestSnapshotId(branch); if (latestId == null) { return null; } - Long earliestId = earliestSnapshotId(); + Long earliestId = earliestSnapshotId(branch); if (earliestId == null) { return null; } @@ -545,9 +638,9 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { for (long id = latestId; id >= earliestId; id--) { Snapshot snapshot; try { - snapshot = snapshot(id); + snapshot = snapshot(id, branch); } catch (Exception e) { - Long newEarliestId = earliestSnapshotId(); + Long newEarliestId = earliestSnapshotId(branch); if (newEarliestId == null) { return null; } @@ -582,7 +675,6 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { return snapshotId; } } - return findByListFiles(Math::max, dir, prefix); } @@ -602,7 +694,11 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { } public Long readHint(String fileName) { - return readHint(fileName, snapshotDirByBranch(DEFAULT_MAIN_BRANCH)); + return readHint(fileName, branch); + } + + public Long readHint(String fileName, String branch) { + return readHint(fileName, snapshotDirectory(branch)); } public Long readHint(String fileName, Path dir) { @@ -629,11 +725,11 @@ private Long findByListFiles(BinaryOperator reducer, Path dir, String pref } public void commitLatestHint(long snapshotId) throws IOException { - commitLatestHint(snapshotId, DEFAULT_MAIN_BRANCH); + commitLatestHint(snapshotId, branch); } public void commitLatestHint(long snapshotId, String branchName) throws IOException { - commitHint(snapshotId, LATEST, snapshotDirByBranch(branchName)); + commitHint(snapshotId, LATEST, snapshotDirectory(branchName)); } public void commitLongLivedChangelogLatestHint(long snapshotId) throws IOException { @@ -645,11 +741,11 @@ public void commitLongLivedChangelogEarliestHint(long snapshotId) throws IOExcep } public void commitEarliestHint(long snapshotId) throws IOException { - commitEarliestHint(snapshotId, DEFAULT_MAIN_BRANCH); + commitEarliestHint(snapshotId, branch); } public void commitEarliestHint(long snapshotId, String branchName) throws IOException { - commitHint(snapshotId, EARLIEST, snapshotDirByBranch(branchName)); + commitHint(snapshotId, EARLIEST, snapshotDirectory(branchName)); } private void commitHint(long snapshotId, String fileName, Path dir) throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 8b7818fed782e..634e9f760ae9e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -45,6 +45,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -58,10 +59,17 @@ public class TagManager { private final FileIO fileIO; private final Path tablePath; + private final String branch; public TagManager(FileIO fileIO, Path tablePath) { + this(fileIO, tablePath, DEFAULT_MAIN_BRANCH); + } + + /** Specify the default branch for data writing. 
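+     * Tag files are created, listed and deleted under this branch; a blank branch name falls
+     * back to the main branch. A minimal usage sketch (the branch and tag names are
+     * illustrative):
+     *
+     * <pre>{@code
+     * TagManager tagManager = new TagManager(fileIO, tablePath, "branch1");
+     * // writes tag-tag1 under <table-path>/branch/branch-branch1/tag/
+     * tagManager.createTag(snapshot, "tag1", null, Collections.emptyList());
+     * }</pre>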
*/ + public TagManager(FileIO fileIO, Path tablePath, String branch) { this.fileIO = fileIO; this.tablePath = tablePath; + this.branch = StringUtils.isBlank(branch) ? DEFAULT_MAIN_BRANCH : branch; } /** Return the root Directory of tags. */ @@ -69,31 +77,49 @@ public Path tagDirectory() { return new Path(tablePath + "/tag"); } + /** Return the root Directory of tags. */ + public Path tagDirectory(String branch) { + return branch.equals(DEFAULT_MAIN_BRANCH) + ? tagDirectory() + : new Path(getBranchPath(tablePath, branch) + "/tag"); + } + /** Return the path of a tag. */ public Path tagPath(String tagName) { return new Path(tablePath + "/tag/" + TAG_PREFIX + tagName); } /** Return the path of a tag in branch. */ - public Path branchTagPath(String branchName, String tagName) { - return new Path(getBranchPath(tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName); + public Path tagPath(String branch, String tagName) { + return branch.equals(DEFAULT_MAIN_BRANCH) + ? tagPath(tagName) + : new Path(getBranchPath(tablePath, branch) + "/tag/" + TAG_PREFIX + tagName); } - /** Create a tag from given snapshot and save it in the storage. */ public void createTag( Snapshot snapshot, String tagName, @Nullable Duration timeRetained, List callbacks) { + createTag(snapshot, tagName, timeRetained, callbacks, branch); + } + + /** Create a tag from given snapshot and save it in the storage. */ + public void createTag( + Snapshot snapshot, + String tagName, + @Nullable Duration timeRetained, + List callbacks, + String branch) { checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName); // skip create tag for the same snapshot of the same name. - if (tagExists(tagName)) { - Snapshot tagged = taggedSnapshot(tagName); + if (tagExists(branch, tagName)) { + Snapshot tagged = taggedSnapshot(branch, tagName); Preconditions.checkArgument( tagged.id() == snapshot.id(), "Tag name '%s' already exists.", tagName); } else { - Path newTagPath = tagPath(tagName); + Path newTagPath = tagPath(branch, tagName); try { fileIO.writeFileUtf8( newTagPath, @@ -121,10 +147,18 @@ public void createTag( } } - /** Make sure the tagNames are ALL tags of one snapshot. */ public void deleteAllTagsOfOneSnapshot( List tagNames, TagDeletion tagDeletion, SnapshotManager snapshotManager) { - Snapshot taggedSnapshot = taggedSnapshot(tagNames.get(0)); + deleteAllTagsOfOneSnapshot(tagNames, tagDeletion, snapshotManager, branch); + } + + /** Make sure the tagNames are ALL tags of one snapshot. 
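+     * Tag files are deleted first; if the tagged snapshot itself no longer exists, the data
+     * files referenced only by it are cleaned through the given {@link TagDeletion}.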
*/ + public void deleteAllTagsOfOneSnapshot( + List tagNames, + TagDeletion tagDeletion, + SnapshotManager snapshotManager, + String branch) { + Snapshot taggedSnapshot = taggedSnapshot(branch, tagNames.get(0)); List taggedSnapshots; // skip file deletion if snapshot exists @@ -133,11 +167,11 @@ public void deleteAllTagsOfOneSnapshot( return; } else { // FileIO discovers tags by tag file, so we should read all tags before we delete tag - taggedSnapshots = taggedSnapshots(); - tagNames.forEach(tagName -> fileIO.deleteQuietly(tagPath(tagName))); + taggedSnapshots = taggedSnapshots(branch); + tagNames.forEach(tagName -> fileIO.deleteQuietly(tagPath(branch, tagName))); } - doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion); + doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion, branch); } public void deleteTag( @@ -145,21 +179,29 @@ public void deleteTag( TagDeletion tagDeletion, SnapshotManager snapshotManager, List callbacks) { + deleteTag(tagName, tagDeletion, snapshotManager, callbacks, branch); + } + + public void deleteTag( + String tagName, + TagDeletion tagDeletion, + SnapshotManager snapshotManager, + List callbacks, + String branch) { checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName); - checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName); + checkArgument(tagExists(branch, tagName), "Tag '%s' doesn't exist.", tagName); - Snapshot taggedSnapshot = taggedSnapshot(tagName); + Snapshot taggedSnapshot = taggedSnapshot(branch, tagName); List taggedSnapshots; // skip file deletion if snapshot exists - if (snapshotManager.snapshotExists(taggedSnapshot.id())) { - deleteTagMetaFile(tagName, callbacks); + if (snapshotManager.snapshotExists(taggedSnapshot.id(), branch)) { + deleteTagMetaFile(tagName, callbacks, branch); return; } else { // FileIO discovers tags by tag file, so we should read all tags before we delete tag SortedMap> tags = tags(); - deleteTagMetaFile(tagName, callbacks); - + deleteTagMetaFile(tagName, callbacks, branch); // skip data file clean if more than 1 tags are created based on this snapshot if (tags.get(taggedSnapshot).size() > 1) { return; @@ -167,11 +209,15 @@ public void deleteTag( taggedSnapshots = new ArrayList<>(tags.keySet()); } - doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion); + doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion, branch); } private void deleteTagMetaFile(String tagName, List callbacks) { - fileIO.deleteQuietly(tagPath(tagName)); + deleteTagMetaFile(tagName, callbacks, branch); + } + + private void deleteTagMetaFile(String tagName, List callbacks, String branch) { + fileIO.deleteQuietly(tagPath(branch, tagName)); try { callbacks.forEach(callback -> callback.notifyDeletion(tagName)); } finally { @@ -185,7 +231,8 @@ private void doClean( Snapshot taggedSnapshot, List taggedSnapshots, SnapshotManager snapshotManager, - TagDeletion tagDeletion) { + TagDeletion tagDeletion, + String branch) { // collect skipping sets from the left neighbor tag and the nearest right neighbor (either // the earliest snapshot or right neighbor tag) List skippedSnapshots = new ArrayList<>(); @@ -196,7 +243,7 @@ private void doClean( skippedSnapshots.add(taggedSnapshots.get(index - 1)); } // the nearest right neighbor - Snapshot right = snapshotManager.earliestSnapshot(); + Snapshot right = snapshotManager.earliestSnapshot(branch); if (index + 1 < taggedSnapshots.size()) { Snapshot rightTag = taggedSnapshots.get(index + 1); right = right.id() < 
rightTag.id() ? right : rightTag; @@ -226,9 +273,9 @@ private void doClean( taggedSnapshot, tagDeletion.manifestSkippingSet(skippedSnapshots)); } - /** Check if a tag exists. */ - public boolean tagExists(String tagName) { - Path path = tagPath(tagName); + /** Check if a tag exists in the given branch. */ + public boolean tagExists(String branch, String tagName) { + Path path = tagPath(branch, tagName); try { return fileIO.exists(path); } catch (IOException e) { @@ -239,11 +286,20 @@ public boolean tagExists(String tagName) { } } - /** Get the tagged snapshot by name. */ + /** Check if a tag exists. */ + public boolean tagExists(String tagName) { + return tagExists(DEFAULT_MAIN_BRANCH, tagName); + } + + /** Get the tagged snapshot by name. */ public Snapshot taggedSnapshot(String tagName) { - checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName); - // Trim to snapshot to avoid equals and compare snapshot. - return Tag.fromPath(fileIO, tagPath(tagName)).trimToSnapshot(); + return taggedSnapshot(DEFAULT_MAIN_BRANCH, tagName); + } + + /** Get the tagged snapshot by name in the given branch. */ + public Snapshot taggedSnapshot(String branch, String tagName) { + checkArgument(tagExists(branch, tagName), "Tag '%s' doesn't exist.", tagName); + return Tag.fromPath(fileIO, tagPath(branch, tagName)).trimToSnapshot(); } public long tagCount() { @@ -259,11 +315,24 @@ public List<Snapshot> taggedSnapshots() { return new ArrayList<>(tags().keySet()); } - /** Get all tagged snapshots with tag names sorted by snapshot id. */ + /** Get all tagged snapshots in the given branch, sorted by snapshot id. */ + public List<Snapshot> taggedSnapshots(String branch) { + return new ArrayList<>(tags(branch).keySet()); + } + + /** Get all tagged snapshots with tag names, sorted by snapshot id. */ + public SortedMap<Snapshot, List<String>> tags(String branch) { + return tags(branch, tagName -> true); + } + public SortedMap<Snapshot, List<String>> tags() { return tags(tagName -> true); } + public SortedMap<Snapshot, List<String>> tags(Predicate<String> filter) { + return tags(DEFAULT_MAIN_BRANCH, filter); + } + /** * Retrieves a sorted map of snapshots filtered based on a provided predicate. The predicate * determines which tag names should be included in the result. Only snapshots with tag names * matching the predicate are included. * * @param filter a predicate that determines which tag names should be included * @return a sorted map of snapshots to their tag names, ordered by snapshot id * @throws RuntimeException if an IOException occurs during retrieval of snapshots. 
*/ - public SortedMap> tags(Predicate filter) { + public SortedMap> tags(String branch, Predicate filter) { TreeMap> tags = new TreeMap<>(Comparator.comparingLong(Snapshot::id)); try { List paths = - listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX) + listVersionedFileStatus(fileIO, tagDirectory(branch), TAG_PREFIX) .map(FileStatus::getPath) .collect(Collectors.toList()); @@ -325,11 +394,15 @@ public List> tagObjects() { } public List sortTagsOfOneSnapshot(List tagNames) { + return sortTagsOfOneSnapshot(branch, tagNames); + } + + public List sortTagsOfOneSnapshot(String branch, List tagNames) { return tagNames.stream() .map( name -> { try { - return fileIO.getFileStatus(tagPath(name)); + return fileIO.getFileStatus(tagPath(branch, name)); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index 6adc3aff04f8c..dd671136426f4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -53,10 +53,12 @@ import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.RecordWriter; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.StringUtils; import org.apache.paimon.utils.TagManager; import org.slf4j.Logger; @@ -98,6 +100,8 @@ public class TestFileStore extends KeyValueFileStore { private long commitIdentifier; + private String branch; + private TestFileStore( String root, CoreOptions options, @@ -106,10 +110,11 @@ private TestFileStore( RowType valueType, KeyValueFieldsExtractor keyValueFieldsExtractor, MergeFunctionFactory mfFactory, - TableSchema tableSchema) { + TableSchema tableSchema, + String branch) { super( FileIOFinder.find(new Path(root)), - schemaManager(root, options), + schemaManager(root, options, branch), tableSchema != null ? tableSchema : new TableSchema( @@ -137,10 +142,11 @@ private TestFileStore( this.commitUser = UUID.randomUUID().toString(); this.commitIdentifier = 0L; + this.branch = branch; } - private static SchemaManager schemaManager(String root, CoreOptions options) { - return new SchemaManager(FileIOFinder.find(new Path(root)), options.path()); + private static SchemaManager schemaManager(String root, CoreOptions options, String branch) { + return new SchemaManager(FileIOFinder.find(new Path(root)), options.path(), branch); } public AbstractFileStoreWrite newWrite() { @@ -331,7 +337,7 @@ public List commitDataImpl( .write(kv); } - FileStoreCommit commit = newCommit(commitUser); + FileStoreCommit commit = newCommit(commitUser, branch); ManifestCommittable committable = new ManifestCommittable( identifier == null ? 
commitIdentifier++ : identifier, watermark); @@ -352,12 +358,12 @@ public List commitDataImpl( } SnapshotManager snapshotManager = snapshotManager(); - Long snapshotIdBeforeCommit = snapshotManager.latestSnapshotId(); + Long snapshotIdBeforeCommit = snapshotManager.latestSnapshotId(branch); if (snapshotIdBeforeCommit == null) { snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1; } commitFunction.accept(commit, committable); - Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId(); + Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId(branch); if (snapshotIdAfterCommit == null) { snapshotIdAfterCommit = Snapshot.FIRST_SNAPSHOT_ID - 1; } @@ -497,18 +503,18 @@ public void assertCleaned() throws IOException { // - latest should < true_latest // - earliest should < true_earliest SnapshotManager snapshotManager = snapshotManager(); - Path snapshotDir = snapshotManager.snapshotDirectory(); + Path snapshotDir = snapshotManager.snapshotDirectory(branch); Path earliest = new Path(snapshotDir, SnapshotManager.EARLIEST); Path latest = new Path(snapshotDir, SnapshotManager.LATEST); if (actualFiles.remove(earliest)) { - long earliestId = snapshotManager.readHint(SnapshotManager.EARLIEST); + long earliestId = snapshotManager.readHint(SnapshotManager.EARLIEST, branch); fileIO.delete(earliest, false); - assertThat(earliestId <= snapshotManager.earliestSnapshotId()).isTrue(); + assertThat(earliestId <= snapshotManager.earliestSnapshotId(branch)).isTrue(); } if (actualFiles.remove(latest)) { - long latestId = snapshotManager.readHint(SnapshotManager.LATEST); + long latestId = snapshotManager.readHint(SnapshotManager.LATEST, branch); fileIO.delete(latest, false); - assertThat(latestId <= snapshotManager.latestSnapshotId()).isTrue(); + assertThat(latestId <= snapshotManager.latestSnapshotId(branch)).isTrue(); } Path changelogDir = snapshotManager.changelogDirectory(); Path earliestChangelog = new Path(changelogDir, SnapshotManager.EARLIEST); @@ -540,7 +546,9 @@ private Set getFilesInUse() { Set result = new HashSet<>(); SchemaManager schemaManager = new SchemaManager(fileIO, options.path()); - schemaManager.listAllIds().forEach(id -> result.add(schemaManager.toSchemaPath(id))); + schemaManager + .listAllIds(branch) + .forEach(id -> result.add(schemaManager.toSchemaPath(branch, id))); SnapshotManager snapshotManager = snapshotManager(); Long latestSnapshotId = snapshotManager.latestSnapshotId(); @@ -689,6 +697,7 @@ public static class Builder { private final TableSchema tableSchema; private CoreOptions.ChangelogProducer changelogProducer; + private final String branch; public Builder( String format, @@ -700,6 +709,30 @@ public Builder( KeyValueFieldsExtractor keyValueFieldsExtractor, MergeFunctionFactory mfFactory, TableSchema tableSchema) { + this( + format, + root, + numBuckets, + partitionType, + keyType, + valueType, + keyValueFieldsExtractor, + mfFactory, + tableSchema, + BranchManager.DEFAULT_MAIN_BRANCH); + } + + public Builder( + String format, + String root, + int numBuckets, + RowType partitionType, + RowType keyType, + RowType valueType, + KeyValueFieldsExtractor keyValueFieldsExtractor, + MergeFunctionFactory mfFactory, + TableSchema tableSchema, + String branch) { this.format = format; this.root = root; this.numBuckets = numBuckets; @@ -711,6 +744,7 @@ public Builder( this.tableSchema = tableSchema; this.changelogProducer = CoreOptions.ChangelogProducer.NONE; + this.branch = StringUtils.isEmpty(branch) ? 
BranchManager.DEFAULT_MAIN_BRANCH : branch; } public Builder changelogProducer(CoreOptions.ChangelogProducer changelogProducer) { @@ -738,6 +772,7 @@ public TestFileStore build() { // disable dynamic-partition-overwrite in FileStoreCommit layer test conf.set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE, false); + conf.set(CoreOptions.BRANCH, branch); return new TestFileStore( root, @@ -747,7 +782,8 @@ public TestFileStore build() { valueType, keyValueFieldsExtractor, mfFactory, - tableSchema); + tableSchema, + branch); } } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanBranchTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanBranchTest.java new file mode 100644 index 0000000000000..d787065826e64 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanBranchTest.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.KeyValue; +import org.apache.paimon.Snapshot; +import org.apache.paimon.TestFileStore; +import org.apache.paimon.TestKeyValueGenerator; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.SnapshotManager; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link KeyValueFileStoreScan} with branch. 
*/ +public class KeyValueFileStoreScanBranchTest { + + private static final int NUM_BUCKETS = 10; + + private TestKeyValueGenerator gen; + @TempDir java.nio.file.Path tempDir; + private TestFileStore store; + private SnapshotManager snapshotManager; + + private String branch = UUID.randomUUID() + "-branch"; + + @BeforeEach + public void beforeEach() throws Exception { + gen = new TestKeyValueGenerator(); + store = + new TestFileStore.Builder( + "avro", + tempDir.toString(), + NUM_BUCKETS, + TestKeyValueGenerator.DEFAULT_PART_TYPE, + TestKeyValueGenerator.KEY_TYPE, + TestKeyValueGenerator.DEFAULT_ROW_TYPE, + TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, + DeduplicateMergeFunction.factory(), + null, + branch) + .build(); + snapshotManager = store.snapshotManager(); + + LocalFileIO localFile = LocalFileIO.create(); + + SchemaManager schemaManager = + new SchemaManager(localFile, new Path(tempDir.toUri()), branch); + schemaManager.createTable( + new Schema( + TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(), + TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), + TestKeyValueGenerator.getPrimaryKeys( + TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), + Collections.emptyMap(), + null)); + } + + @Test + public void testWithPartitionFilter() throws Exception { + ThreadLocalRandom random = ThreadLocalRandom.current(); + List data = generateData(random.nextInt(1000) + 1); + List partitions = + data.stream() + .map(kv -> gen.getPartition(kv)) + .distinct() + .collect(Collectors.toList()); + Snapshot snapshot = writeData(data); + + Set wantedPartitions = new HashSet<>(); + for (int i = random.nextInt(partitions.size() + 1); i > 0; i--) { + wantedPartitions.add(partitions.get(random.nextInt(partitions.size()))); + } + + FileStoreScan scan = store.newScan(); + scan.withSnapshot(snapshot.id()); + scan.withPartitionFilter(new ArrayList<>(wantedPartitions)); + + Map expected = + store.toKvMap( + wantedPartitions.isEmpty() + ? 
data + : data.stream() + .filter( + kv -> + wantedPartitions.contains( + gen.getPartition(kv))) + .collect(Collectors.toList())); + runTestExactMatch(scan, snapshot.id(), expected); + } + + @Test + public void testWithKeyFilter() throws Exception { + ThreadLocalRandom random = ThreadLocalRandom.current(); + List data = generateData(random.nextInt(1000) + 1); + Snapshot snapshot = writeData(data); + + int wantedShopId = data.get(random.nextInt(data.size())).key().getInt(0); + + KeyValueFileStoreScan scan = store.newScan(); + scan.withSnapshot(snapshot.id()); + scan.withKeyFilter( + new PredicateBuilder(RowType.of(new IntType(false))).equal(0, wantedShopId)); + + Map expected = + store.toKvMap( + data.stream() + .filter(kv -> kv.key().getInt(0) == wantedShopId) + .collect(Collectors.toList())); + runTestContainsAll(scan, snapshot.id(), expected); + } + + @Test + public void testWithValueFilter() throws Exception { + ThreadLocalRandom random = ThreadLocalRandom.current(); + List data = generateData(100, Math.abs(random.nextInt(1000))); + writeData(data, 0); + data = generateData(100, Math.abs(random.nextInt(1000)) + 1000); + writeData(data, 1); + data = generateData(100, Math.abs(random.nextInt(1000)) + 2000); + writeData(data, 2); + generateData(100, Math.abs(random.nextInt(1000)) + 3000); + Snapshot snapshot = writeData(data, 3); + + KeyValueFileStoreScan scan = store.newScan(); + scan.withSnapshot(snapshot.id()); + List files = scan.plan().files(); + + scan = store.newScan(); + scan.withSnapshot(snapshot.id()); + scan.withValueFilter( + new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE) + .between(1, 1000, 2000)); + + List filesFiltered = scan.plan().files(); + + assertThat(files.size()).isEqualTo(4); + assertThat(filesFiltered.size()).isEqualTo(1); + } + + @Test + public void testWithValuePartitionFilter() throws Exception { + ThreadLocalRandom random = ThreadLocalRandom.current(); + List data = generateData(100, Math.abs(random.nextInt(1000))); + writeData(data, "0", 0); + data = generateData(100, Math.abs(random.nextInt(1000)) + 1000); + writeData(data, "1", 0); + data = generateData(100, Math.abs(random.nextInt(1000)) + 2000); + writeData(data, "2", 0); + generateData(100, Math.abs(random.nextInt(1000)) + 3000); + Snapshot snapshot = writeData(data, "3", 0); + + KeyValueFileStoreScan scan = store.newScan(); + scan.withSnapshot(snapshot.id()); + List files = scan.plan().files(); + + scan = store.newScan(); + scan.withSnapshot(snapshot.id()); + scan.withValueFilter( + new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE) + .between(1, 1000, 2000)); + + List filesFiltered = scan.plan().files(); + + assertThat(files.size()).isEqualTo(4); + assertThat(filesFiltered.size()).isEqualTo(1); + } + + @Test + public void testWithBucket() throws Exception { + ThreadLocalRandom random = ThreadLocalRandom.current(); + List data = generateData(random.nextInt(1000) + 1); + Snapshot snapshot = writeData(data); + + int wantedBucket = random.nextInt(NUM_BUCKETS); + + FileStoreScan scan = store.newScan(); + scan.withSnapshot(snapshot.id()); + scan.withBucket(wantedBucket); + + Map expected = + store.toKvMap( + data.stream() + .filter(kv -> getBucket(kv) == wantedBucket) + .collect(Collectors.toList())); + runTestExactMatch(scan, snapshot.id(), expected); + } + + @Test + public void testWithSnapshot() throws Exception { + ThreadLocalRandom random = ThreadLocalRandom.current(); + int numCommits = random.nextInt(10) + 1; + int wantedCommit = random.nextInt(numCommits); + + List snapshots = 
new ArrayList<>(); + List> allData = new ArrayList<>(); + for (int i = 0; i < numCommits; i++) { + List data = generateData(random.nextInt(100) + 1); + snapshots.add(writeData(data)); + allData.add(data); + } + long wantedSnapshot = snapshots.get(wantedCommit).id(); + + FileStoreScan scan = store.newScan(); + scan.withSnapshot(wantedSnapshot); + + Map expected = + store.toKvMap( + allData.subList(0, wantedCommit + 1).stream() + .flatMap(Collection::stream) + .collect(Collectors.toList())); + runTestExactMatch(scan, wantedSnapshot, expected); + } + + @Test + public void testWithManifestList() throws Exception { + ThreadLocalRandom random = ThreadLocalRandom.current(); + int numCommits = random.nextInt(10) + 1; + for (int i = 0; i < numCommits; i++) { + List data = generateData(random.nextInt(100) + 1); + writeData(data); + } + + ManifestList manifestList = store.manifestListFactory().create(); + long wantedSnapshotId = random.nextLong(snapshotManager.latestSnapshotId(branch)) + 1; + Snapshot wantedSnapshot = snapshotManager.snapshot(wantedSnapshotId); + List wantedManifests = wantedSnapshot.dataManifests(manifestList); + + FileStoreScan scan = store.newScan(); + scan.withManifestList(wantedManifests); + + List expectedKvs = store.readKvsFromSnapshot(wantedSnapshotId); + gen.sort(expectedKvs); + Map expected = store.toKvMap(expectedKvs); + runTestExactMatch(scan, null, expected); + } + + private void runTestExactMatch( + FileStoreScan scan, Long expectedSnapshotId, Map expected) + throws Exception { + Map actual = getActualKvMap(scan, expectedSnapshotId); + assertThat(actual).isEqualTo(expected); + } + + private void runTestContainsAll( + FileStoreScan scan, Long expectedSnapshotId, Map expected) + throws Exception { + Map actual = getActualKvMap(scan, expectedSnapshotId); + for (Map.Entry entry : expected.entrySet()) { + assertThat(actual).containsKey(entry.getKey()); + assertThat(actual.get(entry.getKey())).isEqualTo(entry.getValue()); + } + } + + private Map getActualKvMap(FileStoreScan scan, Long expectedSnapshotId) + throws Exception { + FileStoreScan.Plan plan = scan.plan(); + assertThat(plan.snapshotId()).isEqualTo(expectedSnapshotId); + + List actualKvs = store.readKvsFromManifestEntries(plan.files(), false); + gen.sort(actualKvs); + return store.toKvMap(actualKvs); + } + + private List generateData(int numRecords) { + List data = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + data.add(gen.next()); + } + return data; + } + + private List generateData(int numRecords, int hr) { + List data = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + data.add(gen.nextInsert("", hr, null, null, null)); + } + return data; + } + + private Snapshot writeData(List kvs) throws Exception { + List snapshots = store.commitData(kvs, gen::getPartition, this::getBucket); + return snapshots.get(snapshots.size() - 1); + } + + private Snapshot writeData(List kvs, int bucket) throws Exception { + List snapshots = store.commitData(kvs, gen::getPartition, b -> bucket); + return snapshots.get(snapshots.size() - 1); + } + + private Snapshot writeData(List kvs, String partition, int bucket) throws Exception { + BinaryRow binaryRow = new BinaryRow(2); + BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow); + binaryRowWriter.writeString(0, BinaryString.fromString(partition)); + binaryRowWriter.writeInt(1, 0); + binaryRowWriter.complete(); + List snapshots = store.commitData(kvs, p -> binaryRow, b -> bucket); + return snapshots.get(snapshots.size() - 1); + } + + private int 
getBucket(KeyValue kv) { + return (kv.key().hashCode() % NUM_BUCKETS + NUM_BUCKETS) % NUM_BUCKETS; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java index 806c869f94f3c..44f502f41c705 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java @@ -127,10 +127,7 @@ public List keyFields(TableSchema schema) { @Override public List valueFields(TableSchema schema) { return Collections.singletonList( - new DataField( - 0, - "count", - new org.apache.paimon.types.BigIntType())); + new DataField(0, "count", new BigIntType())); } }, TestValueCountMergeFunction.factory()); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java index 6893649d532d1..a37802f33c9df 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java @@ -120,9 +120,12 @@ public void testBatchReadWrite() throws Exception { public void testBranchBatchReadWrite() throws Exception { FileStoreTable table = createFileStoreTable(); generateBranch(table); - writeBranchData(table); - List splits = toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()); - TableRead read = table.newRead(); + + FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME); + writeBranchData(tableBranch); + List splits = + toSplits(tableBranch.newSnapshotReader(BRANCH_NAME).read().dataSplits()); + TableRead read = tableBranch.newRead(); assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)) .hasSameElementsAs( Arrays.asList( @@ -300,15 +303,18 @@ public void testBatchSplitOrderByPartition() throws Exception { public void testBranchStreamingReadWrite() throws Exception { FileStoreTable table = createFileStoreTable(); generateBranch(table); - writeBranchData(table); + + FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME); + writeBranchData(tableBranch); List splits = toSplits( - table.newSnapshotReader(BRANCH_NAME) + tableBranch + .newSnapshotReader(BRANCH_NAME) .withMode(ScanMode.DELTA) .read() .dataSplits()); - TableRead read = table.newRead(); + TableRead read = tableBranch.newRead(); assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING)) .isEqualTo( @@ -694,6 +700,25 @@ protected FileStoreTable createFileStoreTable(Consumer configure) throw return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema); } + @Override + protected FileStoreTable createFileStoreTable(String branch, Consumer configure) + throws Exception { + Options conf = new Options(); + conf.set(CoreOptions.PATH, tablePath.toString()); + conf.set(CoreOptions.BRANCH, branch); + configure.accept(conf); + TableSchema tableSchema = + SchemaUtils.forceCommit( + new SchemaManager(LocalFileIO.create(), tablePath, branch), + new Schema( + ROW_TYPE.getFields(), + Collections.singletonList("pt"), + Collections.emptyList(), + conf.toMap(), + "")); + return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema); + } + @Override protected FileStoreTable overwriteTestFileStoreTable() throws Exception { Options conf = new Options(); diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index 138a30d5bf87b..bd7a4a2cebfcb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -1019,22 +1019,21 @@ public void testCreateBranch() throws Exception { // verify test-tag in test-branch is equal to snapshot 2 Snapshot branchTag = Snapshot.fromPath( - new TraceableFileIO(), tagManager.branchTagPath("test-branch", "test-tag")); + new TraceableFileIO(), tagManager.tagPath("test-branch", "test-tag")); assertThat(branchTag.equals(snapshot2)).isTrue(); // verify snapshot in test-branch is equal to snapshot 2 SnapshotManager snapshotManager = new SnapshotManager(new TraceableFileIO(), tablePath); Snapshot branchSnapshot = Snapshot.fromPath( - new TraceableFileIO(), - snapshotManager.branchSnapshotPath("test-branch", 2)); + new TraceableFileIO(), snapshotManager.snapshotPath("test-branch", 2)); assertThat(branchSnapshot.equals(snapshot2)).isTrue(); // verify schema in test-branch is equal to schema 0 SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath); TableSchema branchSchema = SchemaManager.fromPath( - new TraceableFileIO(), schemaManager.branchSchemaPath("test-branch", 0)); + new TraceableFileIO(), schemaManager.toSchemaPath("test-branch", 0)); TableSchema schema0 = schemaManager.schema(0); assertThat(branchSchema.equals(schema0)).isTrue(); } @@ -1312,9 +1311,10 @@ public void testBranchWriteAndRead() throws Exception { generateBranch(table); + FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME); // Write data to branch1 - try (StreamTableWrite write = table.newWrite(commitUser); - StreamTableCommit commit = table.newCommit(commitUser, BRANCH_NAME)) { + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser, BRANCH_NAME)) { write.write(rowData(2, 20, 200L)); commit.commit(1, write.prepareCommit(false, 2)); } @@ -1330,16 +1330,20 @@ public void testBranchWriteAndRead() throws Exception { // Validate data in branch1 assertThat( getResult( - table.newRead(), - toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()), + tableBranch.newRead(), + toSplits( + tableBranch + .newSnapshotReader(BRANCH_NAME) + .read() + .dataSplits()), BATCH_ROW_TO_STRING)) .containsExactlyInAnyOrder( "0|0|0|binary|varbinary|mapKey:mapVal|multiset", "2|20|200|binary|varbinary|mapKey:mapVal|multiset"); // Write two rows data to branch1 again - try (StreamTableWrite write = table.newWrite(commitUser); - StreamTableCommit commit = table.newCommit(commitUser, BRANCH_NAME)) { + try (StreamTableWrite write = tableBranch.newWrite(commitUser); + StreamTableCommit commit = tableBranch.newCommit(commitUser, BRANCH_NAME)) { write.write(rowData(3, 30, 300L)); write.write(rowData(4, 40, 400L)); commit.commit(2, write.prepareCommit(false, 3)); @@ -1356,8 +1360,12 @@ public void testBranchWriteAndRead() throws Exception { // Verify data in branch1 assertThat( getResult( - table.newRead(), - toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()), + tableBranch.newRead(), + toSplits( + tableBranch + .newSnapshotReader(BRANCH_NAME) + .read() + .dataSplits()), BATCH_ROW_TO_STRING)) .containsExactlyInAnyOrder( "0|0|0|binary|varbinary|mapKey:mapVal|multiset", @@ -1446,6 +1454,14 @@ protected FileStoreTable 
createFileStoreTable(int numOfBucket) throws Exception return createFileStoreTable(conf -> conf.set(BUCKET, numOfBucket)); } + protected FileStoreTable createFileStoreTable(String branch, int numOfBucket) throws Exception { + return createFileStoreTable(branch, conf -> conf.set(BUCKET, numOfBucket)); + } + + protected FileStoreTable createFileStoreTable(String branch) throws Exception { + return createFileStoreTable(branch, 1); + } + protected FileStoreTable createFileStoreTable() throws Exception { return createFileStoreTable(1); } @@ -1453,6 +1469,9 @@ protected FileStoreTable createFileStoreTable() throws Exception { protected abstract FileStoreTable createFileStoreTable(Consumer configure) throws Exception; + protected abstract FileStoreTable createFileStoreTable( + String branch, Consumer configure) throws Exception; + protected abstract FileStoreTable overwriteTestFileStoreTable() throws Exception; private static InternalRow overwriteRow(Object... values) { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index 43d338eb8fc33..9c14bff9eed22 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -81,6 +81,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.paimon.CoreOptions.BRANCH; import static org.apache.paimon.CoreOptions.BUCKET; import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MAX; import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MIN; @@ -257,9 +258,11 @@ public void testBatchReadWrite() throws Exception { public void testBranchBatchReadWrite() throws Exception { FileStoreTable table = createFileStoreTable(); generateBranch(table); - writeBranchData(table); - List splits = toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()); - TableRead read = table.newRead(); + FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME); + writeBranchData(tableBranch); + List splits = + toSplits(tableBranch.newSnapshotReader(BRANCH_NAME).read().dataSplits()); + TableRead read = tableBranch.newRead(); assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING)) .isEqualTo( Collections.singletonList( @@ -328,15 +331,18 @@ public void testStreamingReadWrite() throws Exception { public void testBranchStreamingReadWrite() throws Exception { FileStoreTable table = createFileStoreTable(); generateBranch(table); - writeBranchData(table); + + FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME); + writeBranchData(tableBranch); List splits = toSplits( - table.newSnapshotReader(BRANCH_NAME) + tableBranch + .newSnapshotReader(BRANCH_NAME) .withMode(ScanMode.DELTA) .read() .dataSplits()); - TableRead read = table.newRead(); + TableRead read = tableBranch.newRead(); assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING)) .isEqualTo( Collections.singletonList( @@ -1645,4 +1651,29 @@ private FileStoreTable createFileStoreTable(Consumer configure, RowType "")); return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema); } + + @Override + protected FileStoreTable createFileStoreTable(String branch, Consumer configure) + throws Exception { + return createFileStoreTable(branch, configure, ROW_TYPE); + } + + private FileStoreTable createFileStoreTable( + String branch, 
Consumer<Options> configure, RowType rowType) throws Exception { + Options options = new Options(); + options.set(CoreOptions.PATH, tablePath.toString()); + options.set(BUCKET, 1); + options.set(BRANCH, branch); + configure.accept(options); + TableSchema tableSchema = + SchemaUtils.forceCommit( + new SchemaManager(LocalFileIO.create(), tablePath, branch), + new Schema( + rowType.getFields(), + Collections.singletonList("pt"), + Arrays.asList("pt", "a"), + options.toMap(), + "")); + return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java index a55bda9118a56..f5874bed7eac7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java @@ -502,12 +502,6 @@ public Optional<TableSchema> latest() { .orElseThrow(IllegalStateException::new))); } - @Override - public Optional<TableSchema> latest(String branchName) { - // for compatibility test - return latest(); - } - @Override public List<TableSchema> listAll() { return new ArrayList<>(tableSchemas.values()); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java b/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java index 548dc7205f434..ee74a9e8f213c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java @@ -110,6 +110,31 @@ protected FileStoreTable createFileStoreTable(Consumer<Options> configure) throw return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, schema); } + @Override + protected FileStoreTable createFileStoreTable(String branch, Consumer<Options> configure) + throws Exception { + Options options = new Options(); + options.set(CoreOptions.BUCKET, 1); + options.set(CoreOptions.PATH, tablePath.toString()); + // Run with minimal memory to ensure a more intense preempt + // Currently a writer needs at least one page + int pages = 10; + options.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(pages * 1024)); + options.set(CoreOptions.PAGE_SIZE, new MemorySize(1024)); + options.set(CoreOptions.BRANCH, branch); + configure.accept(options); + TableSchema schema = + SchemaUtils.forceCommit( + new SchemaManager(LocalFileIO.create(), tablePath, branch), + new Schema( + ROW_TYPE.getFields(), + Collections.singletonList("pt"), + Arrays.asList("pt", "a"), + options.toMap(), + "")); + return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, schema); + } + @Override protected FileStoreTable overwriteTestFileStoreTable() throws Exception { Options conf = new Options(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java index 4c9f8ff9b55de..1617b4cc33278 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java @@ -58,9 +58,12 @@ private void createTableIfNeeded(Context context) { if (options.get(AUTO_CREATE)) { try { Path tablePath = CoreOptions.path(table.getOptions()); + String branch = CoreOptions.branch(table.getOptions()); SchemaManager schemaManager = new 
SchemaManager( - FileIO.get(tablePath, createCatalogContext(context)), tablePath); + FileIO.get(tablePath, createCatalogContext(context)), + tablePath, + branch); if (!schemaManager.latest().isPresent()) { schemaManager.createTable(FlinkCatalog.fromCatalogTable(table)); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java index 8e08824567ad3..22104159a4e45 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java @@ -42,7 +42,9 @@ protected OneInputStreamOperator createWriteOperator( @Override protected Committer.Factory createCommitterFactory( boolean streamingCheckpointEnabled) { - return (user, metricGroup) -> new StoreCommitter(table.newCommit(user), metricGroup); + return (user, metricGroup) -> + new StoreCommitter( + table.newCommit(user, table.coreOptions().branch()), metricGroup); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index fa4526897bcd5..42332d647ac77 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.TagCreationMode; import org.apache.paimon.manifest.ManifestCommittable; @@ -231,6 +232,8 @@ protected DataStreamSink doCommit(DataStream written, String com commitUser, createCommitterFactory(streamingCheckpointEnabled), createCommittableStateManager()); + + String branch = CoreOptions.branch(table.options()); if (Options.fromMap(table.options()).get(SINK_AUTO_TAG_FOR_SAVEPOINT)) { committerOperator = new AutoTagForSavepointCommitterOperator<>( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java index b812c04912b12..039d798c98539 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.CoreOptions; import org.apache.paimon.flink.VersionedSerializerWrapper; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestCommittableSerializer; @@ -48,7 +49,7 @@ protected Committer.Factory createCommitterFac // a restart. 
return (user, metricGroup) -> new StoreCommitter( - table.newCommit(user) + table.newCommit(user, CoreOptions.branch(table.options())) .withOverwrite(overwritePartition) .ignoreEmptyCommit(!streamingCheckpointEnabled), metricGroup); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java index cf825cec7ba41..8be3b937c0b8e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.CoreOptions; import org.apache.paimon.append.AppendOnlyCompactionTask; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.table.FileStoreTable; @@ -50,7 +51,9 @@ protected OneInputStreamOperator createWr @Override protected Committer.Factory createCommitterFactory( boolean streamingCheckpointEnabled) { - return (s, metricGroup) -> new StoreCommitter(table.newCommit(s), metricGroup); + return (s, metricGroup) -> + new StoreCommitter( + table.newCommit(s, CoreOptions.branch(table.options())), metricGroup); } @Override diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java index 95defcd4253d4..61a464c2f1db6 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java @@ -259,6 +259,22 @@ public void testDynamicOptions() throws Exception { assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(2)); } + @Test + public void testReadWriteBranch() throws Exception { + // create table + sql("CREATE TABLE T (id INT)"); + // insert data + batchSql("INSERT INTO T VALUES (1)"); + // create tag + paimonTable("T").createTag("tag1", 1); + // create branch + paimonTable("T").createBranch("branch1", "tag1"); + // insert data to branch + batchSql("INSERT INTO T/*+ OPTIONS('branch' = 'branch1') */ VALUES (2)"); + List rows = batchSql("select * from T /*+ OPTIONS('branch' = 'branch1') */"); + assertThat(rows).containsExactlyInAnyOrder(Row.of(2), Row.of(1)); + } + @Override protected List ddl() { return Arrays.asList( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index e6fb350982d81..e2f3d8b50eb80 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -691,7 +691,8 @@ public void testTagsTable() throws Exception { List result = sql("SELECT tag_name, snapshot_id, schema_id, record_count FROM T$tags"); - assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L), Row.of("tag2", 2L, 0L, 2L)); + assertThat(result) + .containsExactlyInAnyOrder(Row.of("tag1", 1L, 0L, 1L), Row.of("tag2", 2L, 0L, 2L)); } @Test diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java index dff589e9295e1..5e9a5f109ec60 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java @@ -33,6 +33,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.utils.BlockingIterator; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.FailingFileIO; import org.apache.flink.api.common.functions.MapFunction; @@ -62,6 +63,7 @@ import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -76,6 +78,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.paimon.CoreOptions.BRANCH; import static org.apache.paimon.CoreOptions.BUCKET; import static org.apache.paimon.CoreOptions.FILE_FORMAT; import static org.apache.paimon.CoreOptions.PATH; @@ -124,11 +127,18 @@ public class FileStoreITCase extends AbstractTestBase { private final StreamExecutionEnvironment env; + protected static String branch; + public FileStoreITCase(boolean isBatch) { this.isBatch = isBatch; this.env = isBatch ? buildBatchEnv() : buildStreamEnv(); } + @BeforeAll + public static void before() { + branch = BranchManager.DEFAULT_MAIN_BRANCH; + } + @Parameters(name = "isBatch-{0}") public static List getVarSeg() { return Arrays.asList(true, false); @@ -464,7 +474,7 @@ public static FileStoreTable buildFileStoreTable( ""); return retryArtificialException( () -> { - new SchemaManager(LocalFileIO.create(), tablePath).createTable(schema); + new SchemaManager(LocalFileIO.create(), tablePath, branch).createTable(schema); return FileStoreTableFactory.create(LocalFileIO.create(), options); }); } @@ -480,6 +490,7 @@ public static Options buildConfiguration(boolean noFail, String temporaryPath) { options.set(PATH, FailingFileIO.getFailingPath(failingName, temporaryPath)); } options.set(FILE_FORMAT, CoreOptions.FileFormatType.AVRO); + options.set(BRANCH, branch); return options; } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreWithBranchITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreWithBranchITCase.java new file mode 100644 index 0000000000000..8a2374b3c367b --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreWithBranchITCase.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.flink.sink.FixedBucketSink; +import org.apache.paimon.flink.source.ContinuousFileStoreSource; +import org.apache.paimon.flink.source.StaticFileStoreSource; + +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * ITCase for {@link StaticFileStoreSource}, {@link ContinuousFileStoreSource} and {@link + * FixedBucketSink}, running against a non-default branch. + */ +@ExtendWith(ParameterizedTestExtension.class) +public class FileStoreWithBranchITCase extends FileStoreITCase { + public FileStoreWithBranchITCase(boolean isBatch) { + super(isBatch); + } + + @BeforeAll + public static void before() { + branch = "testBranch"; + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java index e495ad3da5d4c..229517e763a7d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java @@ -168,6 +168,61 @@ public void testLookupChangelog() throws Exception { innerTestChangelogProducing(Collections.singletonList("'changelog-producer' = 'lookup'")); } + @Test + public void testTableReadWriteBranch() throws Exception { + TableEnvironment sEnv = + tableEnvironmentBuilder() + .streamingMode() + .checkpointIntervalMs(ThreadLocalRandom.current().nextInt(900) + 100) + .parallelism(1) + .build(); + + sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse")); + sEnv.executeSql("USE CATALOG testCatalog"); + sEnv.executeSql( + "CREATE TABLE T2 ( k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED ) " + + "WITH ( " + + "'bucket' = '2'" + + ")"); + + CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T2").collect(); + + // insert data + sEnv.executeSql("INSERT INTO T2 VALUES (1, 'A')").await(); + // read initial data + List<String> actual = new ArrayList<>(); + for (int i = 0; i < 1; i++) { + actual.add(it.next().toString()); + } + + assertThat(actual).containsExactlyInAnyOrder("+I[1, A]"); + + // create tag + sEnv.executeSql( + String.format("CALL sys.create_tag('%s.%s', 'tag2', 1, '5 d')", "default", "T2")); + // create branch + sEnv.executeSql( + String.format( + "CALL sys.create_branch('%s.%s', 'branch1', 'tag2')", "default", "T2")); + // alter table + sEnv.executeSql("ALTER TABLE T2 SET ('changelog-producer'='full-compaction')"); + + CloseableIterator<Row> branchIt = + sEnv.executeSql("select * from T2 /*+ OPTIONS('branch' = 'branch1') */").collect(); + // insert data to branch + sEnv.executeSql( + "INSERT INTO T2/*+ OPTIONS('branch' = 'branch1') */ VALUES (10, 'v10'),(11, 'v11'),(12, 'v12')") + .await(); + + // read branch data + List<String> actualBranch = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + actualBranch.add(branchIt.next().toString()); + } + assertThat(actualBranch) + .containsExactlyInAnyOrder("+I[1, A]", "+I[10, v10]", "+I[11, v11]", "+I[12, v12]"); + } + private void innerTestChangelogProducing(List<String> options) throws Exception { TableEnvironment sEnv = tableEnvironmentBuilder() diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java index 579237756994d..62543e818d8fa 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java @@ -27,6 +27,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FileStoreTable; @@ -35,6 +36,7 @@ import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.config.TableConfigOptions; @@ -49,6 +51,9 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; +import static org.apache.paimon.CoreOptions.BRANCH; +import static org.apache.paimon.options.CatalogOptions.WAREHOUSE; + /** {@link Action} test base. */ public abstract class ActionITCaseBase extends AbstractTestBase { @@ -60,6 +65,7 @@ public abstract class ActionITCaseBase extends AbstractTestBase { protected StreamTableCommit commit; protected Catalog catalog; private long incrementalIdentifier; + protected String branch = BranchManager.DEFAULT_MAIN_BRANCH; @BeforeEach public void before() throws IOException { @@ -68,7 +74,12 @@ public void before() throws IOException { tableName = "test_table_" + UUID.randomUUID(); commitUser = UUID.randomUUID().toString(); incrementalIdentifier = 0; - catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse))); + Map options = new HashMap<>(); + options.put(WAREHOUSE.key(), new Path(warehouse).toUri().toString()); + if (!branch.equals(BranchManager.DEFAULT_MAIN_BRANCH)) { + options.put(BRANCH.key(), branch); + } + catalog = CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(options))); } @AfterEach diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 32d25e7db1997..12ac096d21067 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -333,7 +333,7 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis throw new TableNotExistException(identifier); } Path tableLocation = getDataTableLocation(identifier); - return new SchemaManager(fileIO, tableLocation) + return new SchemaManager(fileIO, tableLocation, branchName) .latest() .orElseThrow( () -> new RuntimeException("There is no paimon table in " + tableLocation)); @@ -419,7 +419,7 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { client.alter_table(fromDB, fromTableName, table); Path fromPath = getDataTableLocation(fromTable); - if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) { + if (new SchemaManager(fileIO, fromPath, branchName).listAllIds().size() > 0) { // Rename the file system's table directory. 
Maintain consistency between tables in // the file system and tables in the Hive Metastore. Path toPath = getDataTableLocation(toTable); @@ -585,7 +585,7 @@ private FieldSchema convertToFieldSchema(DataField dataField) { } private SchemaManager schemaManager(Identifier identifier) { - return new SchemaManager(fileIO, getDataTableLocation(identifier)) + return new SchemaManager(fileIO, getDataTableLocation(identifier), branchName) .withLock(lock(identifier)); }
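---

Reviewer note (not applied by the patch): the end-to-end flow exercised by the new ITCases can be reproduced with a few lines of Flink SQL. This is a minimal sketch; the catalog, table, tag and branch names are illustrative only, and it assumes table T already has snapshot 1 in a Paimon catalog. The 'branch' dynamic option and the sys.create_tag / sys.create_branch procedures are the same ones the tests call.

    -- tag snapshot 1 and fork a branch from the tag
    CALL sys.create_tag('default.T', 'tag1', 1, '5 d');
    CALL sys.create_branch('default.T', 'branch1', 'tag1');

    -- write to and read from the branch via the dynamic 'branch' option
    INSERT INTO T /*+ OPTIONS('branch' = 'branch1') */ VALUES (2);
    SELECT * FROM T /*+ OPTIONS('branch' = 'branch1') */;

On the write path the option flows through CoreOptions.branch(table.options()) into the committer (see the FlinkWriteSink, CompactorSink and UnawareBucketCompactionSink hunks above), and on the storage side the same value selects the branch-scoped snapshot, schema and tag directories via SnapshotManager, SchemaManager and TagManager.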