[Flink]add range strategy for sort compaction #2749
Thanks a lot, @wg1026688210. I left a few comments. @JingsongLi, please take a look at this pull request.
 } else {
-    callProcedure("zorder", columns);
+    callProcedure("zorder", rangeStrategy, columns);
I don't think we need to add rangeStrategy to the parameters. It can be looked up anywhere in the sort process.
For example, in SortUtils.sortStreamByKey we can read it directly: Strategy strategy = table.coreOptions().sortRangeStrategy
@@ -287,14 +293,21 @@ private SortCompactAction createAction(String orderStrategy, List<String> column
 "--order_strategy",
 orderStrategy,
 "--order_by",
-String.join(",", columns));
+String.join(",", columns),
+"--sort_conf range_strategy=" + rangeStrategy,
We can add this as a table config: when we run compaction, we just pass --table_conf sort.range.strategy=xxx. This simplifies the code and keeps compatibility with the old code.
double targetSize = (totalSize) / (double) (rangeNum);

@SuppressWarnings("unchecked")
T[] range = (T[]) new Object[rangeNum];
Why not rangeNum - 1? If we split an array into 1000 parts, shouldn't we set only 999 boundary points?
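To illustrate the arithmetic behind this question, here is a minimal sketch, not the PR's implementation. The class `RangeBoundaries` and its methods are hypothetical names introduced only for this example. It shows that splitting sorted keys into `rangeNum` contiguous ranges needs `rangeNum - 1` boundary keys, and that a lookup against those boundaries still yields `rangeNum` distinct range indexes.

```java
/** Hypothetical helper for illustration only; not taken from the PR. */
final class RangeBoundaries {

    /**
     * Picks rangeNum - 1 boundary keys that split the sorted keys into
     * rangeNum contiguous ranges holding roughly the same number of keys.
     */
    static int[] boundaries(int[] sortedKeys, int rangeNum) {
        if (rangeNum < 1 || sortedKeys.length == 0) {
            throw new IllegalArgumentException("need rangeNum >= 1 and at least one key");
        }
        int[] bounds = new int[rangeNum - 1];
        for (int i = 1; i < rangeNum; i++) {
            // Index of the last key that belongs to range i - 1.
            int idx = (int) ((long) i * sortedKeys.length / rangeNum) - 1;
            bounds[i - 1] = sortedKeys[Math.max(idx, 0)];
        }
        return bounds;
    }

    /** Maps a key to a range index in [0, bounds.length], i.e. one of rangeNum ranges. */
    static int rangeOf(int key, int[] bounds) {
        // Binary search for the first boundary that is >= key.
        int lo = 0;
        int hi = bounds.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (bounds[mid] < key) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }
}
```

With eight sorted keys and `rangeNum = 4`, this sketch produces three boundaries, and keys above the last boundary fall into the fourth range.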
@leaves12138 @JingsongLi PTAL
Thanks, @wg1026688210. See my comment above.
long x =
        row.getTimestamp(index, localZonedTimestampType.getPrecision())
                .getMillisecond();
return 8;
Why define x? It doesn't seem to be used.
done
+1
Purpose
Improve the efficiency of sort compaction when record sizes are skewed during the range phase.
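As a rough illustration of why size matters here, the following sketch splits a run of records by cumulative byte size instead of by record count. It is not the PR's code: `SizeAwareSplit` and `splitBySize` are hypothetical names, and only the `targetSize = totalSize / rangeNum` idea is borrowed from the diff discussed in the review.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch for illustration only; not taken from the PR. */
final class SizeAwareSplit {

    /**
     * Splits records, given by their byte sizes in sort order, into at most
     * rangeNum contiguous ranges of roughly equal total size.
     * Returns the exclusive end index of each range.
     */
    static List<Integer> splitBySize(long[] sizes, int rangeNum) {
        long totalSize = 0;
        for (long s : sizes) {
            totalSize += s;
        }
        double targetSize = totalSize / (double) rangeNum;

        List<Integer> ends = new ArrayList<>();
        long accumulated = 0;
        for (int i = 0; i < sizes.length && ends.size() < rangeNum - 1; i++) {
            accumulated += sizes[i];
            // Close the current range once the running total reaches its share.
            if (accumulated >= targetSize * (ends.size() + 1)) {
                ends.add(i + 1);
            }
        }
        // The last range always runs to the end of the input.
        ends.add(sizes.length);
        return ends;
    }
}
```

For one 90-byte record followed by nine 10-byte records and `rangeNum = 2`, a count-based split would put 130 bytes in the first range and 50 in the second, whereas this sketch closes the first range after the single large record so that each range holds 90 bytes.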
Tests
SortCompactActionForUnawareBucketITCase
RangeShuffleTest
InternalRowToSizeVisitorTest
API and Format
Add a table config:
--table_conf sort-compaction.range-strategy=SIZE
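A minimal sketch of how such an option could be resolved from table-level configuration, assuming the options arrive as a plain string map. Everything here except the key name and the `SIZE` value is an assumption: the class `RangeStrategyConf`, the `QUANTITY` alternative, and the choice of default are hypothetical and are not confirmed by this PR.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch for illustration only; not the project's actual option handling. */
final class RangeStrategyConf {

    /** SIZE comes from the PR description; QUANTITY is an assumed count-based alternative. */
    enum RangeStrategy { SIZE, QUANTITY }

    static final String KEY = "sort-compaction.range-strategy";

    /** Resolves the strategy from table options, falling back to an assumed default. */
    static RangeStrategy fromTableConf(Map<String, String> conf) {
        String raw = conf.get(KEY);
        if (raw == null) {
            // Assumption: keep count-based behavior when the option is not set.
            return RangeStrategy.QUANTITY;
        }
        // Tolerate surrounding whitespace and lower-case values.
        return RangeStrategy.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }
}
```

Reading the value from the table options at the point of use, rather than threading it through every method signature, is the design the reviewers ask for earlier in the conversation.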
Documentation