Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Support customized tag name in Batch TagCreationMode #4778

Merged
merged 1 commit into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,12 @@
<td><p>Enum</p></td>
<td>Whether to create tag automatically. And how to generate tags.<br /><br />Possible values:<ul><li>"none": No automatically created tags.</li><li>"process-time": Based on the time of the machine, create TAG once the processing time passes period time plus delay.</li><li>"watermark": Based on the watermark of the input, create TAG once the watermark passes period time plus delay.</li><li>"batch": In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.</li></ul></td>
</tr>
<tr>
<td><h5>tag.batch.customized-name</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Use customized name when creating tags in Batch mode.</td>
</tr>
<tr>
<td><h5>tag.callback.#.param</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
10 changes: 10 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1263,6 +1263,12 @@ public class CoreOptions implements Serializable {
.defaultValue(false)
.withDescription("Whether to automatically complete missing tags.");

/**
 * String option {@code tag.batch.customized-name}; it has no default value, so reading it
 * yields {@code null} unless the user sets it explicitly.
 *
 * <p>When set, the batch tag creation path uses this value as the tag name instead of the
 * generated prefix-plus-date name.
 */
public static final ConfigOption<String> TAG_BATCH_CUSTOMIZED_NAME =
key("tag.batch.customized-name")
.stringType()
.noDefaultValue()
.withDescription("Use customized name when creating tags in Batch mode.");

public static final ConfigOption<Duration> SNAPSHOT_WATERMARK_IDLE_TIMEOUT =
key("snapshot.watermark-idle-timeout")
.durationType()
Expand Down Expand Up @@ -2241,6 +2247,10 @@ public boolean tagAutomaticCompletion() {
return options.get(TAG_AUTOMATIC_COMPLETION);
}

/**
 * Reads the {@code tag.batch.customized-name} option.
 *
 * @return the configured tag name; the option declares no default value, so callers should
 *     expect {@code null} when it has not been set
 */
public String tagBatchCustomizedName() {
    final String customizedName = options.get(TAG_BATCH_CUSTOMIZED_NAME);
    return customizedName;
}

/**
 * Reads the {@code snapshot.watermark-idle-timeout} option.
 *
 * @return the value currently held for that option
 */
public Duration snapshotWatermarkIdleTimeout() {
    final Duration idleTimeout = options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT);
    return idleTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ private void createTag() {
Instant instant = Instant.ofEpochMilli(snapshot.timeMillis());
LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
String tagName =
BATCH_WRITE_TAG_PREFIX
+ localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
table.coreOptions().tagBatchCustomizedName() != null
? table.coreOptions().tagBatchCustomizedName()
: BATCH_WRITE_TAG_PREFIX
+ localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
try {
// If the tag already exists, delete the tag
if (tagManager.tagExists(tagName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,55 @@ public void testBatchWriteGeneratorTag() throws Exception {
assertThat(tagManager.allTagNames()).containsOnly("many-tags-test2", tagName);
}

/**
 * Verifies that, in batch tag-creation mode, setting {@code tag.batch.customized-name} makes the
 * committer operator create a tag with exactly that name when it finishes.
 */
@Test
public void testBatchWriteGeneratorCustomizedTag() throws Exception {
    FileStoreTable table = createFileStoreTable();
    String customizedTag = "customized-tag";
    // Enable batch tag creation and override the generated tag name with a customized one.
    HashMap<String, String> dynamicOptions = new HashMap<>();
    dynamicOptions.put("tag.automatic-creation", "batch");
    dynamicOptions.put("tag.batch.customized-name", customizedTag);
    table = table.copy(dynamicOptions);

    StreamTableWrite write =
            table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();

    OneInputStreamOperatorFactory<Committable, Committable> committerOperatorFactory =
            createCommitterOperatorFactory(
                    table,
                    initialCommitUser,
                    new RestoreAndFailCommittableStateManager<>(
                            ManifestCommittableSerializer::new));

    OneInputStreamOperator<Committable, Committable> committerOperator =
            committerOperatorFactory.createStreamOperator(
                    new StreamOperatorParameters<>(
                            new SourceOperatorStreamTask<Integer>(new DummyEnvironment()),
                            new MockStreamConfig(new Configuration(), 1),
                            new MockOutput<>(new ArrayList<>()),
                            null,
                            null,
                            null));

    committerOperator.open();

    TableCommitImpl tableCommit = table.newCommit(initialCommitUser);

    write.write(GenericRow.of(1, 10L));
    tableCommit.commit(write.prepareCommit(false, 1));

    // Use one TagManager for every check instead of re-fetching it from the table each time.
    TagManager tagManager = table.tagManager();

    // No tag is generated before the finish method
    assertThat(tagManager.tagCount()).isEqualTo(0);
    committerOperator.finish();
    // After the finish method, exactly one tag exists and it carries the customized name
    assertThat(tagManager.tagCount()).isEqualTo(1);
    assertThat(tagManager.allTagNames()).containsOnly(customizedTag);
}

@Override
protected OneInputStreamOperatorFactory<Committable, Committable>
createCommitterOperatorFactory(
Expand Down
Loading