diff --git a/docs/content/maintenance/manage-tags.md b/docs/content/maintenance/manage-tags.md
index 76afb7747a20..865e68b5e5b2 100644
--- a/docs/content/maintenance/manage-tags.md
+++ b/docs/content/maintenance/manage-tags.md
@@ -42,6 +42,7 @@ Paimon supports automatic creation of tags in writing job.
-You can set `'tag.automatic-creation'` to `process-time` or `watermark`:
+You can set `'tag.automatic-creation'` to `process-time`, `watermark` or `batch`:
- `process-time`: Create TAG based on the time of the machine.
- `watermark`: Create TAG based on the watermark of the Sink input.
+- `batch`: In batch processing scenarios, a tag for the current snapshot is generated once the batch job completes (see the example below).
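+
+For example, a minimal Flink batch job that triggers batch tag creation (a sketch; the warehouse path and table name are illustrative):
+
+```java
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+
+public class BatchTagExample {
+    public static void main(String[] args) throws Exception {
+        // Batch execution mode is required for 'tag.automatic-creation' = 'batch'.
+        TableEnvironment tEnv =
+                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        tEnv.executeSql(
+                "CREATE CATALOG my_catalog WITH ('type' = 'paimon', 'warehouse' = 'file:/tmp/warehouse')");
+        tEnv.executeSql("USE CATALOG my_catalog");
+        tEnv.executeSql(
+                "CREATE TABLE t (k INT, v BIGINT) WITH ('tag.automatic-creation' = 'batch')");
+        // When this batch job finishes, a tag named 'batch-write-<yyyy-MM-dd>' is created.
+        tEnv.executeSql("INSERT INTO t VALUES (1, 10)").await();
+    }
+}
+```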
{{< hint info >}}
If you choose Watermark, you may need to specify the time zone of watermark, if watermark is not in the
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 82a8fc798b14..df46656ff325 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -303,7 +303,7 @@
metastore.tag-to-partition.preview |
none |
Enum |
- Whether to preview tag of generated snapshots in metastore. This allows the Hive engine to query specific tag before creation.
Possible values:- "none": No automatically created tags.
- "process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.
- "watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.
|
+ Whether to preview tag of generated snapshots in metastore. This allows the Hive engine to query specific tag before creation.
Possible values:- "none": No automatically created tags.
- "process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.
- "watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.
- "batch": In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.
|
num-levels |
@@ -573,7 +573,7 @@
tag.automatic-creation |
none |
Enum |
- Whether to create tag automatically. And how to generate tags.
Possible values:- "none": No automatically created tags.
- "process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.
- "watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.
|
+ Whether to create tag automatically. And how to generate tags.
Possible values:- "none": No automatically created tags.
- "process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.
- "watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.
- "batch": In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.
|
tag.callback.#.param |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index c14f7af5f3b2..5c2c660aebf7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1971,8 +1971,10 @@ public enum TagCreationMode implements DescribedEnum {
"Based on the time of the machine, create TAG once the processing time passes period time plus delay."),
WATERMARK(
"watermark",
- "Based on the watermark of the input, create TAG once the watermark passes period time plus delay.");
-
+ "Based on the watermark of the input, create TAG once the watermark passes period time plus delay."),
+ BATCH(
+ "batch",
+ "In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.");
private final String value;
private final String description;
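For reference, a minimal sketch of how the new mode is resolved from table options (assumes `paimon-common` on the classpath; the map literal is illustrative):

```java
import org.apache.paimon.CoreOptions;

import java.util.HashMap;
import java.util.Map;

public class TagCreationModeCheck {
    public static void main(String[] args) {
        // 'tag.automatic-creation' = 'batch' resolves to TagCreationMode.BATCH.
        Map<String, String> options = new HashMap<>();
        options.put(CoreOptions.TAG_AUTOMATIC_CREATION.key(), "batch");
        CoreOptions coreOptions = new CoreOptions(options);
        System.out.println(coreOptions.tagCreationMode()); // prints BATCH
    }
}
```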
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
index 5fad72a2aee9..6459cd4ba65f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java
@@ -182,7 +182,9 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
return new IncrementalTagStartingScanner(
snapshotManager,
incrementalBetween.getLeft(),
- incrementalBetween.getRight());
+ incrementalBetween.getRight(),
+ scanMode,
+ options.tagCreationMode());
}
} else {
return new IncrementalTimeStampStartingScanner(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
index 2cdf5bff9d26..c6c906c3473b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
@@ -18,43 +18,175 @@
package org.apache.paimon.table.source.snapshot;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+
/** {@link StartingScanner} for incremental changes by tag. */
public class IncrementalTagStartingScanner extends AbstractStartingScanner {
private final String start;
private final String end;
+ private final ScanMode scanMode;
+ private final CoreOptions.TagCreationMode tagCreationMode;
public IncrementalTagStartingScanner(
- SnapshotManager snapshotManager, String start, String end) {
+ SnapshotManager snapshotManager,
+ String start,
+ String end,
+ ScanMode scanMode,
+ CoreOptions.TagCreationMode tagCreationMode) {
super(snapshotManager);
this.start = start;
this.end = end;
- TagManager tagManager =
- new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
- Snapshot startingSnapshot = tagManager.taggedSnapshot(start);
- if (startingSnapshot != null) {
- this.startingSnapshotId = startingSnapshot.id();
- }
+ this.scanMode = scanMode;
+ this.tagCreationMode = tagCreationMode;
}
@Override
public Result scan(SnapshotReader reader) {
TagManager tagManager =
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
- Snapshot tag1 = tagManager.taggedSnapshot(start);
- Snapshot tag2 = tagManager.taggedSnapshot(end);
-
- if (tag2.id() <= tag1.id()) {
- throw new IllegalArgumentException(
- String.format(
- "Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s",
- end, tag2.id(), start, tag1.id()));
+
+ if (tagCreationMode == CoreOptions.TagCreationMode.BATCH) {
+ Snapshot tagStart;
+ Snapshot tagEnd;
+ try {
+ tagStart = tagManager.taggedSnapshot(start);
+ } catch (IllegalArgumentException e) {
+ tagStart = null;
+ }
+ try {
+ tagEnd = tagManager.taggedSnapshot(end);
+ } catch (IllegalArgumentException e) {
+ tagEnd = null;
+ }
+
+ if (tagStart == null && tagEnd == null) {
+ throw new IllegalArgumentException(
+ "The start and end tags of the incremental query cannot both be missing");
+ }
+ Snapshot tagResult = tagEnd == null ? tagStart : tagEnd;
+ Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new HashMap<>();
+ // Compatibility with the first-time tag creation case in the XianZhi (先知) scenario
+ if (tagStart == null || tagEnd == null) {
+ List<DataSplit> currentSplits = readSplits(reader, tagResult);
+ for (DataSplit split : currentSplits) {
+ grouped.computeIfAbsent(
+ Pair.of(split.partition(), split.bucket()),
+ k -> new ArrayList<>())
+ .addAll(split.dataFiles());
+ }
+ } else {
+ if (tagEnd.id() <= tagStart.id()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s",
+ end, tagEnd.id(), start, tagStart.id()));
+ }
+ SortedMap<Snapshot, String> tags = tagManager.tags();
+ for (Snapshot snapshot : tags.keySet()) {
+ if (snapshot.id() > tagStart.id() && snapshot.id() <= tagEnd.id()) {
+ List<DataSplit> currentSplits = readSplits(reader, snapshot);
+ for (DataSplit split : currentSplits) {
+ grouped.computeIfAbsent(
+ Pair.of(split.partition(), split.bucket()),
+ k -> new ArrayList<>())
+ .addAll(split.dataFiles());
+ }
+ }
+ }
+ }
+
+ List<DataSplit> result = new ArrayList<>();
+ for (Map.Entry<Pair<BinaryRow, Integer>, List<DataFileMeta>> entry :
+ grouped.entrySet()) {
+ BinaryRow partition = entry.getKey().getLeft();
+ int bucket = entry.getKey().getRight();
+ for (List<DataFileMeta> files :
+ reader.splitGenerator().splitForBatch(entry.getValue())) {
+ result.add(
+ DataSplit.builder()
+ .withSnapshot(tagResult.id())
+ .withPartition(partition)
+ .withBucket(bucket)
+ .withDataFiles(files)
+ .build());
+ }
+ }
+ return StartingScanner.fromPlan(
+ new SnapshotReader.Plan() {
+ @Override
+ public Long watermark() {
+ return null;
+ }
+
+ @Override
+ public Long snapshotId() {
+ return tagResult.id();
+ }
+
+ @Override
+ public List<Split> splits() {
+ return (List) result;
+ }
+ });
+ } else {
+ Snapshot tagStart = tagManager.taggedSnapshot(start);
+ Snapshot tagEnd = tagManager.taggedSnapshot(end);
+
+ if (tagEnd.id() <= tagStart.id()) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s",
+ end, tagEnd.id(), start, tagStart.id()));
+ }
+
+ return StartingScanner.fromPlan(
+ reader.withSnapshot(tagEnd).readIncrementalDiff(tagStart));
}
+ }
+
+ private List<DataSplit> readSplits(SnapshotReader reader, Snapshot s) {
+ switch (scanMode) {
+ case CHANGELOG:
+ return readChangeLogSplits(reader, s);
+ case DELTA:
+ return readDeltaSplits(reader, s);
+ default:
+ throw new UnsupportedOperationException("Unsupported scan kind: " + scanMode);
+ }
+ }
- return StartingScanner.fromPlan(reader.withSnapshot(tag2).readIncrementalDiff(tag1));
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private List<DataSplit> readDeltaSplits(SnapshotReader reader, Snapshot s) {
+ if (s.commitKind() == Snapshot.CommitKind.OVERWRITE) {
+ // ignore OVERWRITE
+ return Collections.emptyList();
+ }
+ return (List) reader.withSnapshot(s).withMode(ScanMode.DELTA).read().splits();
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private List<DataSplit> readChangeLogSplits(SnapshotReader reader, Snapshot s) {
+ if (s.commitKind() == Snapshot.CommitKind.OVERWRITE) {
+ // ignore OVERWRITE
+ return Collections.emptyList();
+ }
+ return (List) reader.withSnapshot(s).withMode(ScanMode.CHANGELOG).read().splits();
}
}
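The scanner above backs incremental reads between two tags. A sketch of issuing such a read through Paimon's batch read API (assumes an existing `Table` instance and two existing tags; `incremental-between` is the pre-existing option this scanner serves):

```java
import org.apache.paimon.CoreOptions;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableScan;

import java.util.Collections;

public class IncrementalTagRead {
    // Sketch: plan the incremental diff between two existing tags of a table.
    public static TableScan.Plan planBetweenTags(Table table, String startTag, String endTag) {
        Table incremental =
                table.copy(
                        Collections.singletonMap(
                                CoreOptions.INCREMENTAL_BETWEEN.key(), startTag + "," + endTag));
        ReadBuilder readBuilder = incremental.newReadBuilder();
        return readBuilder.newScan().plan();
    }
}
```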
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
index 671e8841dd57..cd1a3fda1b06 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
@@ -47,7 +47,8 @@ private TagPreview(CoreOptions options) {
}
public static TagPreview create(CoreOptions options) {
- if (options.tagToPartitionPreview() != CoreOptions.TagCreationMode.NONE) {
+ if (options.tagToPartitionPreview() != CoreOptions.TagCreationMode.NONE
+ && options.tagToPartitionPreview() != CoreOptions.TagCreationMode.BATCH) {
return new TagPreview(options);
}
return null;
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
index dadfde3a752d..e017b215121c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
@@ -79,6 +79,7 @@ static TagTimeExtractor createForTagPreview(CoreOptions options) {
static TagTimeExtractor create(CoreOptions.TagCreationMode mode, CoreOptions options) {
switch (mode) {
case NONE:
+ case BATCH:
return null;
case PROCESS_TIME:
return new ProcessTimeExtractor();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
new file mode 100644
index 000000000000..033091f1f176
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.SortedMap;
+
+/**
+ * Commit {@link Committable} for snapshot using the {@link CommitterOperator}. When the task is
+ * completed, the corresponding tag is generated.
+ */
+public class BatchWriteGeneratorTagOperator<CommitT, GlobalCommitT>
+ implements OneInputStreamOperator<CommitT, CommitT>,
+ SetupableStreamOperator,
+ BoundedOneInput {
+
+ private static final String BATCH_WRITE_TAG_PREFIX = "batch-write-";
+
+ private static final long serialVersionUID = 1L;
+
+ private final CommitterOperator<CommitT, GlobalCommitT> commitOperator;
+
+ protected final FileStoreTable table;
+
+ public BatchWriteGeneratorTagOperator(
+ CommitterOperator<CommitT, GlobalCommitT> commitOperator, FileStoreTable table) {
+ this.table = table;
+ this.commitOperator = commitOperator;
+ }
+
+ @Override
+ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
+ throws Exception {
+ commitOperator.initializeState(streamTaskStateManager);
+ }
+
+ @Override
+ public OperatorSnapshotFutures snapshotState(
+ long checkpointId,
+ long timestamp,
+ CheckpointOptions checkpointOptions,
+ CheckpointStreamFactory storageLocation)
+ throws Exception {
+ return commitOperator.snapshotState(
+ checkpointId, timestamp, checkpointOptions, storageLocation);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ commitOperator.notifyCheckpointComplete(checkpointId);
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) throws Exception {
+ commitOperator.notifyCheckpointAborted(checkpointId);
+ }
+
+ private void createTag() {
+ SnapshotManager snapshotManager = table.snapshotManager();
+ Snapshot snapshot = snapshotManager.latestSnapshot();
+ if (snapshot == null) {
+ return;
+ }
+ TagManager tagManager = table.tagManager();
+ TagDeletion tagDeletion = table.store().newTagDeletion();
+ Instant instant = Instant.ofEpochMilli(snapshot.timeMillis());
+ LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
+ String tagName =
+ BATCH_WRITE_TAG_PREFIX
+ + localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
+ try {
+ // Check whether the tag exists
+ if (tagManager.tagExists(tagName)) {
+ // When tag already exists, we want to consider different scenarios.
+ // If the table type is lookup or full-compaction, and there is no changelog created,
+ // then the data hasn't changed and compaction doesn't create a tag
+ CoreOptions.ChangelogProducer changelogProducer =
+ this.table.coreOptions().changelogProducer();
+ if (!((changelogProducer == CoreOptions.ChangelogProducer.LOOKUP
+ || changelogProducer
+ == CoreOptions.ChangelogProducer.FULL_COMPACTION)
+ && snapshot.changelogRecordCount() == 0)) {
+ tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
+ tagManager.createTag(snapshot, tagName);
+ }
+ } else {
+ // If the tag does not exist, it is created
+ tagManager.createTag(snapshot, tagName);
+ }
+ // Expire the tag
+ expireTag();
+ } catch (Exception e) {
+ if (tagManager.tagExists(tagName)) {
+ tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
+ }
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void expireTag() {
+ Integer tagNumRetainedMax = table.coreOptions().tagNumRetainedMax();
+ if (tagNumRetainedMax != null) {
+ SnapshotManager snapshotManager = table.snapshotManager();
+ if (snapshotManager.latestSnapshot() == null) {
+ return;
+ }
+ TagManager tagManager = table.tagManager();
+ TagDeletion tagDeletion = table.store().newTagDeletion();
+ SortedMap tags = tagManager.tags();
+ if (tags.size() > tagNumRetainedMax) {
+ int toDelete = tags.size() - tagNumRetainedMax;
+ int i = 0;
+ for (String tag : tags.values()) {
+ tagManager.deleteTag(tag, tagDeletion, snapshotManager);
+ i++;
+ if (i == toDelete) {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void open() throws Exception {
+ commitOperator.open();
+ }
+
+ @Override
+ public void processElement(StreamRecord<CommitT> element) throws Exception {
+ commitOperator.processElement(element);
+ }
+
+ @Override
+ public void processWatermark(Watermark watermark) throws Exception {
+ commitOperator.processWatermark(watermark);
+ }
+
+ @Override
+ public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
+ commitOperator.processWatermarkStatus(watermarkStatus);
+ }
+
+ @Override
+ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+ commitOperator.processLatencyMarker(latencyMarker);
+ }
+
+ @Override
+ public void finish() throws Exception {
+ createTag();
+ commitOperator.finish();
+ }
+
+ @Override
+ public void close() throws Exception {
+ commitOperator.close();
+ }
+
+ @Override
+ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+ commitOperator.prepareSnapshotPreBarrier(checkpointId);
+ }
+
+ @Override
+ public void setKeyContextElement1(StreamRecord<?> record) throws Exception {
+ commitOperator.setKeyContextElement1(record);
+ }
+
+ @Override
+ public void setKeyContextElement2(StreamRecord<?> record) throws Exception {
+ commitOperator.setKeyContextElement2(record);
+ }
+
+ @Override
+ public OperatorMetricGroup getMetricGroup() {
+ return commitOperator.getMetricGroup();
+ }
+
+ @Override
+ public OperatorID getOperatorID() {
+ return commitOperator.getOperatorID();
+ }
+
+ @Override
+ public void setCurrentKey(Object key) {
+ commitOperator.setCurrentKey(key);
+ }
+
+ @Override
+ public Object getCurrentKey() {
+ return commitOperator.getCurrentKey();
+ }
+
+ @Override
+ public void setKeyContextElement(StreamRecord<CommitT> record) throws Exception {
+ commitOperator.setKeyContextElement(record);
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ commitOperator.endInput();
+ }
+
+ @Override
+ public void setup(StreamTask containingTask, StreamConfig config, Output output) {
+ commitOperator.setup(containingTask, config, output);
+ }
+
+ @Override
+ public ChainingStrategy getChainingStrategy() {
+ return commitOperator.getChainingStrategy();
+ }
+
+ @Override
+ public void setChainingStrategy(ChainingStrategy strategy) {
+ commitOperator.setChainingStrategy(strategy);
+ }
+}
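For clarity, the tag name used by `createTag()` above can be derived standalone (pure JDK; the epoch value is illustrative and the result depends on the system time zone):

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class BatchTagName {
    public static void main(String[] args) {
        long snapshotTimeMillis = 1700000000000L; // illustrative snapshot.timeMillis()
        LocalDateTime localDateTime =
                LocalDateTime.ofInstant(
                        Instant.ofEpochMilli(snapshotTimeMillis), ZoneId.systemDefault());
        // Mirrors BATCH_WRITE_TAG_PREFIX + yyyy-MM-dd, e.g. "batch-write-2023-11-14"
        String tagName =
                "batch-write-" + localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
        System.out.println(tagName);
    }
}
```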
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index c9a6f3c05d74..480d41ae8d7a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.CoreOptions.ChangelogProducer;
+import org.apache.paimon.CoreOptions.TagCreationMode;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.MemorySize;
@@ -203,6 +204,13 @@ protected DataStreamSink<?> doCommit(DataStream<Committable> written, String commitUser) {
table::tagManager,
() -> table.store().newTagDeletion());
}
+ if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH
+ && table.coreOptions().tagCreationMode() == TagCreationMode.BATCH) {
+ committerOperator =
+ new BatchWriteGeneratorTagOperator<>(
+ (CommitterOperator<Committable, ManifestCommittable>) committerOperator,
+ table);
+ }
SingleOutputStreamOperator> committed =
written.transform(
GLOBAL_COMMITTER_NAME + " : " + table.name(),
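The wrapper only engages when both conditions hold: the Flink job runs in batch execution mode and the table sets `'tag.automatic-creation' = 'batch'`. A sketch of forcing batch runtime mode programmatically (standard Flink API):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchModeSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // RUNTIME_MODE = BATCH is one of the two conditions checked in doCommit above;
        // the other is 'tag.automatic-creation' = 'batch' on the table.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    }
}
```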
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
new file mode 100644
index 000000000000..894410dae60c
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.VersionedSerializerWrapper;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestCommittableSerializer;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Objects;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link BatchWriteGeneratorTagOperator}. */
+public class BatchWriteGeneratorTagOperatorTest extends CommitterOperatorTest {
+
+ @Test
+ public void testBatchWriteGeneratorTag() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ // set tag.automatic-creation = batch
+ HashMap<String, String> dynamicOptions = new HashMap<>();
+ dynamicOptions.put("tag.automatic-creation", "batch");
+ dynamicOptions.put("tag.num-retained-max", "2");
+ table = table.copy(dynamicOptions);
+
+ StreamTableWrite write =
+ table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
+
+ OneInputStreamOperator<Committable, Committable> committerOperator =
+ createCommitterOperator(
+ table,
+ initialCommitUser,
+ new RestoreAndFailCommittableStateManager<>(
+ () ->
+ new VersionedSerializerWrapper<>(
+ new ManifestCommittableSerializer())));
+
+ TableCommitImpl tableCommit = table.newCommit(initialCommitUser);
+
+ write.write(GenericRow.of(1, 10L));
+ tableCommit.commit(write.prepareCommit(false, 1));
+
+ SnapshotManager snapshotManager = table.newSnapshotReader().snapshotManager();
+ TagManager tagManager = table.tagManager();
+
+ // Generate tag name
+ String prefix = "batch-write-";
+ Instant instant =
+ Instant.ofEpochMilli(
+ Objects.requireNonNull(snapshotManager.latestSnapshot()).timeMillis());
+ LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
+ String tagName = prefix + localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
+
+ // No tag is generated before the finish method
+ assertThat(table.tagManager().tagCount()).isEqualTo(0);
+ committerOperator.finish();
+ // After the finish method, a tag is generated
+ assertThat(table.tagManager().tagCount()).isEqualTo(1);
+ // The tag is consistent with the latest snapshot
+ assertThat(tagManager.taggedSnapshot(tagName)).isEqualTo(snapshotManager.latestSnapshot());
+ }
+
+ @Override
+ protected OneInputStreamOperator<Committable, Committable> createCommitterOperator(
+ FileStoreTable table,
+ String commitUser,
+ CommittableStateManager<ManifestCommittable> committableStateManager) {
+ return new BatchWriteGeneratorTagOperator<>(
+ (CommitterOperator<Committable, ManifestCommittable>)
+ super.createCommitterOperator(table, commitUser, committableStateManager),
+ table);
+ }
+}