diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 7d6db0ce6e28..5959b6b4f096 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -569,6 +569,12 @@
Duration |
In watermarking, if a source remains idle beyond the specified timeout duration, it triggers snapshot advancement and facilitates tag creation. |
+
+ sort-compaction.local-sample.magnification |
+ 1000 |
+ Integer |
+ The magnification of local sample for sort-compaction.The size of local sample is sink parallelism * magnification. |
+
sort-compaction.range-strategy |
QUANTITY |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index c6fde0f3ed8b..cbe5f45c5b50 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1076,6 +1076,12 @@ public class CoreOptions implements Serializable {
+ "If the data size allocated for the sorting task is uneven,which may lead to performance bottlenecks, "
+ "the config can be set to size.");
+ public static final ConfigOption SORT_COMPACTION_SAMPLE_MAGNIFICATION =
+ key("sort-compaction.local-sample.magnification")
+ .intType()
+ .defaultValue(1000)
+ .withDescription(
+ "The magnification of local sample for sort-compaction.The size of local sample is sink parallelism * magnification.");
private final Options options;
public CoreOptions(Map options) {
@@ -1146,6 +1152,10 @@ public boolean sortBySize() {
return options.get(SORT_RANG_STRATEGY) == RangeStrategy.SIZE;
}
+ public Integer getLocalSampleMagnification() {
+ return options.get(SORT_COMPACTION_SAMPLE_MAGNIFICATION);
+ }
+
public static FileFormat createFileFormat(
Options options, ConfigOption formatOption) {
String formatIdentifier = options.get(formatOption).toString();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
index 9c67e88559aa..1883890ea0e7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/shuffle/RangeShuffle.java
@@ -96,7 +96,8 @@ public static DataStream> rangeShuffleByKey(
DataStream> inputDataStream,
SerializableSupplier> keyComparator,
TypeInformation keyTypeInformation,
- int sampleSize,
+ int localSampleSize,
+ int globalSampleSize,
int rangeNum,
int outParallelism,
RowType valueRowType,
@@ -116,7 +117,7 @@ public static DataStream> rangeShuffleByKey(
new OneInputTransformation<>(
keyInput,
"LOCAL SAMPLE",
- new LocalSampleOperator<>(sampleSize),
+ new LocalSampleOperator<>(localSampleSize),
new TupleTypeInfo<>(
BasicTypeInfo.DOUBLE_TYPE_INFO,
keyTypeInformation,
@@ -128,7 +129,7 @@ public static DataStream> rangeShuffleByKey(
new OneInputTransformation<>(
localSample,
"GLOBAL SAMPLE",
- new GlobalSampleOperator<>(sampleSize, keyComparator, rangeNum),
+ new GlobalSampleOperator<>(globalSampleSize, keyComparator, rangeNum),
new ListTypeInfo<>(keyTypeInformation),
1);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
index 00a1475afbad..9a1dbb729b9e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java
@@ -99,9 +99,17 @@ public static DataStream sortStreamByKey(
"The adaptive batch scheduler is not supported. Please set the sink parallelism using the key: "
+ FlinkConnectorOptions.SINK_PARALLELISM.key());
}
- final int sampleSize = sinkParallelism * 1000;
+ int localSampleMagnification = options.getLocalSampleMagnification();
+ if (localSampleMagnification < 20) {
+ throw new IllegalArgumentException(
+ String.format(
+ "the config '%s=%d' should not be set too small,greater than or equal to 20 is needed.",
+ CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
+ localSampleMagnification));
+ }
+ final int localSampleSize = sinkParallelism * localSampleMagnification;
+ final int globalSampleSize = sinkParallelism * 1000;
final int rangeNum = sinkParallelism * 10;
-
int keyFieldCount = sortKeyType.getFieldCount();
int valueFieldCount = valueRowType.getFieldCount();
final int[] valueProjectionMap = new int[valueFieldCount];
@@ -144,7 +152,8 @@ public Tuple2 map(RowData value) {
inputWithKey,
shuffleKeyComparator,
keyTypeInformation,
- sampleSize,
+ localSampleSize,
+ globalSampleSize,
rangeNum,
sinkParallelism,
valueRowType,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
index 317661feb8db..b41da234eba7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForUnawareBucketITCase.java
@@ -39,6 +39,8 @@
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -347,22 +349,55 @@ private void order(List columns) throws Exception {
private SortCompactAction createAction(
String orderStrategy, String rangeStrategy, List columns) {
+ return createAction(orderStrategy, rangeStrategy, columns, Lists.newArrayList());
+ }
+
+ private SortCompactAction createAction(
+ String orderStrategy,
+ String rangeStrategy,
+ List columns,
+ List extraConfigs) {
+ ArrayList args =
+ Lists.newArrayList(
+ "compact",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--order_strategy",
+ orderStrategy,
+ "--order_by",
+ String.join(",", columns),
+ "--table_conf",
+ "sort-compaction.range-strategy=" + rangeStrategy);
+ args.addAll(extraConfigs);
+ return createAction(SortCompactAction.class, args.toArray(new String[0]));
+ }
- return createAction(
- SortCompactAction.class,
- "compact",
- "--warehouse",
- warehouse,
- "--database",
- database,
- "--table",
- tableName,
- "--order_strategy",
- orderStrategy,
- "--order_by",
- String.join(",", columns),
- "--table_conf sort-compaction.range-strategy=" + rangeStrategy,
- rangeStrategy);
+ @Test
+ public void testvalidSampleConfig() throws Exception {
+ prepareData(300, 1);
+ {
+ ArrayList extraCompactionConfig =
+ Lists.newArrayList(
+ "--table_conf", "sort-compaction.local-sample.magnification=1");
+ Assertions.assertThatCode(
+ () -> {
+ createAction(
+ "order",
+ "size",
+ Arrays.asList(
+ "f0", "f1", "f2", "f3", "f4", "f5", "f6",
+ "f7", "f8", "f9", "f10", "f11", "f12",
+ "f13", "f14", "f15"),
+ extraCompactionConfig)
+ .run();
+ })
+ .hasMessage(
+ "the config 'sort-compaction.local-sample.magnification=1' should not be set too small,greater than or equal to 20 is needed.");
+ }
}
private void createTable() throws Exception {