diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 1133de289fa3..f60d0ec91009 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -900,6 +900,12 @@
Enum |
Whether to create tag automatically. And how to generate tags.
Possible values:- "none": No automatically created tags.
- "process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.
- "watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.
- "batch": In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.
|
+
+ tag.batch.customized-name |
+ (none) |
+ String |
+ Use customized name when creating tags in Batch mode. |
+
tag.callback.#.param |
(none) |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 6e1e9bba076b..fc4aa78d23e0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1263,6 +1263,12 @@ public class CoreOptions implements Serializable {
.defaultValue(false)
.withDescription("Whether to automatically complete missing tags.");
+ public static final ConfigOption TAG_BATCH_CUSTOMIZED_NAME =
+ key("tag.batch.customized-name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Use customized name when creating tags in Batch mode.");
+
public static final ConfigOption SNAPSHOT_WATERMARK_IDLE_TIMEOUT =
key("snapshot.watermark-idle-timeout")
.durationType()
@@ -2241,6 +2247,10 @@ public boolean tagAutomaticCompletion() {
return options.get(TAG_AUTOMATIC_COMPLETION);
}
+ public String tagBatchCustomizedName() {
+ return options.get(TAG_BATCH_CUSTOMIZED_NAME);
+ }
+
public Duration snapshotWatermarkIdleTimeout() {
return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
index 1cbcc4b2262f..3f2ea76ffec9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java
@@ -102,8 +102,10 @@ private void createTag() {
Instant instant = Instant.ofEpochMilli(snapshot.timeMillis());
LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
String tagName =
- BATCH_WRITE_TAG_PREFIX
- + localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
+ table.coreOptions().tagBatchCustomizedName() != null
+ ? table.coreOptions().tagBatchCustomizedName()
+ : BATCH_WRITE_TAG_PREFIX
+ + localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
try {
// If the tag already exists, delete the tag
if (tagManager.tagExists(tagName)) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
index 68162832eac9..987b95d6e39f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
@@ -124,6 +124,55 @@ public void testBatchWriteGeneratorTag() throws Exception {
assertThat(tagManager.allTagNames()).containsOnly("many-tags-test2", tagName);
}
+ @Test
+ public void testBatchWriteGeneratorCustomizedTag() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ String customizedTag = "customized-tag";
+ // set tag.automatic-creation = batch
+ HashMap dynamicOptions = new HashMap<>();
+ dynamicOptions.put("tag.automatic-creation", "batch");
+ dynamicOptions.put("tag.batch.customized-name", customizedTag);
+ table = table.copy(dynamicOptions);
+
+ StreamTableWrite write =
+ table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
+
+ OneInputStreamOperatorFactory committerOperatorFactory =
+ createCommitterOperatorFactory(
+ table,
+ initialCommitUser,
+ new RestoreAndFailCommittableStateManager<>(
+ ManifestCommittableSerializer::new));
+
+ OneInputStreamOperator committerOperator =
+ committerOperatorFactory.createStreamOperator(
+ new StreamOperatorParameters<>(
+ new SourceOperatorStreamTask(new DummyEnvironment()),
+ new MockStreamConfig(new Configuration(), 1),
+ new MockOutput<>(new ArrayList<>()),
+ null,
+ null,
+ null));
+
+ committerOperator.open();
+
+ TableCommitImpl tableCommit = table.newCommit(initialCommitUser);
+
+ write.write(GenericRow.of(1, 10L));
+ tableCommit.commit(write.prepareCommit(false, 1));
+
+ TagManager tagManager = table.tagManager();
+
+ // No tag is generated before the finish method
+ assertThat(table.tagManager().tagCount()).isEqualTo(0);
+ committerOperator.finish();
+ // After the finish method, a tag is generated
+ assertThat(table.tagManager().tagCount()).isEqualTo(1);
+ // Get tagName from tagManager.
+ String tagName = tagManager.allTagNames().get(0);
+ assertThat(tagName).isEqualTo(customizedTag);
+ }
+
@Override
protected OneInputStreamOperatorFactory
createCommitterOperatorFactory(