diff --git a/docs/content/maintenance/manage-tags.md b/docs/content/maintenance/manage-tags.md
index 76afb7747a20..865e68b5e5b2 100644
--- a/docs/content/maintenance/manage-tags.md
+++ b/docs/content/maintenance/manage-tags.md
@@ -42,6 +42,7 @@ Paimon supports automatic creation of tags in writing job.
You can set `'tag.automatic-creation'` to `process-time` or `watermark`:
- `process-time`: Create TAG based on the time of the machine.
- `watermark`: Create TAG based on the watermark of the Sink input.
+- `batch`: In a batch processing scenario, a tag is generated after the current task is completed.
{{< hint info >}}
If you choose Watermark, you may need to specify the time zone of watermark, if watermark is not in the
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 82a8fc798b14..df46656ff325 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -303,7 +303,7 @@
metastore.tag-to-partition.preview |
none |
Enum |
- Whether to preview tag of generated snapshots in metastore. This allows the Hive engine to query specific tag before creation.
Possible values:- "none": No automatically created tags.
- "process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.
- "watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.
|
+ Whether to preview tag of generated snapshots in metastore. This allows the Hive engine to query specific tag before creation.
Possible values:- "none": No automatically created tags.
- "process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.
- "watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.
- "batch": In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.
|
num-levels |
@@ -573,7 +573,7 @@
tag.automatic-creation |
none |
Enum |
- Whether to create tag automatically. And how to generate tags.
Possible values:- "none": No automatically created tags.
- "process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.
- "watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.
|
+ Whether to create tag automatically. And how to generate tags.
Possible values:- "none": No automatically created tags.
- "process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.
- "watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.
- "batch": In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.
|
tag.callback.#.param |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index c14f7af5f3b2..5c2c660aebf7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1971,8 +1971,10 @@ public enum TagCreationMode implements DescribedEnum {
"Based on the time of the machine, create TAG once the processing time passes period time plus delay."),
WATERMARK(
"watermark",
- "Based on the watermark of the input, create TAG once the watermark passes period time plus delay.");
-
+ "Based on the watermark of the input, create TAG once the watermark passes period time plus delay."),
+ BATCH(
+ "batch",
+ "In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.");
private final String value;
private final String description;
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
index 671e8841dd57..cd1a3fda1b06 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPreview.java
@@ -47,7 +47,8 @@ private TagPreview(CoreOptions options) {
}
public static TagPreview create(CoreOptions options) {
- if (options.tagToPartitionPreview() != CoreOptions.TagCreationMode.NONE) {
+ if (options.tagToPartitionPreview() != CoreOptions.TagCreationMode.NONE
+ && options.tagToPartitionPreview() != CoreOptions.TagCreationMode.BATCH) {
return new TagPreview(options);
}
return null;
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
index dadfde3a752d..e017b215121c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagTimeExtractor.java
@@ -79,6 +79,7 @@ static TagTimeExtractor createForTagPreview(CoreOptions options) {
static TagTimeExtractor create(CoreOptions.TagCreationMode mode, CoreOptions options) {
switch (mode) {
case NONE:
+ case BATCH:
return null;
case PROCESS_TIME:
return new ProcessTimeExtractor();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
new file mode 100644
index 000000000000..dff75ff29b33
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.SortedMap;
+
+/**
+ * Commit {@link Committable} for snapshot using the {@link CommitterOperator}. When the task is
+ * completed, the corresponding tag is generated.
+ */
+public class BatchWriteGeneratorTagOperator
+ implements OneInputStreamOperator,
+ SetupableStreamOperator,
+ BoundedOneInput {
+
+ private static final String BATCH_WRITE_TAG_PREFIX = "batch-write-";
+
+ private static final long serialVersionUID = 1L;
+
+ private final CommitterOperator commitOperator;
+
+ protected final FileStoreTable table;
+
+ public BatchWriteGeneratorTagOperator(
+ CommitterOperator commitOperator, FileStoreTable table) {
+ this.table = table;
+ this.commitOperator = commitOperator;
+ }
+
+ @Override
+ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
+ throws Exception {
+ commitOperator.initializeState(streamTaskStateManager);
+ }
+
+ @Override
+ public OperatorSnapshotFutures snapshotState(
+ long checkpointId,
+ long timestamp,
+ CheckpointOptions checkpointOptions,
+ CheckpointStreamFactory storageLocation)
+ throws Exception {
+ return commitOperator.snapshotState(
+ checkpointId, timestamp, checkpointOptions, storageLocation);
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ commitOperator.notifyCheckpointComplete(checkpointId);
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) throws Exception {
+ commitOperator.notifyCheckpointAborted(checkpointId);
+ }
+
+ private void createTag() {
+ SnapshotManager snapshotManager = table.snapshotManager();
+ Snapshot snapshot = snapshotManager.latestSnapshot();
+ if (snapshot == null) {
+ return;
+ }
+ TagManager tagManager = table.tagManager();
+ TagDeletion tagDeletion = table.store().newTagDeletion();
+ Instant instant = Instant.ofEpochMilli(snapshot.timeMillis());
+ LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
+ String tagName =
+ BATCH_WRITE_TAG_PREFIX
+ + localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
+ try {
+ // If the tag already exists, delete the tag
+ if (tagManager.tagExists(tagName)) {
+ tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
+ }
+ // Create a new tag
+ tagManager.createTag(snapshot, tagName);
+ // Expire the tag
+ expireTag();
+ } catch (Exception e) {
+ if (tagManager.tagExists(tagName)) {
+ tagManager.deleteTag(tagName, tagDeletion, snapshotManager);
+ }
+ }
+ }
+
+ private void expireTag() {
+ Integer tagNumRetainedMax = table.coreOptions().tagNumRetainedMax();
+ if (tagNumRetainedMax != null) {
+ SnapshotManager snapshotManager = table.snapshotManager();
+ if (snapshotManager.latestSnapshot() == null) {
+ return;
+ }
+ TagManager tagManager = table.tagManager();
+ TagDeletion tagDeletion = table.store().newTagDeletion();
+ SortedMap tags = tagManager.tags();
+ if (tags.size() > tagNumRetainedMax) {
+ int toDelete = tags.size() - tagNumRetainedMax;
+ int i = 0;
+ for (String tag : tags.values()) {
+ tagManager.deleteTag(tag, tagDeletion, snapshotManager);
+ i++;
+ if (i == toDelete) {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void open() throws Exception {
+ commitOperator.open();
+ }
+
+ @Override
+ public void processElement(StreamRecord element) throws Exception {
+ commitOperator.processElement(element);
+ }
+
+ @Override
+ public void processWatermark(Watermark watermark) throws Exception {
+ commitOperator.processWatermark(watermark);
+ }
+
+ @Override
+ public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
+ commitOperator.processWatermarkStatus(watermarkStatus);
+ }
+
+ @Override
+ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+ commitOperator.processLatencyMarker(latencyMarker);
+ }
+
+ @Override
+ public void finish() throws Exception {
+ createTag();
+ commitOperator.finish();
+ }
+
+ @Override
+ public void close() throws Exception {
+ commitOperator.close();
+ }
+
+ @Override
+ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+ commitOperator.prepareSnapshotPreBarrier(checkpointId);
+ }
+
+ @Override
+ public void setKeyContextElement1(StreamRecord> record) throws Exception {
+ commitOperator.setKeyContextElement1(record);
+ }
+
+ @Override
+ public void setKeyContextElement2(StreamRecord> record) throws Exception {
+ commitOperator.setKeyContextElement2(record);
+ }
+
+ @Override
+ public OperatorMetricGroup getMetricGroup() {
+ return commitOperator.getMetricGroup();
+ }
+
+ @Override
+ public OperatorID getOperatorID() {
+ return commitOperator.getOperatorID();
+ }
+
+ @Override
+ public void setCurrentKey(Object key) {
+ commitOperator.setCurrentKey(key);
+ }
+
+ @Override
+ public Object getCurrentKey() {
+ return commitOperator.getCurrentKey();
+ }
+
+ @Override
+ public void setKeyContextElement(StreamRecord record) throws Exception {
+ commitOperator.setKeyContextElement(record);
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ commitOperator.endInput();
+ }
+
+ @Override
+ public void setup(StreamTask containingTask, StreamConfig config, Output output) {
+ commitOperator.setup(containingTask, config, output);
+ }
+
+ @Override
+ public ChainingStrategy getChainingStrategy() {
+ return commitOperator.getChainingStrategy();
+ }
+
+ @Override
+ public void setChainingStrategy(ChainingStrategy strategy) {
+ commitOperator.setChainingStrategy(strategy);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index c9a6f3c05d74..480d41ae8d7a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.CoreOptions.ChangelogProducer;
+import org.apache.paimon.CoreOptions.TagCreationMode;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.MemorySize;
@@ -203,6 +204,13 @@ protected DataStreamSink> doCommit(DataStream written, String com
table::tagManager,
() -> table.store().newTagDeletion());
}
+ if (conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH
+ && table.coreOptions().tagCreationMode() == TagCreationMode.BATCH) {
+ committerOperator =
+ new BatchWriteGeneratorTagOperator<>(
+ (CommitterOperator) committerOperator,
+ table);
+ }
SingleOutputStreamOperator> committed =
written.transform(
GLOBAL_COMMITTER_NAME + " : " + table.name(),
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
new file mode 100644
index 000000000000..894410dae60c
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.VersionedSerializerWrapper;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestCommittableSerializer;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Objects;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link BatchWriteGeneratorTagOperator}. */
+public class BatchWriteGeneratorTagOperatorTest extends CommitterOperatorTest {
+
+ @Test
+ public void testBatchWriteGeneratorTag() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ // set tag.automatic-creation = batch
+ HashMap dynamicOptions = new HashMap<>();
+ dynamicOptions.put("tag.automatic-creation", "batch");
+ dynamicOptions.put("tag.num-retained-max", "2");
+ table = table.copy(dynamicOptions);
+
+ StreamTableWrite write =
+ table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
+
+ OneInputStreamOperator committerOperator =
+ createCommitterOperator(
+ table,
+ initialCommitUser,
+ new RestoreAndFailCommittableStateManager<>(
+ () ->
+ new VersionedSerializerWrapper<>(
+ new ManifestCommittableSerializer())));
+
+ TableCommitImpl tableCommit = table.newCommit(initialCommitUser);
+
+ write.write(GenericRow.of(1, 10L));
+ tableCommit.commit(write.prepareCommit(false, 1));
+
+ SnapshotManager snapshotManager = table.newSnapshotReader().snapshotManager();
+ TagManager tagManager = table.tagManager();
+
+ // Generate tag name
+ String prefix = "batch-write-";
+ Instant instant =
+ Instant.ofEpochMilli(
+ Objects.requireNonNull(snapshotManager.latestSnapshot()).timeMillis());
+ LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
+ String tagName = prefix + localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
+
+ // No tag is generated before the finish method
+ assertThat(table.tagManager().tagCount()).isEqualTo(0);
+ committerOperator.finish();
+ // After the finish method, a tag is generated
+ assertThat(table.tagManager().tagCount()).isEqualTo(1);
+ // The tag is consistent with the latest snapshot
+ assertThat(tagManager.taggedSnapshot(tagName)).isEqualTo(snapshotManager.latestSnapshot());
+ }
+
+ @Override
+ protected OneInputStreamOperator createCommitterOperator(
+ FileStoreTable table,
+ String commitUser,
+ CommittableStateManager committableStateManager) {
+ return new BatchWriteGeneratorTagOperator<>(
+ (CommitterOperator)
+ super.createCommitterOperator(table, commitUser, committableStateManager),
+ table);
+ }
+}