Skip to content

Commit

Permalink
[core] Support customized tag name in Batch TagCreationMode
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangyuf committed Dec 26, 2024
1 parent bf58ddd commit 29dfecc
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 2 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,12 @@
<td><p>Enum</p></td>
<td>Whether to create tag automatically. And how to generate tags.<br /><br />Possible values:<ul><li>"none": No automatically created tags.</li><li>"process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.</li><li>"watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.</li><li>"batch": In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.</li></ul></td>
</tr>
<tr>
<td><h5>tag.batch.customized-name</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Use customized label when creating tags in Batch mode.</td>
</tr>
<tr>
<td><h5>tag.callback.#.param</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
10 changes: 10 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1263,6 +1263,12 @@ public class CoreOptions implements Serializable {
.defaultValue(false)
.withDescription("Whether to automatically complete missing tags.");

public static final ConfigOption<String> TAG_BATCH_CUSTOMIZED_NAME =
key("tag.batch.customized-name")
.stringType()
.noDefaultValue()
.withDescription("Use customized label when creating tags in Batch mode.");

public static final ConfigOption<Duration> SNAPSHOT_WATERMARK_IDLE_TIMEOUT =
key("snapshot.watermark-idle-timeout")
.durationType()
Expand Down Expand Up @@ -2241,6 +2247,10 @@ public boolean tagAutomaticCompletion() {
return options.get(TAG_AUTOMATIC_COMPLETION);
}

public String tagBatchCustomizedName() {
return options.get(TAG_BATCH_CUSTOMIZED_NAME);
}

public Duration snapshotWatermarkIdleTimeout() {
return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ private void createTag() {
Instant instant = Instant.ofEpochMilli(snapshot.timeMillis());
LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
String tagName =
BATCH_WRITE_TAG_PREFIX
+ localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
table.coreOptions().tagBatchCustomizedName() != null
? table.coreOptions().tagBatchCustomizedName()
: BATCH_WRITE_TAG_PREFIX
+ localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
try {
// If the tag already exists, delete the tag
if (tagManager.tagExists(tagName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,55 @@ public void testBatchWriteGeneratorTag() throws Exception {
assertThat(tagManager.allTagNames()).containsOnly("many-tags-test2", tagName);
}

@Test
public void testBatchWriteGeneratorCustomizedTag() throws Exception {
FileStoreTable table = createFileStoreTable();
String customizedTag = "customized-tag";
// set tag.automatic-creation = batch
HashMap<String, String> dynamicOptions = new HashMap<>();
dynamicOptions.put("tag.automatic-creation", "batch");
dynamicOptions.put("tag.batch.customized-name", customizedTag);
table = table.copy(dynamicOptions);

StreamTableWrite write =
table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();

OneInputStreamOperatorFactory<Committable, Committable> committerOperatorFactory =
createCommitterOperatorFactory(
table,
initialCommitUser,
new RestoreAndFailCommittableStateManager<>(
ManifestCommittableSerializer::new));

OneInputStreamOperator<Committable, Committable> committerOperator =
committerOperatorFactory.createStreamOperator(
new StreamOperatorParameters<>(
new SourceOperatorStreamTask<Integer>(new DummyEnvironment()),
new MockStreamConfig(new Configuration(), 1),
new MockOutput<>(new ArrayList<>()),
null,
null,
null));

committerOperator.open();

TableCommitImpl tableCommit = table.newCommit(initialCommitUser);

write.write(GenericRow.of(1, 10L));
tableCommit.commit(write.prepareCommit(false, 1));

TagManager tagManager = table.tagManager();

// No tag is generated before the finish method
assertThat(table.tagManager().tagCount()).isEqualTo(0);
committerOperator.finish();
// After the finish method, a tag is generated
assertThat(table.tagManager().tagCount()).isEqualTo(1);
// Get tagName from tagManager.
String tagName = tagManager.allTagNames().get(0);
assertThat(tagName).isEqualTo(customizedTag);
}

@Override
protected OneInputStreamOperatorFactory<Committable, Committable>
createCommitterOperatorFactory(
Expand Down

0 comments on commit 29dfecc

Please sign in to comment.