-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[flink]Added the function of generating labels when Flink Sink is completed … #2469
Conversation
…in batch processing scenarios.
@@ -301,6 +301,13 @@ public class FlinkConnectorOptions { | |||
.withDescription( | |||
"Sink committer memory to control heap memory of global committer."); | |||
|
|||
public static final ConfigOption<Boolean> SINK_FINISH_GENERATAR_TAG = | |||
ConfigOptions.key("sink.finish.generatar-tag") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this feature be a part of tag.automatic-creation
? We can introduce BATCH
TagCreationMode.
The downside of this is that it cannot be orthogonal to the automatic creation of tags in streaming writing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we hope to use tags in batch processing scenarios and generate corresponding tags after the current task ends.
Do you mean to add a batch mode in tag.automatic-creation without adding new configuration? If that's the case, I think it's okay
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, tag.automatic-creation=batch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please take another look
* Commit {@link Committable} for snapshot using the {@link CommitterOperator}. | ||
* When the task is completed, the corresponding tag is generated. | ||
*/ | ||
public class SinkFinishGeneratorTagOperator<CommitT, GlobalCommitT> implements |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you create a ITCase for this feature?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I'll provide it later
* Commit {@link Committable} for snapshot using the {@link CommitterOperator}. | ||
* When the task is completed, the corresponding tag is generated. | ||
*/ | ||
public class SinkFinishGeneratorTagOperator<CommitT, GlobalCommitT> implements |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also ut case.
SetupableStreamOperator, | ||
BoundedOneInput { | ||
|
||
private static final String SINK_FINISH_TAG_PREFIX = "sinkFinish-"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only concern is here, better name?
Maybe batch-write-2023-12-13
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, updated
…processing scenarios
Compile failed? |
…processing scenarios
+1 and merged |
This closes apache#2469 (cherry picked from commit 5a8b597)
Purpose
We need to enhance paimon's capabilities in flink batch processing scenarios, such as generating tags after flink batch processing is completed. This function has a very good practical effect when combined with changelog internally.
Tests
API and Format
Documentation