[flink] Introduce Range Partition And Sort in Append Scalable Table Batch Writing for Flink #3384
Conversation
Force-pushed from a1d3c50 to 642b444.
Thanks @WencongLiu, left comments.
```java
        int sinkParallelism,
        int localSampleSize,
        int globalSampleSize) {
    checkArgument(!sortColumns.isEmpty(), "Sort columns cannot be empty");
```
No need to check here; this is a private constructor.
You can check in `Builder.build`.
Yes, placing all the check statements in the build is already sufficient. The changes have been made as requested.
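The pattern settled on above (a check-free private constructor, with all validation gathered in `Builder.build()`) can be sketched as follows; the class and field names here are illustrative, not the actual Paimon code:

```java
import java.util.Collections;
import java.util.List;

// Illustrative sketch: all argument checks live in Builder.build(),
// so the private constructor can trust its inputs.
class SortInfoSketch {
    private final List<String> sortColumns;
    private final int sinkParallelism;

    // Private constructor: no validation needed, Builder.build() already checked.
    private SortInfoSketch(List<String> sortColumns, int sinkParallelism) {
        this.sortColumns = sortColumns;
        this.sinkParallelism = sinkParallelism;
    }

    List<String> sortColumns() {
        return sortColumns;
    }

    int sinkParallelism() {
        return sinkParallelism;
    }

    static class Builder {
        private List<String> sortColumns = Collections.emptyList();
        private int sinkParallelism = -1;

        Builder setSortColumns(List<String> sortColumns) {
            this.sortColumns = sortColumns;
            return this;
        }

        Builder setSinkParallelism(int sinkParallelism) {
            this.sinkParallelism = sinkParallelism;
            return this;
        }

        SortInfoSketch build() {
            // All checks gathered in one place, as the review suggests.
            if (sortColumns.isEmpty()) {
                throw new IllegalArgumentException("Sort columns cannot be empty");
            }
            if (sinkParallelism <= 0) {
                throw new IllegalArgumentException("The sink parallelism must be positive");
            }
            return new SortInfoSketch(sortColumns, sinkParallelism);
        }
    }
}
```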
```java
public Builder setSinkParallelism(int sinkParallelism) {
    checkArgument(
            sinkParallelism > 0,
            "The sink parallelism must be specified when sorting the table data. Please set it using the key: "
```
Use `checkArgument(boolean expression, String errorMessageTemplate, @CheckForNull Object... errorMessageArgs)`.
Done.
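For reference, the templated `checkArgument` overload mentioned above follows the Guava/Flink `Preconditions` shape. A minimal stand-in, purely to illustrate the call pattern (the real Paimon utility may differ in details):

```java
// Minimal stand-in for the Guava/Flink-style Preconditions.checkArgument
// overload with a message template; for illustration only.
final class PreconditionsSketch {
    private PreconditionsSketch() {}

    static void checkArgument(boolean expression, String errorMessageTemplate, Object... errorMessageArgs) {
        if (!expression) {
            // Guava uses %s placeholders; String.format is close enough for a sketch.
            throw new IllegalArgumentException(String.format(errorMessageTemplate, errorMessageArgs));
        }
    }
}
```

With this overload the option key can be passed as a `%s` argument instead of being concatenated into the message string.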
```java
public Builder setRangeNumber(int rangeNumber) {
    checkArgument(rangeNumber > 0, "Range number must be positive");
```
Move all checks to `build`.
Done.
```java
// 4. Remove the range index or both range index and key. (shuffle according range
// partition)
if (outputWithKey) {
```
You don't need to change this here.
You can just output with the key and introduce a `RemoveKeyOperator`.
It should have no impact on performance if it is chained to the upstream operator.
The outputWithKey parameter is indeed unnecessary, and the changes have been made as requested.
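The `RemoveKeyOperator` idea above amounts to always emitting `(rangeIndex, row)` pairs downstream of the shuffle and stripping the index in a trailing map step, which Flink can chain to the upstream operator at negligible cost. A dependency-free sketch of the shape, with hypothetical names:

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-in for the shuffle key plus the row payload.
record KeyedRow(int rangeIndex, String row) {}

final class RemoveKeySketch {
    private RemoveKeySketch() {}

    // The "RemoveKeyOperator": a plain map from (rangeIndex, row) to row.
    // In Flink this would be a MapFunction chained to the upstream operator,
    // so it adds no extra serialization or network cost.
    static Function<KeyedRow, String> removeKey() {
        return KeyedRow::row;
    }

    static List<String> apply(List<KeyedRow> input) {
        return input.stream().map(removeKey()).collect(Collectors.toList());
    }
}
```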
```java
if (boundedInput == null) {
    boundedInput = !FlinkSink.isStreaming(input);
}
checkState(boundedInput, "The clustering should be executed under batch mode.");
```
Here we should discuss semantics; maybe we can just not sort in streaming mode?
- +1 for not failing the job. We can enable the feature only when all the conditions are matched. Otherwise, if the by-columns are configured, we can print a warning log about it.
- We probably should also mention the limitations (batch mode, table type, etc.) in the configuration description.
@JingsongLi Good point, ignoring clustering in stream mode is a sensible design. This avoids the need for users to manually adjust the table configuration under streaming mode.
- I've removed the check and added a related warning log.
- I've added the limitations of table type and batch mode to the configuration description. I've also added a commit to introduce the clustering feature in the Paimon docs.
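The behavior agreed on in this thread (skip clustering with a warning rather than failing the job when the preconditions are not met) reduces to a small guard. The flag names and the log handling below are illustrative only:

```java
// Illustrative guard: enable clustering only when all preconditions hold;
// warn instead of failing the job when cluster columns are configured
// but the conditions are not met.
final class ClusteringGuard {
    private ClusteringGuard() {}

    static boolean shouldCluster(
            boolean clusterColumnsConfigured,
            boolean batchMode,
            boolean appendOnlyTable,
            StringBuilder log) {
        if (!clusterColumnsConfigured) {
            // Nothing requested, nothing to warn about.
            return false;
        }
        if (batchMode && appendOnlyTable) {
            return true;
        }
        // In the real code this would be an SLF4J warn log.
        log.append("Clustering is ignored: it requires batch mode and an append-only table.");
        return false;
    }
}
```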
```java
@@ -119,8 +133,86 @@ public FlinkSinkBuilder inputBounded(boolean bounded) {
        return this;
    }

    /** Set the table sort info. */
    public FlinkSinkBuilder setTableSortInfo(
```
`clusteringIfPossible`?
+1
Done.
```java
@@ -119,8 +133,86 @@ public FlinkSinkBuilder inputBounded(boolean bounded) {
        return this;
    }

    /** Set the table sort info. */
    public FlinkSinkBuilder setTableSortInfo(
            String sortColumnsString,
```
Respect the option keys: `sortColumnsString` => `clusterColumns`.
Done.
```java
private final List<String> sortColumns;

private final String sortStrategy;
```
I'd suggest using `OrderType` for this field. This would allow us to check the field early, in `Builder#setSortStrategy`.
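The suggestion above, sketched with a hypothetical `OrderType`-style enum: parsing the strategy string eagerly in the setter makes invalid values fail fast, rather than at build or run time. Names here are illustrative, not the actual Paimon types:

```java
import java.util.Locale;

// Hypothetical OrderType-style enum; parsing eagerly in the setter means an
// invalid strategy string fails fast instead of surfacing later.
enum OrderTypeSketch {
    ORDER, ZORDER, HILBERT;

    static OrderTypeSketch fromString(String s) {
        try {
            return valueOf(s.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unknown sort strategy: " + s);
        }
    }
}

class SortStrategyBuilderSketch {
    private OrderTypeSketch sortStrategy;

    SortStrategyBuilderSketch setSortStrategy(String strategy) {
        // Early check: happens here in the setter, not in build().
        this.sortStrategy = OrderTypeSketch.fromString(strategy);
        return this;
    }

    OrderTypeSketch sortStrategy() {
        return sortStrategy;
    }
}
```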
Force-pushed from 582e2ab to 860bc0f.
@WencongLiu
Thanks for addressing my comments. LGTM.
+1
CC @leaves12138 for a final check.
+1
Purpose
Linked issue: close #3385
Introduce Range Partition And Sort in Append Scalable Table Batch Writing for Flink.
Tests
- `TableSortInfoTest` for the `TableSortInfo`.
- `RangePartitionAndSortForUnawareBucketTableITCase`.
.API and Format
No.
Documentation
Yes. The documentation of `FlinkConnectorOptions` has been updated.