diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 48822af1bd8a2..48e955ca74c3b 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -32,6 +32,12 @@
Boolean |
Whether to create underlying storage when reading and writing the table. |
+
+ branch |
+ "main" |
+ String |
+ Specify branch name. |
+
bucket |
-1 |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 2440bdcc60909..28004f69f7511 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -112,6 +112,9 @@ public class CoreOptions implements Serializable {
.noDefaultValue()
.withDescription("The file path of this table in the filesystem.");
+ public static final ConfigOption BRANCH =
+ key("branch").stringType().defaultValue("main").withDescription("Specify branch name.");
+
public static final ConfigOption FILE_FORMAT =
key("file.format")
.enumType(FileFormatType.class)
@@ -1149,6 +1152,17 @@ public Path path() {
return path(options.toMap());
}
+ public String branch() {
+ return branch(options.toMap());
+ }
+
+ public static String branch(Map options) {
+ if (options.containsKey(BRANCH.key())) {
+ return options.get(BRANCH.key());
+ }
+ return BRANCH.defaultValue();
+ }
+
public static Path path(Map options) {
return new Path(options.get(PATH.key()));
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 07a73d3de2aa9..c31a85fbb3eb8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -104,7 +104,7 @@ public FileStorePathFactory pathFactory() {
@Override
public SnapshotManager snapshotManager() {
- return new SnapshotManager(fileIO, options.path());
+ return new SnapshotManager(fileIO, options.path(), options.branch());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 99efac540e802..0a18ae36141d6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -39,7 +39,6 @@
import static org.apache.paimon.predicate.PredicateBuilder.and;
import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
-import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
/** {@link FileStore} for reading and writing {@link InternalRow}. */
public class AppendOnlyFileStore extends AbstractFileStore {
@@ -71,11 +70,7 @@ public BucketMode bucketMode() {
@Override
public AppendOnlyFileStoreScan newScan() {
- return newScan(DEFAULT_MAIN_BRANCH);
- }
-
- public AppendOnlyFileStoreScan newScan(String branchName) {
- return newScan(false, branchName);
+ return newScan(false);
}
@Override
@@ -106,12 +101,12 @@ public AppendOnlyFileStoreWrite newWrite(
rowType,
pathFactory(),
snapshotManager(),
- newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
+ newScan(true).withManifestCacheFilter(manifestFilter),
options,
tableName);
}
- private AppendOnlyFileStoreScan newScan(boolean forWrite, String branchName) {
+ private AppendOnlyFileStoreScan newScan(boolean forWrite) {
ScanBucketFilter bucketFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
@@ -146,7 +141,6 @@ public void pushdown(Predicate predicate) {
options.bucket(),
forWrite,
options.scanManifestParallelism(),
- branchName,
options.fileIndexReadEnabled());
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 870feffdef685..3643b9e30743d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -63,8 +63,6 @@ public interface FileStore extends Serializable {
FileStoreScan newScan();
- FileStoreScan newScan(String branchName);
-
ManifestList.Factory manifestListFactory();
ManifestFile.Factory manifestFileFactory();
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index aeb30731dfccb..460d0c38f539b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -56,7 +56,6 @@
import static org.apache.paimon.predicate.PredicateBuilder.and;
import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
-import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** {@link FileStore} for querying and updating {@link KeyValue}s. */
@@ -112,11 +111,7 @@ public BucketMode bucketMode() {
@Override
public KeyValueFileStoreScan newScan() {
- return newScan(DEFAULT_MAIN_BRANCH);
- }
-
- public KeyValueFileStoreScan newScan(String branchName) {
- return newScan(false, branchName);
+ return newScan(false);
}
@Override
@@ -185,7 +180,7 @@ public KeyValueFileStoreWrite newWrite(String commitUser, ManifestCacheFilter ma
pathFactory(),
format2PathFactory(),
snapshotManager(),
- newScan(true, DEFAULT_MAIN_BRANCH).withManifestCacheFilter(manifestFilter),
+ newScan(true).withManifestCacheFilter(manifestFilter),
indexFactory,
deletionVectorsMaintainerFactory,
options,
@@ -209,7 +204,7 @@ private Map format2PathFactory() {
return pathFactoryMap;
}
- private KeyValueFileStoreScan newScan(boolean forWrite, String branchName) {
+ private KeyValueFileStoreScan newScan(boolean forWrite) {
ScanBucketFilter bucketFilter =
new ScanBucketFilter(bucketKeyType) {
@Override
@@ -240,7 +235,6 @@ public void pushdown(Predicate keyFilter) {
options.bucket(),
forWrite,
options.scanManifestParallelism(),
- branchName,
options.deletionVectorsEnabled(),
options.mergeEngine());
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 4a4fb04fd4e2d..9f4ce8bec8c9c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -36,6 +36,7 @@
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.system.SystemTableLoader;
+import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.StringUtils;
import javax.annotation.Nullable;
@@ -66,6 +67,7 @@ public abstract class AbstractCatalog implements Catalog {
protected final FileIO fileIO;
protected final Map tableDefaultOptions;
protected final Options catalogOptions;
+ protected final String branchName;
@Nullable protected final LineageMetaFactory lineageMetaFactory;
@@ -74,6 +76,7 @@ protected AbstractCatalog(FileIO fileIO) {
this.lineageMetaFactory = null;
this.tableDefaultOptions = new HashMap<>();
this.catalogOptions = new Options();
+ branchName = BranchManager.DEFAULT_MAIN_BRANCH;
}
protected AbstractCatalog(FileIO fileIO, Options options) {
@@ -83,6 +86,7 @@ protected AbstractCatalog(FileIO fileIO, Options options) {
this.tableDefaultOptions =
convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX);
this.catalogOptions = options;
+ this.branchName = options.get(CoreOptions.BRANCH);
}
@Override
@@ -325,7 +329,12 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
}
return table;
} else {
- return getDataTable(identifier);
+ Table table = getDataTable(identifier);
+ // Override branch option
+ if (!branchName.equals(BranchManager.DEFAULT_MAIN_BRANCH)) {
+ table.options().put(CoreOptions.BRANCH.key(), branchName);
+ }
+ return table;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 1e4e5b0ebaaac..e4815a4de42c9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -119,7 +119,7 @@ public boolean tableExists(Identifier identifier) {
}
private boolean tableExists(Path tablePath) {
- return new SchemaManager(fileIO, tablePath).listAllIds().size() > 0;
+ return new SchemaManager(fileIO, tablePath, branchName).listAllIds().size() > 0;
}
@Override
@@ -153,7 +153,7 @@ private SchemaManager schemaManager(Identifier identifier) {
new RuntimeException(
"No lock context when lock is enabled."))))
.orElse(null);
- return new SchemaManager(fileIO, path)
+ return new SchemaManager(fileIO, path, branchName)
.withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier));
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 7e7718b5bee96..922d513ebac2b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -288,7 +288,7 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
updateTable(connections, catalogKey, fromTable, toTable);
Path fromPath = getDataTableLocation(fromTable);
- if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) {
+ if (new SchemaManager(fileIO, fromPath, branchName).listAllIds().size() > 0) {
// Rename the file system's table directory. Maintain consistency between tables in
// the file system and tables in the Hive Metastore.
Path toPath = getDataTableLocation(toTable);
@@ -323,7 +323,7 @@ protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotE
throw new TableNotExistException(identifier);
}
Path tableLocation = getDataTableLocation(identifier);
- return new SchemaManager(fileIO, tableLocation)
+ return new SchemaManager(fileIO, tableLocation, branchName)
.latest()
.orElseThrow(
() -> new RuntimeException("There is no paimon table in " + tableLocation));
@@ -374,7 +374,7 @@ public void close() throws Exception {
}
private SchemaManager getSchemaManager(Identifier identifier) {
- return new SchemaManager(fileIO, getDataTableLocation(identifier))
+ return new SchemaManager(fileIO, getDataTableLocation(identifier), branchName)
.withLock(lock(identifier));
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index a911b084742b4..d90731f31ae74 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -75,7 +75,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private final SchemaManager schemaManager;
private final TableSchema schema;
protected final ScanBucketFilter bucketKeyFilter;
- private final String branchName;
private PartitionPredicate partitionFilter;
private Snapshot specifiedSnapshot = null;
@@ -98,8 +97,7 @@ public AbstractFileStoreScan(
ManifestList.Factory manifestListFactory,
int numOfBuckets,
boolean checkNumOfBuckets,
- Integer scanManifestParallelism,
- String branchName) {
+ Integer scanManifestParallelism) {
this.partitionType = partitionType;
this.bucketKeyFilter = bucketKeyFilter;
this.snapshotManager = snapshotManager;
@@ -111,7 +109,6 @@ public AbstractFileStoreScan(
this.checkNumOfBuckets = checkNumOfBuckets;
this.tableSchemas = new ConcurrentHashMap<>();
this.scanManifestParallelism = scanManifestParallelism;
- this.branchName = branchName;
}
@Override
@@ -368,7 +365,7 @@ private Pair> readManifests() {
if (manifests == null) {
snapshot =
specifiedSnapshot == null
- ? snapshotManager.latestSnapshot(branchName)
+ ? snapshotManager.latestSnapshot()
: specifiedSnapshot;
if (snapshot == null) {
manifests = Collections.emptyList();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index baa2e9f4ab4ac..a716f997755ff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -62,7 +62,6 @@ public AppendOnlyFileStoreScan(
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
- String branchName,
boolean fileIndexReadEnabled) {
super(
partitionType,
@@ -74,8 +73,7 @@ public AppendOnlyFileStoreScan(
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
- scanManifestParallelism,
- branchName);
+ scanManifestParallelism);
this.fieldStatsConverters =
new FieldStatsConverters(sid -> scanTableSchema(sid).fields(), schema.id());
this.fileIndexReadEnabled = fileIndexReadEnabled;
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index e389a471c4ec5..61303879b8b93 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -251,7 +251,7 @@ public void commit(
// we can skip conflict checking in tryCommit method.
// This optimization is mainly used to decrease the number of times we read from
// files.
- latestSnapshot = snapshotManager.latestSnapshot(branchName);
+ latestSnapshot = snapshotManager.latestSnapshot();
if (latestSnapshot != null && checkAppendFiles) {
// it is possible that some partitions only have compact changes,
// so we need to contain all changes
@@ -654,7 +654,7 @@ private int tryCommit(
@Nullable String statsFileName) {
int cnt = 0;
while (true) {
- Snapshot latestSnapshot = snapshotManager.latestSnapshot(branchName);
+ Snapshot latestSnapshot = snapshotManager.latestSnapshot();
cnt++;
if (tryCommitOnce(
tableFiles,
@@ -754,7 +754,7 @@ public boolean tryCommitOnce(
Path newSnapshotPath =
branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotManager.snapshotPath(newSnapshotId)
- : snapshotManager.branchSnapshotPath(branchName, newSnapshotId);
+ : snapshotManager.snapshotPath(branchName, newSnapshotId);
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId);
@@ -836,7 +836,7 @@ public boolean tryCommitOnce(
newIndexManifest = indexManifest;
}
- long latestSchemaId = schemaManager.latest(branchName).get().id();
+ long latestSchemaId = schemaManager.latest().get().id();
// write new stats or inherit from the previous snapshot
String statsFileName = null;
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index b4c4909aed3f8..83da1221ad9e4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -62,7 +62,6 @@ public KeyValueFileStoreScan(
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
- String branchName,
boolean deletionVectorsEnabled,
MergeEngine mergeEngine) {
super(
@@ -75,8 +74,7 @@ public KeyValueFileStoreScan(
manifestListFactory,
numOfBuckets,
checkNumOfBuckets,
- scanManifestParallelism,
- branchName);
+ scanManifestParallelism);
this.fieldKeyStatsConverters =
new FieldStatsConverters(
sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)),
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index d94da91ef4c56..fa8f0b9e3289b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -44,6 +44,7 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.StringUtils;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -78,12 +79,18 @@ public class SchemaManager implements Serializable {
private final FileIO fileIO;
private final Path tableRoot;
-
@Nullable private transient Lock lock;
+ private final String branch;
public SchemaManager(FileIO fileIO, Path tableRoot) {
+ this(fileIO, tableRoot, DEFAULT_MAIN_BRANCH);
+ }
+
+ /** Specify the default branch for data writing. */
+ public SchemaManager(FileIO fileIO, Path tableRoot, String branch) {
this.fileIO = fileIO;
this.tableRoot = tableRoot;
+ this.branch = StringUtils.isBlank(branch) ? DEFAULT_MAIN_BRANCH : branch;
}
public SchemaManager withLock(@Nullable Lock lock) {
@@ -91,48 +98,56 @@ public SchemaManager withLock(@Nullable Lock lock) {
return this;
}
- /** @return latest schema. */
public Optional latest() {
- return latest(DEFAULT_MAIN_BRANCH);
+ return latest(branch);
}
- public Optional latest(String branchName) {
- Path directoryPath =
- branchName.equals(DEFAULT_MAIN_BRANCH)
- ? schemaDirectory()
- : branchSchemaDirectory(branchName);
+ public Optional latest(String branch) {
try {
- return listVersionedFiles(fileIO, directoryPath, SCHEMA_PREFIX)
+ return listVersionedFiles(fileIO, schemaDirectory(branch), SCHEMA_PREFIX)
.reduce(Math::max)
- .map(this::schema);
+ .map(id -> schema(id, branch));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
- /** List all schema. */
public List listAll() {
- return listAllIds().stream().map(this::schema).collect(Collectors.toList());
+ return listAll(branch);
+ }
+
+ public List listAll(String branch) {
+ return listAllIds(branch).stream()
+ .map(id -> schema(id, branch))
+ .collect(Collectors.toList());
}
- /** List all schema IDs. */
public List listAllIds() {
+ return listAllIds(branch);
+ }
+
+ /** List all schema IDs. */
+ public List listAllIds(String branch) {
try {
- return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX)
+ return listVersionedFiles(fileIO, schemaDirectory(branch), SCHEMA_PREFIX)
.collect(Collectors.toList());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
- /** Create a new schema from {@link Schema}. */
public TableSchema createTable(Schema schema) throws Exception {
- return createTable(schema, false);
+ return createTable(branch, schema, false);
}
public TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws Exception {
+ return createTable(branch, schema, ignoreIfExistsSame);
+ }
+
+ public TableSchema createTable(String branch, Schema schema, boolean ignoreIfExistsSame)
+ throws Exception {
while (true) {
- Optional latest = latest();
+ Optional latest = latest(branch);
if (latest.isPresent()) {
TableSchema oldSchema = latest.get();
boolean isSame =
@@ -166,25 +181,36 @@ public TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws
options,
schema.comment());
- boolean success = commit(newSchema);
+ boolean success = commit(branch, newSchema);
if (success) {
return newSchema;
}
}
}
- /** Update {@link SchemaChange}s. */
public TableSchema commitChanges(SchemaChange... changes) throws Exception {
- return commitChanges(Arrays.asList(changes));
+ return commitChanges(branch, changes);
}
/** Update {@link SchemaChange}s. */
+ public TableSchema commitChanges(String branch, SchemaChange... changes) throws Exception {
+ return commitChanges(branch, Arrays.asList(changes));
+ }
+
public TableSchema commitChanges(List changes)
+ throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException,
+ Catalog.ColumnNotExistException {
+ return commitChanges(branch, changes);
+ }
+
+ /** Update {@link SchemaChange}s. */
+ public TableSchema commitChanges(String branch, List changes)
throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,
Catalog.ColumnNotExistException {
while (true) {
TableSchema schema =
- latest().orElseThrow(
+ latest(branch)
+ .orElseThrow(
() ->
new Catalog.TableNotExistException(
fromPath(tableRoot.toString(), true)));
@@ -376,7 +402,7 @@ public TableSchema commitChanges(List changes)
newComment);
try {
- boolean success = commit(newSchema);
+ boolean success = commit(branch, newSchema);
if (success) {
return newSchema;
}
@@ -387,8 +413,13 @@ public TableSchema commitChanges(List changes)
}
public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
+ return mergeSchema(branch, rowType, allowExplicitCast);
+ }
+
+ public boolean mergeSchema(String branch, RowType rowType, boolean allowExplicitCast) {
TableSchema current =
- latest().orElseThrow(
+ latest(branch)
+ .orElseThrow(
() ->
new RuntimeException(
"It requires that the current schema to exist when calling 'mergeSchema'"));
@@ -397,7 +428,7 @@ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
return false;
} else {
try {
- return commit(update);
+ return commit(branch, update);
} catch (Exception e) {
throw new RuntimeException("Failed to commit the schema.", e);
}
@@ -470,9 +501,13 @@ private void updateColumn(
@VisibleForTesting
boolean commit(TableSchema newSchema) throws Exception {
- SchemaValidation.validateTableSchema(newSchema);
+ return commit(branch, newSchema);
+ }
- Path schemaPath = toSchemaPath(newSchema.id());
+ @VisibleForTesting
+ boolean commit(String branchName, TableSchema newSchema) throws Exception {
+ SchemaValidation.validateTableSchema(newSchema);
+ Path schemaPath = toSchemaPath(branchName, newSchema.id());
Callable callable = () -> fileIO.writeFileUtf8(schemaPath, newSchema.toString());
if (lock == null) {
return callable.call();
@@ -480,10 +515,15 @@ boolean commit(TableSchema newSchema) throws Exception {
return lock.runWithLock(callable);
}
- /** Read schema for schema id. */
public TableSchema schema(long id) {
+ return schema(id, branch);
+ }
+
+ /** Read schema for schema id. */
+ public TableSchema schema(long id, String branch) {
try {
- return JsonSerdeUtil.fromJson(fileIO.readFileUtf8(toSchemaPath(id)), TableSchema.class);
+ return JsonSerdeUtil.fromJson(
+ fileIO.readFileUtf8(toSchemaPath(branch, id)), TableSchema.class);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
@@ -501,18 +541,34 @@ private Path schemaDirectory() {
return new Path(tableRoot + "/schema");
}
+ public Path schemaDirectory(String branchName) {
+ return branchName.equals(DEFAULT_MAIN_BRANCH)
+ ? schemaDirectory()
+ : new Path(getBranchPath(tableRoot, branchName) + "/schema");
+ }
+
@VisibleForTesting
public Path toSchemaPath(long id) {
return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id);
}
- public Path branchSchemaDirectory(String branchName) {
- return new Path(getBranchPath(tableRoot, branchName) + "/schema");
+ public Path toSchemaPath(String branchName, long schemaId) {
+ return branchName.equals(DEFAULT_MAIN_BRANCH)
+ ? toSchemaPath(schemaId)
+ : new Path(
+ getBranchPath(tableRoot, branchName)
+ + "/schema/"
+ + SCHEMA_PREFIX
+ + schemaId);
}
- public Path branchSchemaPath(String branchName, long schemaId) {
- return new Path(
- getBranchPath(tableRoot, branchName) + "/schema/" + SCHEMA_PREFIX + schemaId);
+ /**
+ * Delete schema with specific id.
+ *
+ * @param schemaId the schema id to delete.
+ */
+ public void deleteSchema(long schemaId) {
+ deleteSchema(branch, schemaId);
}
/**
@@ -520,8 +576,8 @@ public Path branchSchemaPath(String branchName, long schemaId) {
*
* @param schemaId the schema id to delete.
*/
- public void deleteSchema(long schemaId) {
- fileIO.deleteQuietly(toSchemaPath(schemaId));
+ public void deleteSchema(String branch, long schemaId) {
+ fileIO.deleteQuietly(toSchemaPath(branch, schemaId));
}
public static void checkAlterTableOption(String key) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
index 398aa98d128d7..1438e81de2efb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -104,7 +104,8 @@ public TableSchema(
this.highestFieldId = highestFieldId;
this.partitionKeys = partitionKeys;
this.primaryKeys = primaryKeys;
- this.options = Collections.unmodifiableMap(options);
+ Objects.requireNonNull(options);
+ this.options = options;
this.comment = comment;
this.timeMillis = timeMillis;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 1ca321d563a59..4f2cbd888597d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -71,7 +71,6 @@
import java.util.function.BiConsumer;
import static org.apache.paimon.CoreOptions.PATH;
-import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -139,13 +138,13 @@ public RowKeyExtractor createRowKeyExtractor() {
@Override
public SnapshotReader newSnapshotReader() {
- return newSnapshotReader(DEFAULT_MAIN_BRANCH);
+ return newSnapshotReader(CoreOptions.branch(options()));
}
@Override
public SnapshotReader newSnapshotReader(String branchName) {
return new SnapshotReaderImpl(
- store().newScan(branchName),
+ store().newScan(),
tableSchema,
coreOptions(),
snapshotManager(),
@@ -246,7 +245,8 @@ private FileStoreTable copyInternal(Map dynamicOptions, boolean
@Override
public FileStoreTable copyWithLatestSchema() {
Map options = tableSchema.options();
- SchemaManager schemaManager = new SchemaManager(fileIO(), location());
+ SchemaManager schemaManager =
+ new SchemaManager(fileIO(), location(), CoreOptions.branch(options()));
Optional optionalLatestSchema = schemaManager.latest();
if (optionalLatestSchema.isPresent()) {
TableSchema newTableSchema = optionalLatestSchema.get();
@@ -259,7 +259,7 @@ public FileStoreTable copyWithLatestSchema() {
}
protected SchemaManager schemaManager() {
- return new SchemaManager(fileIO(), path);
+ return new SchemaManager(fileIO(), path, CoreOptions.branch(options()));
}
@Override
@@ -309,7 +309,7 @@ public ExpireSnapshots newExpireChangelog() {
@Override
public TableCommitImpl newCommit(String commitUser) {
// Compatibility with previous design, the main branch is written by default
- return newCommit(commitUser, DEFAULT_MAIN_BRANCH);
+ return newCommit(commitUser, CoreOptions.branch(options()));
}
public TableCommitImpl newCommit(String commitUser, String branchName) {
@@ -561,7 +561,7 @@ public void rollbackTo(String tagName) {
@Override
public TagManager tagManager() {
- return new TagManager(fileIO, path);
+ return new TagManager(fileIO, path, CoreOptions.branch(options()));
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
index 6382535858676..e941a89e01b3f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
@@ -53,8 +53,9 @@ public static FileStoreTable create(FileIO fileIO, Path path) {
public static FileStoreTable create(FileIO fileIO, Options options) {
Path tablePath = CoreOptions.path(options);
+ String branchName = CoreOptions.branch(options.toMap());
TableSchema tableSchema =
- new SchemaManager(fileIO, tablePath)
+ new SchemaManager(fileIO, tablePath, branchName)
.latest()
.orElseThrow(
() ->
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index 4656deb676710..3f71812e5b86c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -101,13 +101,13 @@ public void createBranch(String branchName, String tagName) {
try {
// Copy the corresponding tag, snapshot and schema files into the branch directory
fileIO.copyFileUtf8(
- tagManager.tagPath(tagName), tagManager.branchTagPath(branchName, tagName));
+ tagManager.tagPath(tagName), tagManager.tagPath(branchName, tagName));
fileIO.copyFileUtf8(
snapshotManager.snapshotPath(snapshot.id()),
- snapshotManager.branchSnapshotPath(branchName, snapshot.id()));
+ snapshotManager.snapshotPath(branchName, snapshot.id()));
fileIO.copyFileUtf8(
schemaManager.toSchemaPath(snapshot.schemaId()),
- schemaManager.branchSchemaPath(branchName, snapshot.schemaId()));
+ schemaManager.toSchemaPath(branchName, snapshot.schemaId()));
} catch (IOException e) {
throw new RuntimeException(
String.format(
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index dbbc8fffdc050..3287f92a74e9e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -63,10 +63,17 @@ public class SnapshotManager implements Serializable {
private final FileIO fileIO;
private final Path tablePath;
+ private final String branch;
public SnapshotManager(FileIO fileIO, Path tablePath) {
+ this(fileIO, tablePath, DEFAULT_MAIN_BRANCH);
+ }
+
+ /** Specify the default branch for data writing. */
+ public SnapshotManager(FileIO fileIO, Path tablePath, String branchName) {
this.fileIO = fileIO;
this.tablePath = tablePath;
+ this.branch = StringUtils.isBlank(branchName) ? DEFAULT_MAIN_BRANCH : branchName;
}
public FileIO fileIO() {
@@ -85,55 +92,74 @@ public Path changelogDirectory() {
return new Path(tablePath + "/changelog");
}
- public Path longLivedChangelogPath(long snapshotId) {
- return new Path(tablePath + "/changelog/" + CHANGELOG_PREFIX + snapshotId);
+ public Path changelogDirectory(String branchName) {
+ return branchName.equals(DEFAULT_MAIN_BRANCH)
+ ? changelogDirectory()
+ : new Path(getBranchPath(tablePath, branchName) + "/changelog");
}
- public Path snapshotPath(long snapshotId) {
- return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId);
+ public Path longLivedChangelogPath(long snapshotId) {
+ return new Path(tablePath + "/changelog/" + CHANGELOG_PREFIX + snapshotId);
}
- public Path branchSnapshotDirectory(String branchName) {
- return new Path(getBranchPath(tablePath, branchName) + "/snapshot");
+ public Path longLivedChangelogPath(String branchName, long snapshotId) {
+ return branchName.equals(DEFAULT_MAIN_BRANCH)
+ ? longLivedChangelogPath(snapshotId)
+ : new Path(
+ getBranchPath(tablePath, branchName)
+ + "/changelog/"
+ + CHANGELOG_PREFIX
+ + snapshotId);
}
- public Path branchSnapshotPath(String branchName, long snapshotId) {
- return new Path(
- getBranchPath(tablePath, branchName) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId);
+ public Path snapshotPath(long snapshotId) {
+ return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId);
}
- public Path snapshotPathByBranch(String branchName, long snapshotId) {
+ public Path snapshotPath(String branchName, long snapshotId) {
return branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotPath(snapshotId)
- : branchSnapshotPath(branchName, snapshotId);
+ : new Path(
+ getBranchPath(tablePath, branchName)
+ + "/snapshot/"
+ + SNAPSHOT_PREFIX
+ + snapshotId);
}
- public Path snapshotDirByBranch(String branchName) {
+ public Path snapshotDirectory(String branchName) {
return branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotDirectory()
- : branchSnapshotDirectory(branchName);
+ : new Path(getBranchPath(tablePath, branchName) + "/snapshot");
}
public Snapshot snapshot(long snapshotId) {
- return snapshot(DEFAULT_MAIN_BRANCH, snapshotId);
+ return snapshot(snapshotId, branch);
}
public Changelog changelog(long snapshotId) {
- Path changelogPath = longLivedChangelogPath(snapshotId);
+ return changelog(snapshotId, branch);
+ }
+
+ public Changelog changelog(long snapshotId, String branch) {
+ Path changelogPath = longLivedChangelogPath(branch, snapshotId);
return Changelog.fromPath(fileIO, changelogPath);
}
public Changelog longLivedChangelog(long snapshotId) {
- return Changelog.fromPath(fileIO, longLivedChangelogPath(snapshotId));
+ return Changelog.fromPath(fileIO, longLivedChangelogPath(branch, snapshotId));
}
- public Snapshot snapshot(String branchName, long snapshotId) {
- Path snapshotPath = snapshotPathByBranch(branchName, snapshotId);
+ public Snapshot snapshot(long snapshotId, String branch) {
+ Path snapshotPath = snapshotPath(branch, snapshotId);
return Snapshot.fromPath(fileIO, snapshotPath);
}
public boolean snapshotExists(long snapshotId) {
- Path path = snapshotPath(snapshotId);
+ return snapshotExists(snapshotId, branch);
+ }
+
+ public boolean snapshotExists(long snapshotId, String branch) {
+ Path path = snapshotPath(branch, snapshotId);
try {
return fileIO.exists(path);
} catch (IOException e) {
@@ -144,7 +170,11 @@ public boolean snapshotExists(long snapshotId) {
}
public boolean longLivedChangelogExists(long snapshotId) {
- Path path = longLivedChangelogPath(snapshotId);
+ return longLivedChangelogExists(branch, snapshotId);
+ }
+
+ public boolean longLivedChangelogExists(String branch, long snapshotId) {
+ Path path = longLivedChangelogPath(branch, snapshotId);
try {
return fileIO.exists(path);
} catch (IOException e) {
@@ -155,75 +185,91 @@ public boolean longLivedChangelogExists(long snapshotId) {
}
public @Nullable Snapshot latestSnapshot() {
- return latestSnapshot(DEFAULT_MAIN_BRANCH);
+ return latestSnapshot(branch);
}
- public @Nullable Snapshot latestSnapshot(String branchName) {
- Long snapshotId = latestSnapshotId(branchName);
- return snapshotId == null ? null : snapshot(branchName, snapshotId);
+ public @Nullable Snapshot latestSnapshot(String branch) {
+ Long snapshotId = latestSnapshotId(branch);
+ return snapshotId == null ? null : snapshot(snapshotId, branch);
}
public @Nullable Long latestSnapshotId() {
- return latestSnapshotId(DEFAULT_MAIN_BRANCH);
+ return latestSnapshotId(branch);
}
- public @Nullable Long latestSnapshotId(String branchName) {
+ public @Nullable Long latestSnapshotId(String branch) {
try {
- return findLatest(snapshotDirByBranch(branchName), SNAPSHOT_PREFIX, this::snapshotPath);
+ return findLatest(snapshotDirectory(branch), SNAPSHOT_PREFIX, this::snapshotPath);
} catch (IOException e) {
throw new RuntimeException("Failed to find latest snapshot id", e);
}
}
public @Nullable Snapshot earliestSnapshot() {
- Long snapshotId = earliestSnapshotId();
- return snapshotId == null ? null : snapshot(snapshotId);
+ return earliestSnapshot(branch);
+ }
+
+ public @Nullable Snapshot earliestSnapshot(String branch) {
+ Long snapshotId = earliestSnapshotId(branch);
+ return snapshotId == null ? null : snapshot(snapshotId, branch);
}
public @Nullable Long earliestSnapshotId() {
- return earliestSnapshotId(DEFAULT_MAIN_BRANCH);
+ return earliestSnapshotId(branch);
+ }
+
+ public @Nullable Long earliestSnapshotId(String branchName) {
+ try {
+ return findEarliest(snapshotDirectory(branchName), SNAPSHOT_PREFIX, this::snapshotPath);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to find earliest snapshot id", e);
+ }
}
public @Nullable Long earliestLongLivedChangelogId() {
+ return earliestLongLivedChangelogId(branch);
+ }
+
+ public @Nullable Long earliestLongLivedChangelogId(String branch) {
try {
return findEarliest(
- changelogDirectory(), CHANGELOG_PREFIX, this::longLivedChangelogPath);
+ changelogDirectory(branch), CHANGELOG_PREFIX, this::longLivedChangelogPath);
} catch (IOException e) {
throw new RuntimeException("Failed to find earliest changelog id", e);
}
}
public @Nullable Long latestLongLivedChangelogId() {
+ return latestLongLivedChangelogId(branch);
+ }
+
+ public @Nullable Long latestLongLivedChangelogId(String branch) {
try {
- return findLatest(changelogDirectory(), CHANGELOG_PREFIX, this::longLivedChangelogPath);
+ return findLatest(
+ changelogDirectory(branch), CHANGELOG_PREFIX, this::longLivedChangelogPath);
} catch (IOException e) {
throw new RuntimeException("Failed to find latest changelog id", e);
}
}
public @Nullable Long latestChangelogId() {
- return latestSnapshotId();
+ return latestSnapshotId(branch);
}
- public @Nullable Long earliestSnapshotId(String branchName) {
- try {
- return findEarliest(
- snapshotDirByBranch(branchName), SNAPSHOT_PREFIX, this::snapshotPath);
- } catch (IOException e) {
- throw new RuntimeException("Failed to find earliest snapshot id", e);
- }
+ public @Nullable Long pickOrLatest(Predicate predicate) {
+ return pickOrLatest(branch, predicate);
}
- public @Nullable Long pickOrLatest(Predicate predicate) {
- Long latestId = latestSnapshotId();
- Long earliestId = earliestSnapshotId();
+ public @Nullable Long pickOrLatest(String branch, Predicate predicate) {
+ Long latestId = latestSnapshotId(branch);
+ Long earliestId = earliestSnapshotId(branch);
if (latestId == null || earliestId == null) {
return null;
}
for (long snapshotId = latestId; snapshotId >= earliestId; snapshotId--) {
- if (snapshotExists(snapshotId)) {
- Snapshot snapshot = snapshot(snapshotId);
+ if (snapshotExists(snapshotId, branch)) {
+ Snapshot snapshot = snapshot(snapshotId, branch);
if (predicate.test(snapshot)) {
return snapshot.id();
}
@@ -234,39 +280,47 @@ public boolean longLivedChangelogExists(long snapshotId) {
}
private Snapshot changelogOrSnapshot(long snapshotId) {
- if (longLivedChangelogExists(snapshotId)) {
- return changelog(snapshotId);
+ return changelogOrSnapshot(snapshotId, branch);
+ }
+
+ private Snapshot changelogOrSnapshot(long snapshotId, String branch) {
+ if (longLivedChangelogExists(branch, snapshotId)) {
+ return changelog(snapshotId, branch);
} else {
- return snapshot(snapshotId);
+ return snapshot(snapshotId, branch);
}
}
+ public @Nullable Long earlierThanTimeMills(long timestampMills, boolean startFromChangelog) {
+ return earlierThanTimeMills(branch, timestampMills, startFromChangelog);
+ }
+
/**
* Returns the latest snapshot earlier than the timestamp mills. A non-existent snapshot may be
* returned if all snapshots are equal to or later than the timestamp mills.
*/
- public @Nullable Long earlierThanTimeMills(long timestampMills, boolean startFromChangelog) {
- Long earliestSnapshot = earliestSnapshotId();
+ public @Nullable Long earlierThanTimeMills(
+ String branch, long timestampMills, boolean startFromChangelog) {
+ Long earliestSnapshot = earliestSnapshotId(branch);
Long earliest;
if (startFromChangelog) {
- Long earliestChangelog = earliestLongLivedChangelogId();
+ Long earliestChangelog = earliestLongLivedChangelogId(branch);
earliest = earliestChangelog == null ? earliestSnapshot : earliestChangelog;
} else {
earliest = earliestSnapshot;
}
- Long latest = latestSnapshotId();
-
+ Long latest = latestSnapshotId(branch);
if (earliest == null || latest == null) {
return null;
}
- if (changelogOrSnapshot(earliest).timeMillis() >= timestampMills) {
+ if (changelogOrSnapshot(earliest, branch).timeMillis() >= timestampMills) {
return earliest - 1;
}
while (earliest < latest) {
long mid = (earliest + latest + 1) / 2;
- if (changelogOrSnapshot(mid).timeMillis() < timestampMills) {
+ if (changelogOrSnapshot(mid, branch).timeMillis() < timestampMills) {
earliest = mid;
} else {
latest = mid - 1;
@@ -275,24 +329,28 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
return earliest;
}
+ public @Nullable Snapshot earlierOrEqualTimeMills(long timestampMills) {
+ return earlierOrEqualTimeMills(branch, timestampMills);
+ }
+
/**
* Returns a {@link Snapshot} whoes commit time is earlier than or equal to given timestamp
* mills. If there is no such a snapshot, returns null.
*/
- public @Nullable Snapshot earlierOrEqualTimeMills(long timestampMills) {
- Long earliest = earliestSnapshotId();
- Long latest = latestSnapshotId();
+ public @Nullable Snapshot earlierOrEqualTimeMills(String branch, long timestampMills) {
+ Long earliest = earliestSnapshotId(branch);
+ Long latest = latestSnapshotId(branch);
if (earliest == null || latest == null) {
return null;
}
- if (snapshot(earliest).timeMillis() > timestampMills) {
+ if (snapshot(earliest, branch).timeMillis() > timestampMills) {
return null;
}
Snapshot finalSnapshot = null;
while (earliest <= latest) {
long mid = earliest + (latest - earliest) / 2; // Avoid overflow
- Snapshot snapshot = snapshot(mid);
+ Snapshot snapshot = snapshot(mid, branch);
long commitTime = snapshot.timeMillis();
if (commitTime > timestampMills) {
latest = mid - 1; // Search in the left half
@@ -365,31 +423,47 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
}
public long snapshotCount() throws IOException {
- return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX).count();
+ return snapshotCount(branch);
+ }
+
+ public long snapshotCount(String branch) throws IOException {
+ return listVersionedFiles(fileIO, snapshotDirectory(branch), SNAPSHOT_PREFIX).count();
}
public Iterator snapshots() throws IOException {
- return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX)
- .map(this::snapshot)
+ return snapshots(branch);
+ }
+
+ public Iterator snapshots(String branch) throws IOException {
+ return listVersionedFiles(fileIO, snapshotDirectory(branch), SNAPSHOT_PREFIX)
+ .map(id -> snapshot(id, branch))
.sorted(Comparator.comparingLong(Snapshot::id))
.iterator();
}
public Iterator changelogs() throws IOException {
- return listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX)
- .map(this::changelog)
+ return changelogs(branch);
+ }
+
+ public Iterator changelogs(String branch) throws IOException {
+ return listVersionedFiles(fileIO, changelogDirectory(branch), CHANGELOG_PREFIX)
+ .map(snapshotId -> changelog(snapshotId, branch))
.sorted(Comparator.comparingLong(Changelog::id))
.iterator();
}
+ public List safelyGetAllSnapshots() throws IOException {
+ return safelyGetAllSnapshots(branch);
+ }
+
/**
* If {@link FileNotFoundException} is thrown when reading the snapshot file, this snapshot may
* be deleted by other processes, so just skip this snapshot.
*/
- public List safelyGetAllSnapshots() throws IOException {
+ public List safelyGetAllSnapshots(String branch) throws IOException {
List paths =
- listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX)
- .map(this::snapshotPath)
+ listVersionedFiles(fileIO, snapshotDirectory(branch), SNAPSHOT_PREFIX)
+ .map(id -> snapshotPath(branch, id))
.collect(Collectors.toList());
List snapshots = new ArrayList<>();
@@ -404,9 +478,13 @@ public List safelyGetAllSnapshots() throws IOException {
}
public List safelyGetAllChangelogs() throws IOException {
+ return safelyGetAllChangelogs(branch);
+ }
+
+ public List safelyGetAllChangelogs(String branch) throws IOException {
List paths =
- listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX)
- .map(this::longLivedChangelogPath)
+ listVersionedFiles(fileIO, changelogDirectory(branch), CHANGELOG_PREFIX)
+ .map(id -> longLivedChangelogPath(branch, id))
.collect(Collectors.toList());
List changelogs = new ArrayList<>();
@@ -426,7 +504,8 @@ public List safelyGetAllChangelogs() throws IOException {
* result.
*/
public List tryGetNonSnapshotFiles(Predicate fileStatusFilter) {
- return listPathWithFilter(snapshotDirectory(), fileStatusFilter, nonSnapshotFileFilter());
+ return listPathWithFilter(
+ snapshotDirectory(branch), fileStatusFilter, nonSnapshotFileFilter());
}
public List tryGetNonChangelogFiles(Predicate fileStatusFilter) {
@@ -470,18 +549,22 @@ private Predicate nonChangelogFileFilter() {
}
public Optional latestSnapshotOfUser(String user) {
- Long latestId = latestSnapshotId();
+ return latestSnapshotOfUser(branch, user);
+ }
+
+ public Optional latestSnapshotOfUser(String branch, String user) {
+ Long latestId = latestSnapshotId(branch);
if (latestId == null) {
return Optional.empty();
}
long earliestId =
Preconditions.checkNotNull(
- earliestSnapshotId(),
+ earliestSnapshotId(branch),
"Latest snapshot id is not null, but earliest snapshot id is null. "
+ "This is unexpected.");
for (long id = latestId; id >= earliestId; id--) {
- Snapshot snapshot = snapshot(id);
+ Snapshot snapshot = snapshot(id, branch);
if (user.equals(snapshot.commitUser())) {
return Optional.of(snapshot);
}
@@ -489,19 +572,24 @@ public Optional latestSnapshotOfUser(String user) {
return Optional.empty();
}
- /** Find the snapshot of the specified identifiers written by the specified user. */
public List findSnapshotsForIdentifiers(
@Nonnull String user, List identifiers) {
+ return findSnapshotsForIdentifiers(user, identifiers, branch);
+ }
+
+ /** Find the snapshot of the specified identifiers written by the specified user. */
+ public List findSnapshotsForIdentifiers(
+ @Nonnull String user, List identifiers, String branch) {
if (identifiers.isEmpty()) {
return Collections.emptyList();
}
- Long latestId = latestSnapshotId();
+ Long latestId = latestSnapshotId(branch);
if (latestId == null) {
return Collections.emptyList();
}
long earliestId =
Preconditions.checkNotNull(
- earliestSnapshotId(),
+ earliestSnapshotId(branch),
"Latest snapshot id is not null, but earliest snapshot id is null. "
+ "This is unexpected.");
@@ -509,7 +597,7 @@ public List findSnapshotsForIdentifiers(
List matchedSnapshots = new ArrayList<>();
Set remainingIdentifiers = new HashSet<>(identifiers);
for (long id = latestId; id >= earliestId && !remainingIdentifiers.isEmpty(); id--) {
- Snapshot snapshot = snapshot(id);
+ Snapshot snapshot = snapshot(id, branch);
if (user.equals(snapshot.commitUser())) {
if (remainingIdentifiers.remove(snapshot.commitIdentifier())) {
matchedSnapshots.add(snapshot);
@@ -523,7 +611,12 @@ public List findSnapshotsForIdentifiers(
}
public void commitChangelog(Changelog changelog, long id) throws IOException {
- fileIO.writeFileUtf8(longLivedChangelogPath(id), changelog.toJson());
+ fileIO.writeFileUtf8(longLivedChangelogPath(branch, id), changelog.toJson());
+ }
+
+ @Nullable
+ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) {
+ return traversalSnapshotsFromLatestSafely(checker, branch);
}
/**
@@ -532,12 +625,12 @@ public void commitChangelog(Changelog changelog, long id) throws IOException {
* unreadable snapshots.
*/
@Nullable
- public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) {
- Long latestId = latestSnapshotId();
+ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker, String branch) {
+ Long latestId = latestSnapshotId(branch);
if (latestId == null) {
return null;
}
- Long earliestId = earliestSnapshotId();
+ Long earliestId = earliestSnapshotId(branch);
if (earliestId == null) {
return null;
}
@@ -545,9 +638,9 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) {
for (long id = latestId; id >= earliestId; id--) {
Snapshot snapshot;
try {
- snapshot = snapshot(id);
+ snapshot = snapshot(id, branch);
} catch (Exception e) {
- Long newEarliestId = earliestSnapshotId();
+ Long newEarliestId = earliestSnapshotId(branch);
if (newEarliestId == null) {
return null;
}
@@ -582,7 +675,6 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) {
return snapshotId;
}
}
-
return findByListFiles(Math::max, dir, prefix);
}
@@ -602,7 +694,11 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) {
}
public Long readHint(String fileName) {
- return readHint(fileName, snapshotDirByBranch(DEFAULT_MAIN_BRANCH));
+ return readHint(fileName, branch);
+ }
+
+ public Long readHint(String fileName, String branch) {
+ return readHint(fileName, snapshotDirectory(branch));
}
public Long readHint(String fileName, Path dir) {
@@ -629,11 +725,11 @@ private Long findByListFiles(BinaryOperator reducer, Path dir, String pref
}
public void commitLatestHint(long snapshotId) throws IOException {
- commitLatestHint(snapshotId, DEFAULT_MAIN_BRANCH);
+ commitLatestHint(snapshotId, branch);
}
public void commitLatestHint(long snapshotId, String branchName) throws IOException {
- commitHint(snapshotId, LATEST, snapshotDirByBranch(branchName));
+ commitHint(snapshotId, LATEST, snapshotDirectory(branchName));
}
public void commitLongLivedChangelogLatestHint(long snapshotId) throws IOException {
@@ -645,11 +741,11 @@ public void commitLongLivedChangelogEarliestHint(long snapshotId) throws IOExcep
}
public void commitEarliestHint(long snapshotId) throws IOException {
- commitEarliestHint(snapshotId, DEFAULT_MAIN_BRANCH);
+ commitEarliestHint(snapshotId, branch);
}
public void commitEarliestHint(long snapshotId, String branchName) throws IOException {
- commitHint(snapshotId, EARLIEST, snapshotDirByBranch(branchName));
+ commitHint(snapshotId, EARLIEST, snapshotDirectory(branchName));
}
private void commitHint(long snapshotId, String fileName, Path dir) throws IOException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 8b7818fed782e..634e9f760ae9e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -45,6 +45,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.getBranchPath;
import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -58,10 +59,17 @@ public class TagManager {
private final FileIO fileIO;
private final Path tablePath;
+ private final String branch;
public TagManager(FileIO fileIO, Path tablePath) {
+ this(fileIO, tablePath, DEFAULT_MAIN_BRANCH);
+ }
+
+ /** Specify the default branch for data writing. */
+ public TagManager(FileIO fileIO, Path tablePath, String branch) {
this.fileIO = fileIO;
this.tablePath = tablePath;
+ this.branch = StringUtils.isBlank(branch) ? DEFAULT_MAIN_BRANCH : branch;
}
/** Return the root Directory of tags. */
@@ -69,31 +77,49 @@ public Path tagDirectory() {
return new Path(tablePath + "/tag");
}
+ /** Return the root Directory of tags. */
+ public Path tagDirectory(String branch) {
+ return branch.equals(DEFAULT_MAIN_BRANCH)
+ ? tagDirectory()
+ : new Path(getBranchPath(tablePath, branch) + "/tag");
+ }
+
/** Return the path of a tag. */
public Path tagPath(String tagName) {
return new Path(tablePath + "/tag/" + TAG_PREFIX + tagName);
}
/** Return the path of a tag in branch. */
- public Path branchTagPath(String branchName, String tagName) {
- return new Path(getBranchPath(tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName);
+ public Path tagPath(String branch, String tagName) {
+ return branch.equals(DEFAULT_MAIN_BRANCH)
+ ? tagPath(tagName)
+ : new Path(getBranchPath(tablePath, branch) + "/tag/" + TAG_PREFIX + tagName);
}
- /** Create a tag from given snapshot and save it in the storage. */
public void createTag(
Snapshot snapshot,
String tagName,
@Nullable Duration timeRetained,
List callbacks) {
+ createTag(snapshot, tagName, timeRetained, callbacks, branch);
+ }
+
+ /** Create a tag from given snapshot and save it in the storage. */
+ public void createTag(
+ Snapshot snapshot,
+ String tagName,
+ @Nullable Duration timeRetained,
+ List callbacks,
+ String branch) {
checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName);
// skip create tag for the same snapshot of the same name.
- if (tagExists(tagName)) {
- Snapshot tagged = taggedSnapshot(tagName);
+ if (tagExists(branch, tagName)) {
+ Snapshot tagged = taggedSnapshot(branch, tagName);
Preconditions.checkArgument(
tagged.id() == snapshot.id(), "Tag name '%s' already exists.", tagName);
} else {
- Path newTagPath = tagPath(tagName);
+ Path newTagPath = tagPath(branch, tagName);
try {
fileIO.writeFileUtf8(
newTagPath,
@@ -121,10 +147,18 @@ public void createTag(
}
}
- /** Make sure the tagNames are ALL tags of one snapshot. */
public void deleteAllTagsOfOneSnapshot(
List tagNames, TagDeletion tagDeletion, SnapshotManager snapshotManager) {
- Snapshot taggedSnapshot = taggedSnapshot(tagNames.get(0));
+ deleteAllTagsOfOneSnapshot(tagNames, tagDeletion, snapshotManager, branch);
+ }
+
+ /** Make sure the tagNames are ALL tags of one snapshot. */
+ public void deleteAllTagsOfOneSnapshot(
+ List tagNames,
+ TagDeletion tagDeletion,
+ SnapshotManager snapshotManager,
+ String branch) {
+ Snapshot taggedSnapshot = taggedSnapshot(branch, tagNames.get(0));
List taggedSnapshots;
// skip file deletion if snapshot exists
@@ -133,11 +167,11 @@ public void deleteAllTagsOfOneSnapshot(
return;
} else {
// FileIO discovers tags by tag file, so we should read all tags before we delete tag
- taggedSnapshots = taggedSnapshots();
- tagNames.forEach(tagName -> fileIO.deleteQuietly(tagPath(tagName)));
+ taggedSnapshots = taggedSnapshots(branch);
+ tagNames.forEach(tagName -> fileIO.deleteQuietly(tagPath(branch, tagName)));
}
- doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion);
+ doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion, branch);
}
public void deleteTag(
@@ -145,21 +179,29 @@ public void deleteTag(
TagDeletion tagDeletion,
SnapshotManager snapshotManager,
List callbacks) {
+ deleteTag(tagName, tagDeletion, snapshotManager, callbacks, branch);
+ }
+
+ public void deleteTag(
+ String tagName,
+ TagDeletion tagDeletion,
+ SnapshotManager snapshotManager,
+ List callbacks,
+ String branch) {
checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName);
- checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
+ checkArgument(tagExists(branch, tagName), "Tag '%s' doesn't exist.", tagName);
- Snapshot taggedSnapshot = taggedSnapshot(tagName);
+ Snapshot taggedSnapshot = taggedSnapshot(branch, tagName);
List taggedSnapshots;
// skip file deletion if snapshot exists
- if (snapshotManager.snapshotExists(taggedSnapshot.id())) {
- deleteTagMetaFile(tagName, callbacks);
+ if (snapshotManager.snapshotExists(taggedSnapshot.id(), branch)) {
+ deleteTagMetaFile(tagName, callbacks, branch);
return;
} else {
// FileIO discovers tags by tag file, so we should read all tags before we delete tag
SortedMap> tags = tags();
- deleteTagMetaFile(tagName, callbacks);
-
+ deleteTagMetaFile(tagName, callbacks, branch);
// skip data file clean if more than 1 tags are created based on this snapshot
if (tags.get(taggedSnapshot).size() > 1) {
return;
@@ -167,11 +209,15 @@ public void deleteTag(
taggedSnapshots = new ArrayList<>(tags.keySet());
}
- doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion);
+ doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion, branch);
}
private void deleteTagMetaFile(String tagName, List callbacks) {
- fileIO.deleteQuietly(tagPath(tagName));
+ deleteTagMetaFile(tagName, callbacks, branch);
+ }
+
+ private void deleteTagMetaFile(String tagName, List callbacks, String branch) {
+ fileIO.deleteQuietly(tagPath(branch, tagName));
try {
callbacks.forEach(callback -> callback.notifyDeletion(tagName));
} finally {
@@ -185,7 +231,8 @@ private void doClean(
Snapshot taggedSnapshot,
List taggedSnapshots,
SnapshotManager snapshotManager,
- TagDeletion tagDeletion) {
+ TagDeletion tagDeletion,
+ String branch) {
// collect skipping sets from the left neighbor tag and the nearest right neighbor (either
// the earliest snapshot or right neighbor tag)
List skippedSnapshots = new ArrayList<>();
@@ -196,7 +243,7 @@ private void doClean(
skippedSnapshots.add(taggedSnapshots.get(index - 1));
}
// the nearest right neighbor
- Snapshot right = snapshotManager.earliestSnapshot();
+ Snapshot right = snapshotManager.earliestSnapshot(branch);
if (index + 1 < taggedSnapshots.size()) {
Snapshot rightTag = taggedSnapshots.get(index + 1);
right = right.id() < rightTag.id() ? right : rightTag;
@@ -226,9 +273,9 @@ private void doClean(
taggedSnapshot, tagDeletion.manifestSkippingSet(skippedSnapshots));
}
- /** Check if a tag exists. */
- public boolean tagExists(String tagName) {
- Path path = tagPath(tagName);
+ /** Check if a branch tag exists. */
+ public boolean tagExists(String branch, String tagName) {
+ Path path = tagPath(branch, tagName);
try {
return fileIO.exists(path);
} catch (IOException e) {
@@ -239,11 +286,20 @@ public boolean tagExists(String tagName) {
}
}
- /** Get the tagged snapshot by name. */
+ /** Check if a tag exists. */
+ public boolean tagExists(String tagName) {
+ return tagExists(DEFAULT_MAIN_BRANCH, tagName);
+ }
+
+ /** Get the branch tagged snapshot by name. */
public Snapshot taggedSnapshot(String tagName) {
- checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
- // Trim to snapshot to avoid equals and compare snapshot.
- return Tag.fromPath(fileIO, tagPath(tagName)).trimToSnapshot();
+ return taggedSnapshot(DEFAULT_MAIN_BRANCH, tagName);
+ }
+
+ /** Get the tagged snapshot by name. */
+ public Snapshot taggedSnapshot(String branch, String tagName) {
+ checkArgument(tagExists(branch, tagName), "Tag '%s' doesn't exist.", tagName);
+ return Tag.fromPath(fileIO, tagPath(branch, tagName)).trimToSnapshot();
}
public long tagCount() {
@@ -259,11 +315,24 @@ public List taggedSnapshots() {
return new ArrayList<>(tags().keySet());
}
- /** Get all tagged snapshots with tag names sorted by snapshot id. */
+ /** Get all tagged snapshots sorted by snapshot id. */
+ public List taggedSnapshots(String branch) {
+ return new ArrayList<>(tags(branch).keySet());
+ }
+
+ /** Get all tagged snapshots with names sorted by snapshot id. */
+ public SortedMap> tags(String branch) {
+ return tags(branch, tagName -> true);
+ }
+
public SortedMap> tags() {
return tags(tagName -> true);
}
+ public SortedMap> tags(Predicate filter) {
+ return tags(DEFAULT_MAIN_BRANCH, filter);
+ }
+
/**
* Retrieves a sorted map of snapshots filtered based on a provided predicate. The predicate
* determines which tag names should be included in the result. Only snapshots with tag names
@@ -275,12 +344,12 @@ public SortedMap> tags() {
* name.
* @throws RuntimeException if an IOException occurs during retrieval of snapshots.
*/
- public SortedMap> tags(Predicate filter) {
+ public SortedMap> tags(String branch, Predicate filter) {
TreeMap> tags =
new TreeMap<>(Comparator.comparingLong(Snapshot::id));
try {
List paths =
- listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX)
+ listVersionedFileStatus(fileIO, tagDirectory(branch), TAG_PREFIX)
.map(FileStatus::getPath)
.collect(Collectors.toList());
@@ -325,11 +394,15 @@ public List> tagObjects() {
}
public List sortTagsOfOneSnapshot(List tagNames) {
+ return sortTagsOfOneSnapshot(branch, tagNames);
+ }
+
+ public List sortTagsOfOneSnapshot(String branch, List tagNames) {
return tagNames.stream()
.map(
name -> {
try {
- return fileIO.getFileStatus(tagPath(name));
+ return fileIO.getFileStatus(tagPath(branch, name));
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 6adc3aff04f8c..dd671136426f4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -53,10 +53,12 @@
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.utils.TagManager;
import org.slf4j.Logger;
@@ -98,6 +100,8 @@ public class TestFileStore extends KeyValueFileStore {
private long commitIdentifier;
+ private String branch;
+
private TestFileStore(
String root,
CoreOptions options,
@@ -106,10 +110,11 @@ private TestFileStore(
RowType valueType,
KeyValueFieldsExtractor keyValueFieldsExtractor,
MergeFunctionFactory mfFactory,
- TableSchema tableSchema) {
+ TableSchema tableSchema,
+ String branch) {
super(
FileIOFinder.find(new Path(root)),
- schemaManager(root, options),
+ schemaManager(root, options, branch),
tableSchema != null
? tableSchema
: new TableSchema(
@@ -137,10 +142,11 @@ private TestFileStore(
this.commitUser = UUID.randomUUID().toString();
this.commitIdentifier = 0L;
+ this.branch = branch;
}
- private static SchemaManager schemaManager(String root, CoreOptions options) {
- return new SchemaManager(FileIOFinder.find(new Path(root)), options.path());
+ private static SchemaManager schemaManager(String root, CoreOptions options, String branch) {
+ return new SchemaManager(FileIOFinder.find(new Path(root)), options.path(), branch);
}
public AbstractFileStoreWrite newWrite() {
@@ -331,7 +337,7 @@ public List commitDataImpl(
.write(kv);
}
- FileStoreCommit commit = newCommit(commitUser);
+ FileStoreCommit commit = newCommit(commitUser, branch);
ManifestCommittable committable =
new ManifestCommittable(
identifier == null ? commitIdentifier++ : identifier, watermark);
@@ -352,12 +358,12 @@ public List commitDataImpl(
}
SnapshotManager snapshotManager = snapshotManager();
- Long snapshotIdBeforeCommit = snapshotManager.latestSnapshotId();
+ Long snapshotIdBeforeCommit = snapshotManager.latestSnapshotId(branch);
if (snapshotIdBeforeCommit == null) {
snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
}
commitFunction.accept(commit, committable);
- Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId();
+ Long snapshotIdAfterCommit = snapshotManager.latestSnapshotId(branch);
if (snapshotIdAfterCommit == null) {
snapshotIdAfterCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
}
@@ -497,18 +503,18 @@ public void assertCleaned() throws IOException {
// - latest should < true_latest
// - earliest should < true_earliest
SnapshotManager snapshotManager = snapshotManager();
- Path snapshotDir = snapshotManager.snapshotDirectory();
+ Path snapshotDir = snapshotManager.snapshotDirectory(branch);
Path earliest = new Path(snapshotDir, SnapshotManager.EARLIEST);
Path latest = new Path(snapshotDir, SnapshotManager.LATEST);
if (actualFiles.remove(earliest)) {
- long earliestId = snapshotManager.readHint(SnapshotManager.EARLIEST);
+ long earliestId = snapshotManager.readHint(SnapshotManager.EARLIEST, branch);
fileIO.delete(earliest, false);
- assertThat(earliestId <= snapshotManager.earliestSnapshotId()).isTrue();
+ assertThat(earliestId <= snapshotManager.earliestSnapshotId(branch)).isTrue();
}
if (actualFiles.remove(latest)) {
- long latestId = snapshotManager.readHint(SnapshotManager.LATEST);
+ long latestId = snapshotManager.readHint(SnapshotManager.LATEST, branch);
fileIO.delete(latest, false);
- assertThat(latestId <= snapshotManager.latestSnapshotId()).isTrue();
+ assertThat(latestId <= snapshotManager.latestSnapshotId(branch)).isTrue();
}
Path changelogDir = snapshotManager.changelogDirectory();
Path earliestChangelog = new Path(changelogDir, SnapshotManager.EARLIEST);
@@ -540,7 +546,9 @@ private Set getFilesInUse() {
Set result = new HashSet<>();
SchemaManager schemaManager = new SchemaManager(fileIO, options.path());
- schemaManager.listAllIds().forEach(id -> result.add(schemaManager.toSchemaPath(id)));
+ schemaManager
+ .listAllIds(branch)
+ .forEach(id -> result.add(schemaManager.toSchemaPath(branch, id)));
SnapshotManager snapshotManager = snapshotManager();
Long latestSnapshotId = snapshotManager.latestSnapshotId();
@@ -689,6 +697,7 @@ public static class Builder {
private final TableSchema tableSchema;
private CoreOptions.ChangelogProducer changelogProducer;
+ private final String branch;
public Builder(
String format,
@@ -700,6 +709,30 @@ public Builder(
KeyValueFieldsExtractor keyValueFieldsExtractor,
MergeFunctionFactory mfFactory,
TableSchema tableSchema) {
+ this(
+ format,
+ root,
+ numBuckets,
+ partitionType,
+ keyType,
+ valueType,
+ keyValueFieldsExtractor,
+ mfFactory,
+ tableSchema,
+ BranchManager.DEFAULT_MAIN_BRANCH);
+ }
+
+ public Builder(
+ String format,
+ String root,
+ int numBuckets,
+ RowType partitionType,
+ RowType keyType,
+ RowType valueType,
+ KeyValueFieldsExtractor keyValueFieldsExtractor,
+ MergeFunctionFactory mfFactory,
+ TableSchema tableSchema,
+ String branch) {
this.format = format;
this.root = root;
this.numBuckets = numBuckets;
@@ -711,6 +744,7 @@ public Builder(
this.tableSchema = tableSchema;
this.changelogProducer = CoreOptions.ChangelogProducer.NONE;
+ this.branch = StringUtils.isEmpty(branch) ? BranchManager.DEFAULT_MAIN_BRANCH : branch;
}
public Builder changelogProducer(CoreOptions.ChangelogProducer changelogProducer) {
@@ -738,6 +772,7 @@ public TestFileStore build() {
// disable dynamic-partition-overwrite in FileStoreCommit layer test
conf.set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE, false);
+ conf.set(CoreOptions.BRANCH, branch);
return new TestFileStore(
root,
@@ -747,7 +782,8 @@ public TestFileStore build() {
valueType,
keyValueFieldsExtractor,
mfFactory,
- tableSchema);
+ tableSchema,
+ branch);
}
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanBranchTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanBranchTest.java
new file mode 100644
index 0000000000000..d787065826e64
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanBranchTest.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.TestFileStore;
+import org.apache.paimon.TestKeyValueGenerator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link KeyValueFileStoreScan}. */
+public class KeyValueFileStoreScanBranchTest {
+
+ private static final int NUM_BUCKETS = 10;
+
+ private TestKeyValueGenerator gen;
+ @TempDir java.nio.file.Path tempDir;
+ private TestFileStore store;
+ private SnapshotManager snapshotManager;
+
+ private String branch = UUID.randomUUID() + "-branch";
+
+ @BeforeEach
+ public void beforeEach() throws Exception {
+ gen = new TestKeyValueGenerator();
+ store =
+ new TestFileStore.Builder(
+ "avro",
+ tempDir.toString(),
+ NUM_BUCKETS,
+ TestKeyValueGenerator.DEFAULT_PART_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+ TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR,
+ DeduplicateMergeFunction.factory(),
+ null,
+ branch)
+ .build();
+ snapshotManager = store.snapshotManager();
+
+ LocalFileIO localFile = LocalFileIO.create();
+
+ SchemaManager schemaManager =
+ new SchemaManager(localFile, new Path(tempDir.toUri()), branch);
+ schemaManager.createTable(
+ new Schema(
+ TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+ TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+ TestKeyValueGenerator.getPrimaryKeys(
+ TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED),
+ Collections.emptyMap(),
+ null));
+ }
+
+ @Test
+ public void testWithPartitionFilter() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ List data = generateData(random.nextInt(1000) + 1);
+ List partitions =
+ data.stream()
+ .map(kv -> gen.getPartition(kv))
+ .distinct()
+ .collect(Collectors.toList());
+ Snapshot snapshot = writeData(data);
+
+ Set wantedPartitions = new HashSet<>();
+ for (int i = random.nextInt(partitions.size() + 1); i > 0; i--) {
+ wantedPartitions.add(partitions.get(random.nextInt(partitions.size())));
+ }
+
+ FileStoreScan scan = store.newScan();
+ scan.withSnapshot(snapshot.id());
+ scan.withPartitionFilter(new ArrayList<>(wantedPartitions));
+
+ Map expected =
+ store.toKvMap(
+ wantedPartitions.isEmpty()
+ ? data
+ : data.stream()
+ .filter(
+ kv ->
+ wantedPartitions.contains(
+ gen.getPartition(kv)))
+ .collect(Collectors.toList()));
+ runTestExactMatch(scan, snapshot.id(), expected);
+ }
+
+ @Test
+ public void testWithKeyFilter() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ List data = generateData(random.nextInt(1000) + 1);
+ Snapshot snapshot = writeData(data);
+
+ int wantedShopId = data.get(random.nextInt(data.size())).key().getInt(0);
+
+ KeyValueFileStoreScan scan = store.newScan();
+ scan.withSnapshot(snapshot.id());
+ scan.withKeyFilter(
+ new PredicateBuilder(RowType.of(new IntType(false))).equal(0, wantedShopId));
+
+ Map expected =
+ store.toKvMap(
+ data.stream()
+ .filter(kv -> kv.key().getInt(0) == wantedShopId)
+ .collect(Collectors.toList()));
+ runTestContainsAll(scan, snapshot.id(), expected);
+ }
+
+ @Test
+ public void testWithValueFilter() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ List data = generateData(100, Math.abs(random.nextInt(1000)));
+ writeData(data, 0);
+ data = generateData(100, Math.abs(random.nextInt(1000)) + 1000);
+ writeData(data, 1);
+ data = generateData(100, Math.abs(random.nextInt(1000)) + 2000);
+ writeData(data, 2);
+ generateData(100, Math.abs(random.nextInt(1000)) + 3000);
+ Snapshot snapshot = writeData(data, 3);
+
+ KeyValueFileStoreScan scan = store.newScan();
+ scan.withSnapshot(snapshot.id());
+ List files = scan.plan().files();
+
+ scan = store.newScan();
+ scan.withSnapshot(snapshot.id());
+ scan.withValueFilter(
+ new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE)
+ .between(1, 1000, 2000));
+
+ List filesFiltered = scan.plan().files();
+
+ assertThat(files.size()).isEqualTo(4);
+ assertThat(filesFiltered.size()).isEqualTo(1);
+ }
+
+ @Test
+ public void testWithValuePartitionFilter() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ List data = generateData(100, Math.abs(random.nextInt(1000)));
+ writeData(data, "0", 0);
+ data = generateData(100, Math.abs(random.nextInt(1000)) + 1000);
+ writeData(data, "1", 0);
+ data = generateData(100, Math.abs(random.nextInt(1000)) + 2000);
+ writeData(data, "2", 0);
+ generateData(100, Math.abs(random.nextInt(1000)) + 3000);
+ Snapshot snapshot = writeData(data, "3", 0);
+
+ KeyValueFileStoreScan scan = store.newScan();
+ scan.withSnapshot(snapshot.id());
+ List files = scan.plan().files();
+
+ scan = store.newScan();
+ scan.withSnapshot(snapshot.id());
+ scan.withValueFilter(
+ new PredicateBuilder(TestKeyValueGenerator.DEFAULT_ROW_TYPE)
+ .between(1, 1000, 2000));
+
+ List filesFiltered = scan.plan().files();
+
+ assertThat(files.size()).isEqualTo(4);
+ assertThat(filesFiltered.size()).isEqualTo(1);
+ }
+
+ @Test
+ public void testWithBucket() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ List data = generateData(random.nextInt(1000) + 1);
+ Snapshot snapshot = writeData(data);
+
+ int wantedBucket = random.nextInt(NUM_BUCKETS);
+
+ FileStoreScan scan = store.newScan();
+ scan.withSnapshot(snapshot.id());
+ scan.withBucket(wantedBucket);
+
+ Map expected =
+ store.toKvMap(
+ data.stream()
+ .filter(kv -> getBucket(kv) == wantedBucket)
+ .collect(Collectors.toList()));
+ runTestExactMatch(scan, snapshot.id(), expected);
+ }
+
+ @Test
+ public void testWithSnapshot() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int numCommits = random.nextInt(10) + 1;
+ int wantedCommit = random.nextInt(numCommits);
+
+ List snapshots = new ArrayList<>();
+ List> allData = new ArrayList<>();
+ for (int i = 0; i < numCommits; i++) {
+ List data = generateData(random.nextInt(100) + 1);
+ snapshots.add(writeData(data));
+ allData.add(data);
+ }
+ long wantedSnapshot = snapshots.get(wantedCommit).id();
+
+ FileStoreScan scan = store.newScan();
+ scan.withSnapshot(wantedSnapshot);
+
+ Map expected =
+ store.toKvMap(
+ allData.subList(0, wantedCommit + 1).stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()));
+ runTestExactMatch(scan, wantedSnapshot, expected);
+ }
+
+ @Test
+ public void testWithManifestList() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int numCommits = random.nextInt(10) + 1;
+ for (int i = 0; i < numCommits; i++) {
+ List data = generateData(random.nextInt(100) + 1);
+ writeData(data);
+ }
+
+ ManifestList manifestList = store.manifestListFactory().create();
+ long wantedSnapshotId = random.nextLong(snapshotManager.latestSnapshotId(branch)) + 1;
+ Snapshot wantedSnapshot = snapshotManager.snapshot(wantedSnapshotId);
+ List wantedManifests = wantedSnapshot.dataManifests(manifestList);
+
+ FileStoreScan scan = store.newScan();
+ scan.withManifestList(wantedManifests);
+
+ List expectedKvs = store.readKvsFromSnapshot(wantedSnapshotId);
+ gen.sort(expectedKvs);
+ Map expected = store.toKvMap(expectedKvs);
+ runTestExactMatch(scan, null, expected);
+ }
+
+ private void runTestExactMatch(
+ FileStoreScan scan, Long expectedSnapshotId, Map expected)
+ throws Exception {
+ Map actual = getActualKvMap(scan, expectedSnapshotId);
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ private void runTestContainsAll(
+ FileStoreScan scan, Long expectedSnapshotId, Map expected)
+ throws Exception {
+ Map actual = getActualKvMap(scan, expectedSnapshotId);
+ for (Map.Entry entry : expected.entrySet()) {
+ assertThat(actual).containsKey(entry.getKey());
+ assertThat(actual.get(entry.getKey())).isEqualTo(entry.getValue());
+ }
+ }
+
+ private Map getActualKvMap(FileStoreScan scan, Long expectedSnapshotId)
+ throws Exception {
+ FileStoreScan.Plan plan = scan.plan();
+ assertThat(plan.snapshotId()).isEqualTo(expectedSnapshotId);
+
+ List actualKvs = store.readKvsFromManifestEntries(plan.files(), false);
+ gen.sort(actualKvs);
+ return store.toKvMap(actualKvs);
+ }
+
+ private List generateData(int numRecords) {
+ List data = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ data.add(gen.next());
+ }
+ return data;
+ }
+
+ private List generateData(int numRecords, int hr) {
+ List data = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ data.add(gen.nextInsert("", hr, null, null, null));
+ }
+ return data;
+ }
+
+ private Snapshot writeData(List kvs) throws Exception {
+ List snapshots = store.commitData(kvs, gen::getPartition, this::getBucket);
+ return snapshots.get(snapshots.size() - 1);
+ }
+
+ private Snapshot writeData(List kvs, int bucket) throws Exception {
+ List snapshots = store.commitData(kvs, gen::getPartition, b -> bucket);
+ return snapshots.get(snapshots.size() - 1);
+ }
+
+ private Snapshot writeData(List kvs, String partition, int bucket) throws Exception {
+ BinaryRow binaryRow = new BinaryRow(2);
+ BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
+ binaryRowWriter.writeString(0, BinaryString.fromString(partition));
+ binaryRowWriter.writeInt(1, 0);
+ binaryRowWriter.complete();
+ List snapshots = store.commitData(kvs, p -> binaryRow, b -> bucket);
+ return snapshots.get(snapshots.size() - 1);
+ }
+
+ private int getBucket(KeyValue kv) {
+ return (kv.key().hashCode() % NUM_BUCKETS + NUM_BUCKETS) % NUM_BUCKETS;
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
index 806c869f94f3c..44f502f41c705 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/MergeFileSplitReadTest.java
@@ -127,10 +127,7 @@ public List keyFields(TableSchema schema) {
@Override
public List valueFields(TableSchema schema) {
return Collections.singletonList(
- new DataField(
- 0,
- "count",
- new org.apache.paimon.types.BigIntType()));
+ new DataField(0, "count", new BigIntType()));
}
},
TestValueCountMergeFunction.factory());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 6893649d532d1..a37802f33c9df 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -120,9 +120,12 @@ public void testBatchReadWrite() throws Exception {
public void testBranchBatchReadWrite() throws Exception {
FileStoreTable table = createFileStoreTable();
generateBranch(table);
- writeBranchData(table);
- List splits = toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits());
- TableRead read = table.newRead();
+
+ FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+ writeBranchData(tableBranch);
+ List splits =
+ toSplits(tableBranch.newSnapshotReader(BRANCH_NAME).read().dataSplits());
+ TableRead read = tableBranch.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
.hasSameElementsAs(
Arrays.asList(
@@ -300,15 +303,18 @@ public void testBatchSplitOrderByPartition() throws Exception {
public void testBranchStreamingReadWrite() throws Exception {
FileStoreTable table = createFileStoreTable();
generateBranch(table);
- writeBranchData(table);
+
+ FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+ writeBranchData(tableBranch);
List splits =
toSplits(
- table.newSnapshotReader(BRANCH_NAME)
+ tableBranch
+ .newSnapshotReader(BRANCH_NAME)
.withMode(ScanMode.DELTA)
.read()
.dataSplits());
- TableRead read = table.newRead();
+ TableRead read = tableBranch.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
.isEqualTo(
@@ -694,6 +700,25 @@ protected FileStoreTable createFileStoreTable(Consumer configure) throw
return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema);
}
+ @Override
+ protected FileStoreTable createFileStoreTable(String branch, Consumer configure)
+ throws Exception {
+ Options conf = new Options();
+ conf.set(CoreOptions.PATH, tablePath.toString());
+ conf.set(CoreOptions.BRANCH, branch);
+ configure.accept(conf);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), tablePath, branch),
+ new Schema(
+ ROW_TYPE.getFields(),
+ Collections.singletonList("pt"),
+ Collections.emptyList(),
+ conf.toMap(),
+ ""));
+ return new AppendOnlyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema);
+ }
+
@Override
protected FileStoreTable overwriteTestFileStoreTable() throws Exception {
Options conf = new Options();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 138a30d5bf87b..bd7a4a2cebfcb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -1019,22 +1019,21 @@ public void testCreateBranch() throws Exception {
// verify test-tag in test-branch is equal to snapshot 2
Snapshot branchTag =
Snapshot.fromPath(
- new TraceableFileIO(), tagManager.branchTagPath("test-branch", "test-tag"));
+ new TraceableFileIO(), tagManager.tagPath("test-branch", "test-tag"));
assertThat(branchTag.equals(snapshot2)).isTrue();
// verify snapshot in test-branch is equal to snapshot 2
SnapshotManager snapshotManager = new SnapshotManager(new TraceableFileIO(), tablePath);
Snapshot branchSnapshot =
Snapshot.fromPath(
- new TraceableFileIO(),
- snapshotManager.branchSnapshotPath("test-branch", 2));
+ new TraceableFileIO(), snapshotManager.snapshotPath("test-branch", 2));
assertThat(branchSnapshot.equals(snapshot2)).isTrue();
// verify schema in test-branch is equal to schema 0
SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath);
TableSchema branchSchema =
SchemaManager.fromPath(
- new TraceableFileIO(), schemaManager.branchSchemaPath("test-branch", 0));
+ new TraceableFileIO(), schemaManager.toSchemaPath("test-branch", 0));
TableSchema schema0 = schemaManager.schema(0);
assertThat(branchSchema.equals(schema0)).isTrue();
}
@@ -1312,9 +1311,10 @@ public void testBranchWriteAndRead() throws Exception {
generateBranch(table);
+ FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
// Write data to branch1
- try (StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser, BRANCH_NAME)) {
+ try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+ StreamTableCommit commit = tableBranch.newCommit(commitUser, BRANCH_NAME)) {
write.write(rowData(2, 20, 200L));
commit.commit(1, write.prepareCommit(false, 2));
}
@@ -1330,16 +1330,20 @@ public void testBranchWriteAndRead() throws Exception {
// Validate data in branch1
assertThat(
getResult(
- table.newRead(),
- toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()),
+ tableBranch.newRead(),
+ toSplits(
+ tableBranch
+ .newSnapshotReader(BRANCH_NAME)
+ .read()
+ .dataSplits()),
BATCH_ROW_TO_STRING))
.containsExactlyInAnyOrder(
"0|0|0|binary|varbinary|mapKey:mapVal|multiset",
"2|20|200|binary|varbinary|mapKey:mapVal|multiset");
// Write two rows data to branch1 again
- try (StreamTableWrite write = table.newWrite(commitUser);
- StreamTableCommit commit = table.newCommit(commitUser, BRANCH_NAME)) {
+ try (StreamTableWrite write = tableBranch.newWrite(commitUser);
+ StreamTableCommit commit = tableBranch.newCommit(commitUser, BRANCH_NAME)) {
write.write(rowData(3, 30, 300L));
write.write(rowData(4, 40, 400L));
commit.commit(2, write.prepareCommit(false, 3));
@@ -1356,8 +1360,12 @@ public void testBranchWriteAndRead() throws Exception {
// Verify data in branch1
assertThat(
getResult(
- table.newRead(),
- toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits()),
+ tableBranch.newRead(),
+ toSplits(
+ tableBranch
+ .newSnapshotReader(BRANCH_NAME)
+ .read()
+ .dataSplits()),
BATCH_ROW_TO_STRING))
.containsExactlyInAnyOrder(
"0|0|0|binary|varbinary|mapKey:mapVal|multiset",
@@ -1446,6 +1454,14 @@ protected FileStoreTable createFileStoreTable(int numOfBucket) throws Exception
return createFileStoreTable(conf -> conf.set(BUCKET, numOfBucket));
}
+ protected FileStoreTable createFileStoreTable(String branch, int numOfBucket) throws Exception {
+ return createFileStoreTable(branch, conf -> conf.set(BUCKET, numOfBucket));
+ }
+
+ protected FileStoreTable createFileStoreTable(String branch) throws Exception {
+ return createFileStoreTable(branch, 1);
+ }
+
protected FileStoreTable createFileStoreTable() throws Exception {
return createFileStoreTable(1);
}
@@ -1453,6 +1469,9 @@ protected FileStoreTable createFileStoreTable() throws Exception {
protected abstract FileStoreTable createFileStoreTable(Consumer configure)
throws Exception;
+ protected abstract FileStoreTable createFileStoreTable(
+ String branch, Consumer configure) throws Exception;
+
protected abstract FileStoreTable overwriteTestFileStoreTable() throws Exception;
private static InternalRow overwriteRow(Object... values) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 43d338eb8fc33..9c14bff9eed22 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -81,6 +81,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.BRANCH;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.CHANGELOG_NUM_RETAINED_MIN;
@@ -257,9 +258,11 @@ public void testBatchReadWrite() throws Exception {
public void testBranchBatchReadWrite() throws Exception {
FileStoreTable table = createFileStoreTable();
generateBranch(table);
- writeBranchData(table);
- List splits = toSplits(table.newSnapshotReader(BRANCH_NAME).read().dataSplits());
- TableRead read = table.newRead();
+ FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+ writeBranchData(tableBranch);
+ List splits =
+ toSplits(tableBranch.newSnapshotReader(BRANCH_NAME).read().dataSplits());
+ TableRead read = tableBranch.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, BATCH_ROW_TO_STRING))
.isEqualTo(
Collections.singletonList(
@@ -328,15 +331,18 @@ public void testStreamingReadWrite() throws Exception {
public void testBranchStreamingReadWrite() throws Exception {
FileStoreTable table = createFileStoreTable();
generateBranch(table);
- writeBranchData(table);
+
+ FileStoreTable tableBranch = createFileStoreTable(BRANCH_NAME);
+ writeBranchData(tableBranch);
List splits =
toSplits(
- table.newSnapshotReader(BRANCH_NAME)
+ tableBranch
+ .newSnapshotReader(BRANCH_NAME)
.withMode(ScanMode.DELTA)
.read()
.dataSplits());
- TableRead read = table.newRead();
+ TableRead read = tableBranch.newRead();
assertThat(getResult(read, splits, binaryRow(1), 0, STREAMING_ROW_TO_STRING))
.isEqualTo(
Collections.singletonList(
@@ -1645,4 +1651,29 @@ private FileStoreTable createFileStoreTable(Consumer configure, RowType
""));
return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema);
}
+
+ @Override
+ protected FileStoreTable createFileStoreTable(String branch, Consumer configure)
+ throws Exception {
+ return createFileStoreTable(branch, configure, ROW_TYPE);
+ }
+
+ private FileStoreTable createFileStoreTable(
+ String branch, Consumer configure, RowType rowType) throws Exception {
+ Options options = new Options();
+ options.set(CoreOptions.PATH, tablePath.toString());
+ options.set(BUCKET, 1);
+ options.set(BRANCH, branch);
+ configure.accept(options);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), tablePath),
+ new Schema(
+ rowType.getFields(),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "a"),
+ options.toMap(),
+ ""));
+ return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, tableSchema);
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
index a55bda9118a56..f5874bed7eac7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java
@@ -502,12 +502,6 @@ public Optional latest() {
.orElseThrow(IllegalStateException::new)));
}
- @Override
- public Optional latest(String branchName) {
- // for compatibility test
- return latest();
- }
-
@Override
public List listAll() {
return new ArrayList<>(tableSchemas.values());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java b/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
index 548dc7205f434..ee74a9e8f213c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
@@ -110,6 +110,31 @@ protected FileStoreTable createFileStoreTable(Consumer configure) throw
return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, schema);
}
+ @Override
+ protected FileStoreTable createFileStoreTable(String branch, Consumer configure)
+ throws Exception {
+ Options options = new Options();
+ options.set(CoreOptions.BUCKET, 1);
+ options.set(CoreOptions.PATH, tablePath.toString());
+ // Run with minimal memory to ensure a more intense preempt
+ // Currently a writer needs at least one page
+ int pages = 10;
+ options.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(pages * 1024));
+ options.set(CoreOptions.PAGE_SIZE, new MemorySize(1024));
+ options.set(CoreOptions.BRANCH, branch);
+ configure.accept(options);
+ TableSchema schema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), tablePath, branch),
+ new Schema(
+ ROW_TYPE.getFields(),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "a"),
+ options.toMap(),
+ ""));
+ return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, schema);
+ }
+
@Override
protected FileStoreTable overwriteTestFileStoreTable() throws Exception {
Options conf = new Options();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java
index 4c9f8ff9b55de..1617b4cc33278 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java
@@ -58,9 +58,12 @@ private void createTableIfNeeded(Context context) {
if (options.get(AUTO_CREATE)) {
try {
Path tablePath = CoreOptions.path(table.getOptions());
+ String branch = CoreOptions.branch(table.getOptions());
SchemaManager schemaManager =
new SchemaManager(
- FileIO.get(tablePath, createCatalogContext(context)), tablePath);
+ FileIO.get(tablePath, createCatalogContext(context)),
+ tablePath,
+ branch);
if (!schemaManager.latest().isPresent()) {
schemaManager.createTable(FlinkCatalog.fromCatalogTable(table));
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index 8e08824567ad3..22104159a4e45 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -42,7 +42,9 @@ protected OneInputStreamOperator createWriteOperator(
@Override
protected Committer.Factory createCommitterFactory(
boolean streamingCheckpointEnabled) {
- return (user, metricGroup) -> new StoreCommitter(table.newCommit(user), metricGroup);
+ return (user, metricGroup) ->
+ new StoreCommitter(
+ table.newCommit(user, table.coreOptions().branch()), metricGroup);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index fa4526897bcd5..42332d647ac77 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.TagCreationMode;
import org.apache.paimon.manifest.ManifestCommittable;
@@ -231,6 +232,8 @@ protected DataStreamSink> doCommit(DataStream written, String com
commitUser,
createCommitterFactory(streamingCheckpointEnabled),
createCommittableStateManager());
+
+ String branch = CoreOptions.branch(table.options());
if (Options.fromMap(table.options()).get(SINK_AUTO_TAG_FOR_SAVEPOINT)) {
committerOperator =
new AutoTagForSavepointCommitterOperator<>(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
index b812c04912b12..039d798c98539 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestCommittableSerializer;
@@ -48,7 +49,7 @@ protected Committer.Factory createCommitterFac
// a restart.
return (user, metricGroup) ->
new StoreCommitter(
- table.newCommit(user)
+ table.newCommit(user, CoreOptions.branch(table.options()))
.withOverwrite(overwritePartition)
.ignoreEmptyCommit(!streamingCheckpointEnabled),
metricGroup);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
index cf825cec7ba41..8be3b937c0b8e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
@@ -50,7 +51,9 @@ protected OneInputStreamOperator createWr
@Override
protected Committer.Factory createCommitterFactory(
boolean streamingCheckpointEnabled) {
- return (s, metricGroup) -> new StoreCommitter(table.newCommit(s), metricGroup);
+ return (s, metricGroup) ->
+ new StoreCommitter(
+ table.newCommit(s, CoreOptions.branch(table.options())), metricGroup);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
index 95defcd4253d4..61a464c2f1db6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
@@ -259,6 +259,22 @@ public void testDynamicOptions() throws Exception {
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(2));
}
+ @Test
+ public void testReadWriteBranch() throws Exception {
+ // create table
+ sql("CREATE TABLE T (id INT)");
+ // insert data
+ batchSql("INSERT INTO T VALUES (1)");
+ // create tag
+ paimonTable("T").createTag("tag1", 1);
+ // create branch
+ paimonTable("T").createBranch("branch1", "tag1");
+ // insert data to branch
+ batchSql("INSERT INTO T/*+ OPTIONS('branch' = 'branch1') */ VALUES (2)");
+ List rows = batchSql("select * from T /*+ OPTIONS('branch' = 'branch1') */");
+ assertThat(rows).containsExactlyInAnyOrder(Row.of(2), Row.of(1));
+ }
+
@Override
protected List ddl() {
return Arrays.asList(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index e6fb350982d81..e2f3d8b50eb80 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -691,7 +691,8 @@ public void testTagsTable() throws Exception {
List result = sql("SELECT tag_name, snapshot_id, schema_id, record_count FROM T$tags");
- assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L), Row.of("tag2", 2L, 0L, 2L));
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of("tag1", 1L, 0L, 1L), Row.of("tag2", 2L, 0L, 2L));
}
@Test
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
index dff589e9295e1..5e9a5f109ec60 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
@@ -33,6 +33,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.utils.BlockingIterator;
+import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.FailingFileIO;
import org.apache.flink.api.common.functions.MapFunction;
@@ -62,6 +63,7 @@
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -76,6 +78,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.paimon.CoreOptions.BRANCH;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.PATH;
@@ -124,11 +127,18 @@ public class FileStoreITCase extends AbstractTestBase {
private final StreamExecutionEnvironment env;
+ protected static String branch;
+
public FileStoreITCase(boolean isBatch) {
this.isBatch = isBatch;
this.env = isBatch ? buildBatchEnv() : buildStreamEnv();
}
+ @BeforeAll
+ public static void before() {
+ branch = BranchManager.DEFAULT_MAIN_BRANCH;
+ }
+
@Parameters(name = "isBatch-{0}")
public static List getVarSeg() {
return Arrays.asList(true, false);
@@ -464,7 +474,7 @@ public static FileStoreTable buildFileStoreTable(
"");
return retryArtificialException(
() -> {
- new SchemaManager(LocalFileIO.create(), tablePath).createTable(schema);
+ new SchemaManager(LocalFileIO.create(), tablePath, branch).createTable(schema);
return FileStoreTableFactory.create(LocalFileIO.create(), options);
});
}
@@ -480,6 +490,7 @@ public static Options buildConfiguration(boolean noFail, String temporaryPath) {
options.set(PATH, FailingFileIO.getFailingPath(failingName, temporaryPath));
}
options.set(FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+ options.set(BRANCH, branch);
return options;
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreWithBranchITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreWithBranchITCase.java
new file mode 100644
index 0000000000000..8a2374b3c367b
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreWithBranchITCase.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.sink.FixedBucketSink;
+import org.apache.paimon.flink.source.ContinuousFileStoreSource;
+import org.apache.paimon.flink.source.StaticFileStoreSource;
+
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * ITCase for {@link StaticFileStoreSource}, {@link ContinuousFileStoreSource} and {@link
+ * FixedBucketSink}.
+ */
+@ExtendWith(ParameterizedTestExtension.class)
+public class FileStoreWithBranchITCase extends FileStoreITCase {
+ public FileStoreWithBranchITCase(boolean isBatch) {
+ super(isBatch);
+ }
+
+ @BeforeAll
+ public static void before() {
+ branch = "testBranch";
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index e495ad3da5d4c..229517e763a7d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -168,6 +168,61 @@ public void testLookupChangelog() throws Exception {
innerTestChangelogProducing(Collections.singletonList("'changelog-producer' = 'lookup'"));
}
+ @Test
+ public void testTableReadWriteBranch() throws Exception {
+ TableEnvironment sEnv =
+ tableEnvironmentBuilder()
+ .streamingMode()
+ .checkpointIntervalMs(ThreadLocalRandom.current().nextInt(900) + 100)
+ .parallelism(1)
+ .build();
+
+ sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
+ sEnv.executeSql("USE CATALOG testCatalog");
+ sEnv.executeSql(
+ "CREATE TABLE T2 ( k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED ) "
+ + "WITH ( "
+ + "'bucket' = '2'"
+ + ")");
+
+ CloseableIterator it = sEnv.executeSql("SELECT * FROM T2").collect();
+
+ // insert data
+ sEnv.executeSql("INSERT INTO T2 VALUES (1, 'A')").await();
+ // read initial data
+ List actual = new ArrayList<>();
+ for (int i = 0; i < 1; i++) {
+ actual.add(it.next().toString());
+ }
+
+ assertThat(actual).containsExactlyInAnyOrder("+I[1, A]");
+
+ // create tag
+ sEnv.executeSql(
+ String.format("CALL sys.create_tag('%s.%s', 'tag2', 1, '5 d')", "default", "T2"));
+ // create branch
+ sEnv.executeSql(
+ String.format(
+ "CALL sys.create_branch('%s.%s', 'branch1', 'tag2')", "default", "T2"));
+ // alter table
+ sEnv.executeSql("ALTER TABLE T2 SET ('changelog-producer'='full-compaction')");
+
+ CloseableIterator branchIt =
+ sEnv.executeSql("select * from T2 /*+ OPTIONS('branch' = 'branch1') */").collect();
+ // insert data to branch
+ sEnv.executeSql(
+ "INSERT INTO T2/*+ OPTIONS('branch' = 'branch1') */ VALUES (10, 'v10'),(11, 'v11'),(12, 'v12')")
+ .await();
+
+ // read initial data
+ List actualBranch = new ArrayList<>();
+ for (int i = 0; i < 4; i++) {
+ actualBranch.add(branchIt.next().toString());
+ }
+ assertThat(actualBranch)
+ .containsExactlyInAnyOrder("+I[1, A]", "+I[10, v10]", "+I[11, v11]", "+I[12, v12]");
+ }
+
private void innerTestChangelogProducing(List options) throws Exception {
TableEnvironment sEnv =
tableEnvironmentBuilder()
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index 579237756994d..62543e818d8fa 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -27,6 +27,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
@@ -35,6 +36,7 @@
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BranchManager;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
@@ -49,6 +51,9 @@
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.paimon.CoreOptions.BRANCH;
+import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
+
/** {@link Action} test base. */
public abstract class ActionITCaseBase extends AbstractTestBase {
@@ -60,6 +65,7 @@ public abstract class ActionITCaseBase extends AbstractTestBase {
protected StreamTableCommit commit;
protected Catalog catalog;
private long incrementalIdentifier;
+ protected String branch = BranchManager.DEFAULT_MAIN_BRANCH;
@BeforeEach
public void before() throws IOException {
@@ -68,7 +74,12 @@ public void before() throws IOException {
tableName = "test_table_" + UUID.randomUUID();
commitUser = UUID.randomUUID().toString();
incrementalIdentifier = 0;
- catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
+ Map options = new HashMap<>();
+ options.put(WAREHOUSE.key(), new Path(warehouse).toUri().toString());
+ if (!branch.equals(BranchManager.DEFAULT_MAIN_BRANCH)) {
+ options.put(BRANCH.key(), branch);
+ }
+ catalog = CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(options)));
}
@AfterEach
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 32d25e7db1997..12ac096d21067 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -333,7 +333,7 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis
throw new TableNotExistException(identifier);
}
Path tableLocation = getDataTableLocation(identifier);
- return new SchemaManager(fileIO, tableLocation)
+ return new SchemaManager(fileIO, tableLocation, branchName)
.latest()
.orElseThrow(
() -> new RuntimeException("There is no paimon table in " + tableLocation));
@@ -419,7 +419,7 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
client.alter_table(fromDB, fromTableName, table);
Path fromPath = getDataTableLocation(fromTable);
- if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) {
+ if (new SchemaManager(fileIO, fromPath, branchName).listAllIds().size() > 0) {
// Rename the file system's table directory. Maintain consistency between tables in
// the file system and tables in the Hive Metastore.
Path toPath = getDataTableLocation(toTable);
@@ -585,7 +585,7 @@ private FieldSchema convertToFieldSchema(DataField dataField) {
}
private SchemaManager schemaManager(Identifier identifier) {
- return new SchemaManager(fileIO, getDataTableLocation(identifier))
+ return new SchemaManager(fileIO, getDataTableLocation(identifier), branchName)
.withLock(lock(identifier));
}