> rangeShuffleByKey(
* See {@link Sampler}.
*/
@Internal
- public static class LocalSampleOperator extends TableStreamOperator>
- implements OneInputStreamOperator>, BoundedOneInput {
+ public static class LocalSampleOperator
+ extends TableStreamOperator>
+ implements OneInputStreamOperator, Tuple3>,
+ BoundedOneInput {
private static final long serialVersionUID = 1L;
private final int numSample;
- private transient Collector> collector;
- private transient Sampler sampler;
+ private transient Collector> collector;
+ private transient Sampler> sampler;
public LocalSampleOperator(int numSample) {
this.numSample = numSample;
@@ -184,15 +241,17 @@ public void open() throws Exception {
}
@Override
- public void processElement(StreamRecord streamRecord) throws Exception {
+ public void processElement(StreamRecord> streamRecord) throws Exception {
sampler.collect(streamRecord.getValue());
}
@Override
- public void endInput() throws Exception {
- Iterator> sampled = sampler.sample();
+ public void endInput() {
+ Iterator>> sampled = sampler.sample();
while (sampled.hasNext()) {
- collector.collect(sampled.next());
+ Tuple2> next = sampled.next();
+
+ collector.collect(new Tuple3<>(next.f0, next.f1.f0, next.f1.f1));
}
}
}
@@ -203,7 +262,8 @@ public void endInput() throws Exception {
* See {@link Sampler}.
*/
private static class GlobalSampleOperator extends TableStreamOperator>
- implements OneInputStreamOperator, List>, BoundedOneInput {
+ implements OneInputStreamOperator, List>,
+ BoundedOneInput {
private static final long serialVersionUID = 1L;
@@ -213,7 +273,7 @@ private static class GlobalSampleOperator extends TableStreamOperator
private transient Comparator keyComparator;
private transient Collector> collector;
- private transient Sampler sampler;
+ private transient Sampler> sampler;
public GlobalSampleOperator(
int numSample,
@@ -233,35 +293,32 @@ public void open() throws Exception {
}
@Override
- public void processElement(StreamRecord> record) throws Exception {
- Tuple2 tuple = record.getValue();
- sampler.collect(tuple.f0, tuple.f1);
+ public void processElement(StreamRecord> record)
+ throws Exception {
+ Tuple3 tuple = record.getValue();
+ sampler.collect(tuple.f0, new Tuple2<>(tuple.f1, tuple.f2));
}
@Override
- public void endInput() throws Exception {
- Iterator> sampled = sampler.sample();
+ public void endInput() {
+ Iterator>> sampled = sampler.sample();
+
+ List> sampledData = new ArrayList<>();
- List sampledData = new ArrayList<>();
while (sampled.hasNext()) {
sampledData.add(sampled.next().f1);
}
- sampledData.sort(keyComparator);
+ sampledData.sort((o1, o2) -> keyComparator.compare(o1.f0, o2.f0));
- int boundarySize = rangesNum - 1;
- @SuppressWarnings("unchecked")
- T[] boundaries = (T[]) new Object[boundarySize];
- if (sampledData.size() > 0) {
- double avgRange = sampledData.size() / (double) rangesNum;
- for (int i = 1; i < rangesNum; i++) {
- T record = sampledData.get((int) (i * avgRange));
- boundaries[i - 1] = record;
- }
- collector.collect(Arrays.asList(boundaries));
+ List range;
+ if (sampledData.isEmpty()) {
+ range = new ArrayList<>();
} else {
- collector.collect(Collections.emptyList());
+ range = Arrays.asList(allocateRangeBaseSize(sampledData, rangesNum));
}
+
+ collector.collect(range);
}
}
@@ -488,4 +545,40 @@ public int get() {
return list.get(RANDOM.nextInt(list.size()));
}
}
+
+ @VisibleForTesting
+ static T[] allocateRangeBaseSize(List> sampledData, int rangesNum) {
+ int sampeNum = sampledData.size();
+ int boundarySize = rangesNum - 1;
+ @SuppressWarnings("unchecked")
+ T[] boundaries = (T[]) new Object[boundarySize];
+
+ if (!sampledData.isEmpty()) {
+ long restSize = sampledData.stream().mapToLong(t -> (long) t.f1).sum();
+ double stepRange = restSize / (double) rangesNum;
+
+ int currentWeight = 0;
+ int index = 0;
+
+ for (int i = 0; i < boundarySize; i++) {
+ while (currentWeight < stepRange && index < sampeNum) {
+ boundaries[i] = sampledData.get(Math.min(index, sampeNum - 1)).f0;
+ int sampleWeight = sampledData.get(index++).f1;
+ currentWeight += sampleWeight;
+ restSize -= sampleWeight;
+ }
+
+ currentWeight = 0;
+ stepRange = restSize / (double) (rangesNum - i - 1);
+ }
+ }
+
+ for (int i = 0; i < boundarySize; i++) {
+ if (boundaries[i] == null) {
+ boundaries[i] = sampledData.get(sampeNum - 1).f0;
+ }
+ }
+
+ return boundaries;
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index 4d049ff7241ee..c31aaa1a4c785 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -146,7 +146,9 @@ public Tuple2 map(RowData value) {
keyTypeInformation,
sampleSize,
rangeNum,
- sinkParallelism)
+ sinkParallelism,
+ valueRowType,
+ options.sortBySize())
.map(
a -> new JoinedRow(convertor.apply(a.f0), new FlinkRowWrapper(a.f1)),
internalRowType)
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
index cadd70e89d359..246f470af68a2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
@@ -263,22 +263,26 @@ public void testSortCompactionOnEmptyData() throws Exception {
}
private void zorder(List columns) throws Exception {
+ String rangeStrategy = RANDOM.nextBoolean() ? "size" : "quantity";
if (RANDOM.nextBoolean()) {
- createAction("zorder", columns).run();
+ createAction("zorder", rangeStrategy, columns).run();
} else {
- callProcedure("zorder", columns);
+ callProcedure("zorder", rangeStrategy, columns);
}
}
private void order(List columns) throws Exception {
+ String rangeStrategy = RANDOM.nextBoolean() ? "size" : "quantity";
if (RANDOM.nextBoolean()) {
- createAction("order", columns).run();
+ createAction("order", rangeStrategy, columns).run();
} else {
- callProcedure("order", columns);
+ callProcedure("order", rangeStrategy, columns);
}
}
- private SortCompactAction createAction(String orderStrategy, List columns) {
+ private SortCompactAction createAction(
+ String orderStrategy, String rangeStrategy, List columns) {
+
return createAction(
SortCompactAction.class,
"compact",
@@ -291,14 +295,21 @@ private SortCompactAction createAction(String orderStrategy, List column
"--order_strategy",
orderStrategy,
"--order_by",
- String.join(",", columns));
+ String.join(",", columns),
+ "--table_conf sort-compaction.range-strategy=" + rangeStrategy,
+ rangeStrategy);
}
- private void callProcedure(String orderStrategy, List orderByColumns) {
+ private void callProcedure(
+ String orderStrategy, String rangeStrategy, List orderByColumns) {
callProcedure(
String.format(
- "CALL sys.compact('%s.%s', 'ALL', '%s', '%s')",
- database, tableName, orderStrategy, String.join(",", orderByColumns)),
+ "CALL sys.compact('%s.%s', 'ALL', '%s', '%s','sort-compaction.range-strategy=%s')",
+ database,
+ tableName,
+ orderStrategy,
+ String.join(",", orderByColumns),
+ rangeStrategy),
false,
true);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/shuffle/RangeShuffleTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/shuffle/RangeShuffleTest.java
new file mode 100644
index 0000000000000..cb79a8628180b
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/shuffle/RangeShuffleTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.shuffle;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Test for {@link RangeShuffle}. */
+class RangeShuffleTest {
+
+ @Test
+ void testAllocateRange() {
+
+ // the size of test data is even
+ List> test0 =
+ Lists.newArrayList(
+ // key and size
+ new Tuple2<>(1, 1),
+ new Tuple2<>(2, 1),
+ new Tuple2<>(3, 1),
+ new Tuple2<>(4, 1),
+ new Tuple2<>(5, 1),
+ new Tuple2<>(6, 1));
+ Assertions.assertEquals(
+ "[2, 4]", Arrays.deepToString(RangeShuffle.allocateRangeBaseSize(test0, 3)));
+
+ // the size of test data is uneven,but can be evenly split based size
+ List> test2 =
+ Lists.newArrayList(
+ new Tuple2<>(1, 1),
+ new Tuple2<>(2, 1),
+ new Tuple2<>(3, 1),
+ new Tuple2<>(4, 1),
+ new Tuple2<>(5, 4),
+ new Tuple2<>(6, 4),
+ new Tuple2<>(7, 4));
+ Assertions.assertEquals(
+ "[4, 5, 6]", Arrays.deepToString(RangeShuffle.allocateRangeBaseSize(test2, 4)));
+
+ // the size of test data is uneven,and can not be evenly split
+ List> test1 =
+ Lists.newArrayList(
+ new Tuple2<>(1, 1),
+ new Tuple2<>(2, 2),
+ new Tuple2<>(3, 3),
+ new Tuple2<>(4, 1),
+ new Tuple2<>(5, 2),
+ new Tuple2<>(6, 3));
+
+ Assertions.assertEquals(
+ "[3, 5]", Arrays.deepToString(RangeShuffle.allocateRangeBaseSize(test1, 3)));
+ }
+}