From a4c2c4c19af5723a5d1a83f3a5528413ea54822f Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Tue, 5 Mar 2024 19:54:54 +0800 Subject: [PATCH] support flink write branch #2861 --- .../java/org/apache/paimon/CoreOptions.java | 17 +++ .../apache/paimon/options/CatalogOptions.java | 3 + .../paimon/catalog/AbstractCatalog.java | 5 + .../paimon/catalog/FileSystemCatalog.java | 2 +- .../apache/paimon/schema/SchemaManager.java | 78 ++++++++---- .../paimon/table/FileStoreTableFactory.java | 8 +- .../apache/paimon/utils/BranchManager.java | 2 +- .../apache/paimon/utils/SnapshotManager.java | 21 +++- .../org/apache/paimon/utils/TagManager.java | 112 ++++++++++++++---- .../paimon/table/FileStoreTableTestBase.java | 2 +- .../paimon/flink/FlinkTableFactory.java | 3 +- .../AutoTagForSavepointCommitterOperator.java | 30 ++++- .../sink/BatchWriteGeneratorTagOperator.java | 33 ++++-- .../paimon/flink/sink/CompactorSink.java | 4 +- .../apache/paimon/flink/sink/FlinkSink.java | 8 +- .../paimon/flink/sink/FlinkTableSink.java | 6 +- .../paimon/flink/sink/FlinkWriteSink.java | 2 +- .../sink/UnawareBucketCompactionSink.java | 3 +- .../org/apache/paimon/hive/HiveCatalog.java | 8 +- 19 files changed, 265 insertions(+), 82 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index b5b7ea432ac8e..fd23cd360ec8c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -102,6 +102,12 @@ public class CoreOptions implements Serializable { .noDefaultValue() .withDescription("The file path of this table in the filesystem."); + public static final ConfigOption BRANCH_NAME = + key("branch-name") + .stringType() + .noDefaultValue() + .withDescription("Specify branch name."); + public static final ConfigOption FILE_FORMAT = key("file.format") .enumType(FileFormatType.class) @@ -1115,6 +1121,17 @@ public Path path() { return path(options.toMap()); } + public String branch() { + return branch(options.toMap()); + } + + public static String branch(Map options) { + if (options.containsKey(BRANCH_NAME.key())) { + return options.get(BRANCH_NAME.key()); + } + return "main"; + } + public static Path path(Map options) { return new Path(options.get(PATH.key())); } diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index f00a35a750940..dd6fb28551eb9 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -110,4 +110,7 @@ public class CatalogOptions { TextElement.text( "\"custom\": You can implement LineageMetaFactory and LineageMeta to store lineage information in customized storage.")) .build()); + + public static final ConfigOption BRANCH_NAME = + key("branch-name").stringType().defaultValue("main").withDescription("branch name"); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index c69a72b0db6a5..115e3b3bcbde4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -36,6 +36,7 @@ import org.apache.paimon.table.Table; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.system.SystemTableLoader; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.StringUtils; import javax.annotation.Nullable; @@ -50,6 +51,7 @@ import java.util.UUID; import java.util.stream.Collectors; +import static org.apache.paimon.options.CatalogOptions.BRANCH_NAME; import static org.apache.paimon.options.CatalogOptions.LINEAGE_META; import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; @@ -66,6 +68,7 @@ public abstract class AbstractCatalog implements Catalog { protected final FileIO fileIO; protected final Map tableDefaultOptions; protected final Options catalogOptions; + protected final String branchName; @Nullable protected final LineageMetaFactory lineageMetaFactory; @@ -74,6 +77,7 @@ protected AbstractCatalog(FileIO fileIO) { this.lineageMetaFactory = null; this.tableDefaultOptions = new HashMap<>(); this.catalogOptions = new Options(); + branchName = BranchManager.DEFAULT_MAIN_BRANCH; } protected AbstractCatalog(FileIO fileIO, Options options) { @@ -83,6 +87,7 @@ protected AbstractCatalog(FileIO fileIO, Options options) { this.tableDefaultOptions = convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX); this.catalogOptions = options; + this.branchName = options.get(BRANCH_NAME); if (lockEnabled()) { checkArgument(options.contains(LOCK_TYPE), "No lock type when lock is enabled."); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index e71c92dc4007f..1f237f942415b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -118,7 +118,7 @@ public boolean tableExists(Identifier identifier) { } private boolean tableExists(Path tablePath) { - return new SchemaManager(fileIO, tablePath).listAllIds().size() > 0; + return new SchemaManager(fileIO, tablePath).listAllIds(branchName).size() > 0; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index a6d274688aea0..b551b3e25c829 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -96,12 +96,8 @@ public Optional latest() { } public Optional latest(String branchName) { - Path directoryPath = - branchName.equals(DEFAULT_MAIN_BRANCH) - ? schemaDirectory() - : branchSchemaDirectory(branchName); try { - return listVersionedFiles(fileIO, directoryPath, SCHEMA_PREFIX) + return listVersionedFiles(fileIO, schemaDirectory(branchName), SCHEMA_PREFIX) .reduce(Math::max) .map(this::schema); } catch (IOException e) { @@ -111,23 +107,36 @@ public Optional latest(String branchName) { /** List all schema. */ public List listAll() { - return listAllIds().stream().map(this::schema).collect(Collectors.toList()); + return listAll(DEFAULT_MAIN_BRANCH); + } + + public List listAll(String branchName) { + return listAllIds(branchName).stream().map(this::schema).collect(Collectors.toList()); } - /** List all schema IDs. */ public List listAllIds() { + return listAllIds(DEFAULT_MAIN_BRANCH); + } + + /** List all schema IDs. */ + public List listAllIds(String branchName) { try { - return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX) + return listVersionedFiles(fileIO, schemaDirectory(branchName), SCHEMA_PREFIX) .collect(Collectors.toList()); } catch (IOException e) { throw new UncheckedIOException(e); } } - /** Create a new schema from {@link Schema}. */ public TableSchema createTable(Schema schema) throws Exception { + return createTable(schema, DEFAULT_MAIN_BRANCH); + } + + /** Create a new schema from {@link Schema}. */ + public TableSchema createTable(Schema schema, String branchName) throws Exception { while (true) { - latest().ifPresent( + latest(branchName) + .ifPresent( latest -> { throw new IllegalStateException( "Schema in filesystem exists, please use updating," @@ -151,20 +160,24 @@ public TableSchema createTable(Schema schema) throws Exception { options, schema.comment()); - boolean success = commit(newSchema); + boolean success = commit(branchName, newSchema); if (success) { return newSchema; } } } - /** Update {@link SchemaChange}s. */ public TableSchema commitChanges(SchemaChange... changes) throws Exception { - return commitChanges(Arrays.asList(changes)); + return commitChanges(DEFAULT_MAIN_BRANCH, changes); + } + + /** Update {@link SchemaChange}s. */ + public TableSchema commitChanges(String branchName, SchemaChange... changes) throws Exception { + return commitChanges(branchName, Arrays.asList(changes)); } /** Update {@link SchemaChange}s. */ - public TableSchema commitChanges(List changes) + public TableSchema commitChanges(String branchName, List changes) throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException { while (true) { @@ -361,7 +374,7 @@ public TableSchema commitChanges(List changes) newComment); try { - boolean success = commit(newSchema); + boolean success = commit(branchName, newSchema); if (success) { return newSchema; } @@ -455,9 +468,13 @@ private void updateColumn( @VisibleForTesting boolean commit(TableSchema newSchema) throws Exception { - SchemaValidation.validateTableSchema(newSchema); + return commit(DEFAULT_MAIN_BRANCH, newSchema); + } - Path schemaPath = toSchemaPath(newSchema.id()); + @VisibleForTesting + boolean commit(String branchName, TableSchema newSchema) throws Exception { + SchemaValidation.validateTableSchema(newSchema); + Path schemaPath = branchSchemaPath(branchName, newSchema.id()); Callable callable = () -> fileIO.writeFileUtf8(schemaPath, newSchema.toString()); if (lock == null) { return callable.call(); @@ -486,20 +503,35 @@ private Path schemaDirectory() { return new Path(tableRoot + "/schema"); } + public Path schemaDirectory(String branchName) { + return branchName.equals(DEFAULT_MAIN_BRANCH) + ? schemaDirectory() + : new Path(getBranchPath(tableRoot, branchName) + "/schema"); + } + @VisibleForTesting public Path toSchemaPath(long id) { return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id); } - public Path branchSchemaDirectory(String branchName) { - return new Path(getBranchPath(tableRoot, branchName) + "/schema"); - } - public Path branchSchemaPath(String branchName, long schemaId) { - return new Path( - getBranchPath(tableRoot, branchName) + "/schema/" + SCHEMA_PREFIX + schemaId); + return branchName.equals(DEFAULT_MAIN_BRANCH) + ? toSchemaPath(schemaId) + : new Path( + getBranchPath(tableRoot, branchName) + + "/schema/" + + SCHEMA_PREFIX + + schemaId); } + /** + * Delete schema with specific id. + * + * @param schemaId the schema id to delete. + */ + public void deleteSchema(String branchName, long schemaId) { + fileIO.deleteQuietly(branchSchemaPath(branchName, schemaId)); + } /** * Delete schema with specific id. * diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index 6382535858676..53f28c6f98c90 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -26,10 +26,12 @@ import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.utils.BranchManager; import java.io.IOException; import java.io.UncheckedIOException; +import static org.apache.paimon.CoreOptions.BRANCH_NAME; import static org.apache.paimon.CoreOptions.PATH; /** Factory to create {@link FileStoreTable}. */ @@ -53,9 +55,13 @@ public static FileStoreTable create(FileIO fileIO, Path path) { public static FileStoreTable create(FileIO fileIO, Options options) { Path tablePath = CoreOptions.path(options); + String branchName = BranchManager.DEFAULT_MAIN_BRANCH; + if (options.contains(BRANCH_NAME)) { + branchName = options.get(BRANCH_NAME); + } TableSchema tableSchema = new SchemaManager(fileIO, tablePath) - .latest() + .latest(branchName) .orElseThrow( () -> new IllegalArgumentException( diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 6564bd4e56dcd..c28907e4e3496 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -99,7 +99,7 @@ public void createBranch(String branchName, String tagName) { try { // Copy the corresponding tag, snapshot and schema files into the branch directory fileIO.copyFileUtf8( - tagManager.tagPath(tagName), tagManager.branchTagPath(branchName, tagName)); + tagManager.tagPath(tagName), tagManager.tagPath(branchName, tagName)); fileIO.copyFileUtf8( snapshotManager.snapshotPath(snapshot.id()), snapshotManager.branchSnapshotPath(branchName, snapshot.id())); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index b330fc30389f4..c4275b42280c6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -113,7 +113,11 @@ public Snapshot snapshot(String branchName, long snapshotId) { } public boolean snapshotExists(long snapshotId) { - Path path = snapshotPath(snapshotId); + return snapshotExists(DEFAULT_MAIN_BRANCH, snapshotId); + } + + public boolean snapshotExists(String branchName, long snapshotId) { + Path path = snapshotPathByBranch(branchName, snapshotId); try { return fileIO.exists(path); } catch (IOException e) { @@ -149,6 +153,11 @@ public boolean snapshotExists(long snapshotId) { return snapshotId == null ? null : snapshot(snapshotId); } + public @Nullable Snapshot earliestSnapshot(String branchName) { + Long snapshotId = earliestSnapshotId(branchName); + return snapshotId == null ? null : snapshot(snapshotId); + } + public @Nullable Long earliestSnapshotId() { return earliestSnapshotId(DEFAULT_MAIN_BRANCH); } @@ -318,13 +327,17 @@ public Optional latestSnapshotOfUser(String user) { return Optional.empty(); } - /** Find the snapshot of the specified identifiers written by the specified user. */ public List findSnapshotsForIdentifiers( @Nonnull String user, List identifiers) { + return findSnapshotsForIdentifiers(user, identifiers, DEFAULT_MAIN_BRANCH); + } + /** Find the snapshot of the specified identifiers written by the specified user. */ + public List findSnapshotsForIdentifiers( + @Nonnull String user, List identifiers, String branchName) { if (identifiers.isEmpty()) { return Collections.emptyList(); } - Long latestId = latestSnapshotId(); + Long latestId = latestSnapshotId(branchName); if (latestId == null) { return Collections.emptyList(); } @@ -338,7 +351,7 @@ public List findSnapshotsForIdentifiers( List matchedSnapshots = new ArrayList<>(); Set remainingIdentifiers = new HashSet<>(identifiers); for (long id = latestId; id >= earliestId && !remainingIdentifiers.isEmpty(); id--) { - Snapshot snapshot = snapshot(id); + Snapshot snapshot = snapshot(branchName, id); if (user.equals(snapshot.commitUser())) { if (remainingIdentifiers.remove(snapshot.commitIdentifier())) { matchedSnapshots.add(snapshot); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index a29a3e151c766..ee80b970a2582 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -40,6 +40,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.BranchManager.getBranchPath; import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -64,22 +65,35 @@ public Path tagDirectory() { return new Path(tablePath + "/tag"); } + /** Return the root Directory of tags. */ + public Path tagDirectory(String branchName) { + return branchName.equals(DEFAULT_MAIN_BRANCH) + ? tagDirectory() + : new Path(getBranchPath(tablePath, branchName) + "/tag"); + } + /** Return the path of a tag. */ public Path tagPath(String tagName) { return new Path(tablePath + "/tag/" + TAG_PREFIX + tagName); } /** Return the path of a tag in branch. */ - public Path branchTagPath(String branchName, String tagName) { - return new Path(getBranchPath(tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName); + public Path tagPath(String branchName, String tagName) { + return branchName.equals(DEFAULT_MAIN_BRANCH) + ? tagPath(tagName) + : new Path(getBranchPath(tablePath, branchName) + "/tag/" + TAG_PREFIX + tagName); } - /** Create a tag from given snapshot and save it in the storage. */ public void createTag(Snapshot snapshot, String tagName, List callbacks) { + createTag(snapshot, tagName, callbacks, DEFAULT_MAIN_BRANCH); + } + /** Create a tag from given snapshot and save it in the storage. */ + public void createTag( + Snapshot snapshot, String tagName, List callbacks, String branchName) { checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName); - checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", tagName); + checkArgument(!tagExists(branchName, tagName), "Tag name '%s' already exists.", tagName); - Path newTagPath = tagPath(tagName); + Path newTagPath = tagPath(branchName, tagName); try { fileIO.writeFileUtf8(newTagPath, snapshot.toJson()); } catch (IOException e) { @@ -102,8 +116,11 @@ public void createTag(Snapshot snapshot, String tagName, List callb /** Make sure the tagNames are ALL tags of one snapshot. */ public void deleteAllTagsOfOneSnapshot( - List tagNames, TagDeletion tagDeletion, SnapshotManager snapshotManager) { - Snapshot taggedSnapshot = taggedSnapshot(tagNames.get(0)); + List tagNames, + TagDeletion tagDeletion, + SnapshotManager snapshotManager, + String branchName) { + Snapshot taggedSnapshot = taggedSnapshot(branchName, tagNames.get(0)); List taggedSnapshots; // skip file deletion if snapshot exists @@ -112,29 +129,37 @@ public void deleteAllTagsOfOneSnapshot( return; } else { // FileIO discovers tags by tag file, so we should read all tags before we delete tag - taggedSnapshots = taggedSnapshots(); - tagNames.forEach(tagName -> fileIO.deleteQuietly(tagPath(tagName))); + taggedSnapshots = taggedSnapshots(branchName); + tagNames.forEach(tagName -> fileIO.deleteQuietly(tagPath(branchName, tagName))); } - doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion); + doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion, branchName); } public void deleteTag( String tagName, TagDeletion tagDeletion, SnapshotManager snapshotManager) { + deleteTag(tagName, tagDeletion, snapshotManager, DEFAULT_MAIN_BRANCH); + } + + public void deleteTag( + String tagName, + TagDeletion tagDeletion, + SnapshotManager snapshotManager, + String branchName) { checkArgument(!StringUtils.isBlank(tagName), "Tag name '%s' is blank.", tagName); - checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName); + checkArgument(tagExists(branchName, tagName), "Tag '%s' doesn't exist.", tagName); - Snapshot taggedSnapshot = taggedSnapshot(tagName); + Snapshot taggedSnapshot = taggedSnapshot(branchName, tagName); List taggedSnapshots; // skip file deletion if snapshot exists if (snapshotManager.snapshotExists(taggedSnapshot.id())) { - fileIO.deleteQuietly(tagPath(tagName)); + fileIO.deleteQuietly(tagPath(branchName, tagName)); return; } else { // FileIO discovers tags by tag file, so we should read all tags before we delete tag SortedMap> tags = tags(); - fileIO.deleteQuietly(tagPath(tagName)); + fileIO.deleteQuietly(tagPath(branchName, tagName)); // skip data file clean if more than 1 tags are created based on this snapshot if (tags.get(taggedSnapshot).size() > 1) { @@ -143,14 +168,15 @@ public void deleteTag( taggedSnapshots = new ArrayList<>(tags.keySet()); } - doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion); + doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion, branchName); } private void doClean( Snapshot taggedSnapshot, List taggedSnapshots, SnapshotManager snapshotManager, - TagDeletion tagDeletion) { + TagDeletion tagDeletion, + String branchName) { // collect skipping sets from the left neighbor tag and the nearest right neighbor (either // the earliest snapshot or right neighbor tag) List skippedSnapshots = new ArrayList<>(); @@ -161,7 +187,7 @@ private void doClean( skippedSnapshots.add(taggedSnapshots.get(index - 1)); } // the nearest right neighbor - Snapshot right = snapshotManager.earliestSnapshot(); + Snapshot right = snapshotManager.earliestSnapshot(branchName); if (index + 1 < taggedSnapshots.size()) { Snapshot rightTag = taggedSnapshots.get(index + 1); right = right.id() < rightTag.id() ? right : rightTag; @@ -191,9 +217,9 @@ private void doClean( taggedSnapshot, tagDeletion.manifestSkippingSet(skippedSnapshots)); } - /** Check if a tag exists. */ - public boolean tagExists(String tagName) { - Path path = tagPath(tagName); + /** Check if a branch tag exists. */ + public boolean tagExists(String branchName, String tagName) { + Path path = tagPath(branchName, tagName); try { return fileIO.exists(path); } catch (IOException e) { @@ -204,10 +230,28 @@ public boolean tagExists(String tagName) { } } - /** Get the tagged snapshot by name. */ + /** Check if a tag exists. */ + public boolean tagExists(String tagName) { + return tagExists(DEFAULT_MAIN_BRANCH, tagName); + } + + /** Get the branch tagged snapshot by name. */ public Snapshot taggedSnapshot(String tagName) { - checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName); - return Snapshot.fromPath(fileIO, tagPath(tagName)); + return taggedSnapshot(DEFAULT_MAIN_BRANCH, tagName); + } + + /** Get the tagged snapshot by name. */ + public Snapshot taggedSnapshot(String branchName, String tagName) { + checkArgument(tagExists(branchName, tagName), "Tag '%s' doesn't exist.", tagName); + return Snapshot.fromPath(fileIO, tagPath(branchName, tagName)); + } + + public long tagCount(String branchName) { + try { + return listVersionedFileStatus(fileIO, tagDirectory(branchName), TAG_PREFIX).count(); + } catch (IOException e) { + throw new RuntimeException(e); + } } public long tagCount() { @@ -222,12 +266,24 @@ public long tagCount() { public List taggedSnapshots() { return new ArrayList<>(tags().keySet()); } + /** Get all tagged snapshots sorted by snapshot id. */ + public List taggedSnapshots(String branchName) { + return new ArrayList<>(tags(branchName).keySet()); + } + + /** Get all tagged snapshots with names sorted by snapshot id. */ + public SortedMap> tags(String branchName) { + return tags(branchName, tagName -> true); + } /** Get all tagged snapshots with names sorted by snapshot id. */ public SortedMap> tags() { return tags(tagName -> true); } + public SortedMap> tags(Predicate filter) { + return tags(DEFAULT_MAIN_BRANCH, filter); + } /** * Retrieves a sorted map of snapshots filtered based on a provided predicate. The predicate * determines which tag names should be included in the result. Only snapshots with tag names @@ -239,12 +295,12 @@ public SortedMap> tags() { * name. * @throws RuntimeException if an IOException occurs during retrieval of snapshots. */ - public SortedMap> tags(Predicate filter) { + public SortedMap> tags(String branchName, Predicate filter) { TreeMap> tags = new TreeMap<>(Comparator.comparingLong(Snapshot::id)); try { List paths = - listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX) + listVersionedFileStatus(fileIO, tagDirectory(branchName), TAG_PREFIX) .map(FileStatus::getPath) .collect(Collectors.toList()); @@ -269,11 +325,15 @@ public SortedMap> tags(Predicate filter) { } public List sortTagsOfOneSnapshot(List tagNames) { + return sortTagsOfOneSnapshot(DEFAULT_MAIN_BRANCH, tagNames); + } + + public List sortTagsOfOneSnapshot(String branchName, List tagNames) { return tagNames.stream() .map( name -> { try { - return fileIO.getFileStatus(tagPath(name)); + return fileIO.getFileStatus(tagPath(branchName, name)); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index 4cd019568c41f..965baf99b8c48 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -949,7 +949,7 @@ public void testCreateBranch() throws Exception { // verify test-tag in test-branch is equal to snapshot 2 Snapshot branchTag = Snapshot.fromPath( - new TraceableFileIO(), tagManager.branchTagPath("test-branch", "test-tag")); + new TraceableFileIO(), tagManager.tagPath("test-branch", "test-tag")); assertThat(branchTag.equals(snapshot2)).isTrue(); // verify snapshot in test-branch is equal to snapshot 2 diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java index 4c9f8ff9b55de..3f63a7feec23a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java @@ -58,11 +58,12 @@ private void createTableIfNeeded(Context context) { if (options.get(AUTO_CREATE)) { try { Path tablePath = CoreOptions.path(table.getOptions()); + String branch = CoreOptions.branch(table.getOptions()); SchemaManager schemaManager = new SchemaManager( FileIO.get(tablePath, createCatalogContext(context)), tablePath); if (!schemaManager.latest().isPresent()) { - schemaManager.createTable(FlinkCatalog.fromCatalogTable(table)); + schemaManager.createTable(FlinkCatalog.fromCatalogTable(table), branch); } } catch (Exception e) { throw new RuntimeException(e); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java index da3425e9b0d35..03db8b6ab5543 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java @@ -21,6 +21,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.sink.TagCallback; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.SerializableSupplier; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -86,6 +87,8 @@ public class AutoTagForSavepointCommitterOperator private transient ListState identifiersForTagsState; + private transient String branchName; + public AutoTagForSavepointCommitterOperator( CommitterOperator commitOperator, SerializableSupplier snapshotManagerFactory, @@ -98,6 +101,23 @@ public AutoTagForSavepointCommitterOperator( this.tagDeletionFactory = tagDeletionFactory; this.callbacksSupplier = callbacksSupplier; this.identifiersForTags = new TreeSet<>(); + this.branchName = BranchManager.DEFAULT_MAIN_BRANCH; + } + + public AutoTagForSavepointCommitterOperator( + CommitterOperator commitOperator, + SerializableSupplier snapshotManagerFactory, + SerializableSupplier tagManagerFactory, + SerializableSupplier tagDeletionFactory, + SerializableSupplier> callbacksSupplier, + String branchName) { + this.commitOperator = commitOperator; + this.tagManagerFactory = tagManagerFactory; + this.snapshotManagerFactory = snapshotManagerFactory; + this.tagDeletionFactory = tagDeletionFactory; + this.callbacksSupplier = callbacksSupplier; + this.identifiersForTags = new TreeSet<>(); + this.branchName = branchName; } @Override @@ -155,19 +175,19 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { commitOperator.notifyCheckpointAborted(checkpointId); identifiersForTags.remove(checkpointId); String tagName = SAVEPOINT_TAG_PREFIX + checkpointId; - if (tagManager.tagExists(tagName)) { - tagManager.deleteTag(tagName, tagDeletion, snapshotManager); + if (tagManager.tagExists(branchName, tagName)) { + tagManager.deleteTag(tagName, tagDeletion, snapshotManager, branchName); } } private void createTagForIdentifiers(List identifiers) { List snapshotForTags = snapshotManager.findSnapshotsForIdentifiers( - commitOperator.getCommitUser(), identifiers); + commitOperator.getCommitUser(), identifiers, branchName); for (Snapshot snapshot : snapshotForTags) { String tagName = SAVEPOINT_TAG_PREFIX + snapshot.commitIdentifier(); - if (!tagManager.tagExists(tagName)) { - tagManager.createTag(snapshot, tagName, callbacks); + if (!tagManager.tagExists(branchName, tagName)) { + tagManager.createTag(snapshot, tagName, callbacks, branchName); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java index d65ab74140fe5..b1ea26421d31e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java @@ -21,6 +21,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -65,10 +66,22 @@ public class BatchWriteGeneratorTagOperator protected final FileStoreTable table; + protected final String branchName; + public BatchWriteGeneratorTagOperator( CommitterOperator commitOperator, FileStoreTable table) { this.table = table; this.commitOperator = commitOperator; + this.branchName = BranchManager.DEFAULT_MAIN_BRANCH; + } + + public BatchWriteGeneratorTagOperator( + CommitterOperator commitOperator, + FileStoreTable table, + String branchName) { + this.table = table; + this.commitOperator = commitOperator; + this.branchName = branchName; } @Override @@ -100,7 +113,7 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception { private void createTag() { SnapshotManager snapshotManager = table.snapshotManager(); - Snapshot snapshot = snapshotManager.latestSnapshot(); + Snapshot snapshot = snapshotManager.latestSnapshot(branchName); if (snapshot == null) { return; } @@ -113,11 +126,11 @@ private void createTag() { + localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); try { // If the tag already exists, delete the tag - if (tagManager.tagExists(tagName)) { - tagManager.deleteTag(tagName, tagDeletion, snapshotManager); + if (tagManager.tagExists(branchName, tagName)) { + tagManager.deleteTag(tagName, tagDeletion, snapshotManager, branchName); } // Create a new tag - tagManager.createTag(snapshot, tagName, table.store().createTagCallbacks()); + tagManager.createTag(snapshot, tagName, table.store().createTagCallbacks(), branchName); // Expire the tag expireTag(); } catch (Exception e) { @@ -136,18 +149,20 @@ private void expireTag() { } TagManager tagManager = table.tagManager(); TagDeletion tagDeletion = table.store().newTagDeletion(); - long tagCount = tagManager.tagCount(); + long tagCount = tagManager.tagCount(branchName); while (tagCount > tagNumRetainedMax) { - for (List tagNames : tagManager.tags().values()) { + for (List tagNames : tagManager.tags(branchName).values()) { if (tagCount - tagNames.size() >= tagNumRetainedMax) { tagManager.deleteAllTagsOfOneSnapshot( - tagNames, tagDeletion, snapshotManager); + tagNames, tagDeletion, snapshotManager, branchName); tagCount = tagCount - tagNames.size(); } else { - List sortedTagNames = tagManager.sortTagsOfOneSnapshot(tagNames); + List sortedTagNames = + tagManager.sortTagsOfOneSnapshot(branchName, tagNames); for (String toBeDeleted : sortedTagNames) { - tagManager.deleteTag(toBeDeleted, tagDeletion, snapshotManager); + tagManager.deleteTag( + toBeDeleted, tagDeletion, snapshotManager, branchName); tagCount--; if (tagCount == tagNumRetainedMax) { break; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java index 8e08824567ad3..22104159a4e45 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java @@ -42,7 +42,9 @@ protected OneInputStreamOperator createWriteOperator( @Override protected Committer.Factory createCommitterFactory( boolean streamingCheckpointEnabled) { - return (user, metricGroup) -> new StoreCommitter(table.newCommit(user), metricGroup); + return (user, metricGroup) -> + new StoreCommitter( + table.newCommit(user, table.coreOptions().branch()), metricGroup); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 545bd7f070729..4487dcd71abd9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -238,6 +238,8 @@ protected DataStreamSink doCommit(DataStream written, String com commitUser, createCommitterFactory(streamingCheckpointEnabled), createCommittableStateManager()); + + String branch = table.coreOptions().branch(); if (Options.fromMap(table.options()).get(SINK_AUTO_TAG_FOR_SAVEPOINT)) { committerOperator = new AutoTagForSavepointCommitterOperator<>( @@ -245,14 +247,16 @@ protected DataStreamSink doCommit(DataStream written, String com table::snapshotManager, table::tagManager, () -> table.store().newTagDeletion(), - () -> table.store().createTagCallbacks()); + () -> table.store().createTagCallbacks(), + branch); } if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH && table.coreOptions().tagCreationMode() == TagCreationMode.BATCH) { committerOperator = new BatchWriteGeneratorTagOperator<>( (CommitterOperator) committerOperator, - table); + table, + branch); } SingleOutputStreamOperator committed = written.transform( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java index 50bc45b752f8d..0d1ae452bf91b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java @@ -47,7 +47,11 @@ public FlinkTableSink( @Override public void executeTruncation() { FileStoreCommit commit = - ((FileStoreTable) table).store().newCommit(UUID.randomUUID().toString()); + ((FileStoreTable) table) + .store() + .newCommit( + UUID.randomUUID().toString(), + ((FileStoreTable) table).coreOptions().branch()); long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER; commit.purgeTable(identifier); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java index b812c04912b12..a57cf57c6aaf1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java @@ -48,7 +48,7 @@ protected Committer.Factory createCommitterFac // a restart. return (user, metricGroup) -> new StoreCommitter( - table.newCommit(user) + table.newCommit(user, table.coreOptions().branch()) .withOverwrite(overwritePartition) .ignoreEmptyCommit(!streamingCheckpointEnabled), metricGroup); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java index cf825cec7ba41..a95c89ded1382 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java @@ -50,7 +50,8 @@ protected OneInputStreamOperator createWr @Override protected Committer.Factory createCommitterFactory( boolean streamingCheckpointEnabled) { - return (s, metricGroup) -> new StoreCommitter(table.newCommit(s), metricGroup); + return (s, metricGroup) -> + new StoreCommitter(table.newCommit(s, table.coreOptions().branch()), metricGroup); } @Override diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 589e920370e02..3b48a844c24b1 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -330,7 +330,7 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis } Path tableLocation = getDataTableLocation(identifier); return new SchemaManager(fileIO, tableLocation) - .latest() + .latest(branchName) .orElseThrow( () -> new RuntimeException("There is no paimon table in " + tableLocation)); } @@ -373,7 +373,7 @@ protected void createTableImpl(Identifier identifier, Schema schema) { // if changes on Hive fails there is no harm to perform the same changes to files again TableSchema tableSchema; try { - tableSchema = schemaManager(identifier).createTable(schema); + tableSchema = schemaManager(identifier).createTable(schema, branchName); } catch (Exception e) { throw new RuntimeException( "Failed to commit changes of table " @@ -440,7 +440,7 @@ protected void alterTableImpl(Identifier identifier, List changes) final SchemaManager schemaManager = schemaManager(identifier); // first commit changes to underlying files - TableSchema schema = schemaManager.commitChanges(changes); + TableSchema schema = schemaManager.commitChanges(branchName, changes); try { // sync to hive hms @@ -450,7 +450,7 @@ protected void alterTableImpl(Identifier identifier, List changes) client.alter_table( identifier.getDatabaseName(), identifier.getObjectName(), table, true); } catch (Exception te) { - schemaManager.deleteSchema(schema.id()); + schemaManager.deleteSchema(branchName, schema.id()); throw new RuntimeException(te); } }