diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 446931acacda..767c7406b03d 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -605,6 +605,12 @@
Integer |
Max split size should be cached for one task while scanning. If splits size cached in enumerator are greater than tasks size multiply by this value, scanner will pause scanning. |
+
+ scan.shuffle-by-partition |
+ false |
+ Boolean |
+ Whether shuffle by partition and bucket. |
+
scan.mode |
default |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 17bff3653229..5504603c7927 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -310,6 +310,12 @@ public class CoreOptions implements Serializable {
"Max split size should be cached for one task while scanning. "
+ "If splits size cached in enumerator are greater than tasks size multiply by this value, scanner will pause scanning.");
+ public static final ConfigOption SCAN_SHUFFLE_BY_PARTITION =
+ key("scan.shuffle-by-partition")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether shuffle by partition and bucket.");
+
@Immutable
public static final ConfigOption MERGE_ENGINE =
key("merge-engine")
@@ -1595,6 +1601,10 @@ public int scanSplitMaxPerTask() {
return options.get(SCAN_MAX_SPLITS_PER_TASK);
}
+ public boolean scanShuffleByPartition() {
+ return options.get(SCAN_SHUFFLE_BY_PARTITION);
+ }
+
public int localSortMaxNumFileHandles() {
return options.get(LOCAL_SORT_MAX_NUM_FILE_HANDLES);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 4ec3cd5f3197..51f086621a1b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -23,6 +23,7 @@
import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.SnapshotNotExistPlan;
@@ -81,6 +82,8 @@ public class ContinuousFileSplitEnumerator
private boolean stopTriggerScan = false;
+ private boolean shuffleByPartition = false;
+
public ContinuousFileSplitEnumerator(
SplitEnumeratorContext context,
Collection remainSplits,
@@ -88,7 +91,8 @@ public ContinuousFileSplitEnumerator(
long discoveryInterval,
StreamTableScan scan,
BucketMode bucketMode,
- int splitMaxPerTask) {
+ int splitMaxPerTask,
+ boolean shuffleByPartition) {
checkArgument(discoveryInterval > 0L);
this.context = checkNotNull(context);
this.nextSnapshotId = nextSnapshotId;
@@ -98,6 +102,7 @@ public ContinuousFileSplitEnumerator(
this.scan = scan;
this.splitAssigner = createSplitAssigner(bucketMode);
this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
+ this.shuffleByPartition = shuffleByPartition;
addSplits(remainSplits);
this.consumerProgressCalculator =
@@ -275,7 +280,12 @@ protected synchronized void assignSplits() {
}
protected int assignSuggestedTask(FileStoreSourceSplit split) {
- return ((DataSplit) split.split()).bucket() % context.currentParallelism();
+ DataSplit dataSplit = ((DataSplit) split.split());
+ if (shuffleByPartition) {
+ return ChannelComputer.select(
+ dataSplit.partition(), dataSplit.bucket(), context.currentParallelism());
+ }
+ return dataSplit.bucket() % context.currentParallelism();
}
protected SplitAssigner createSplitAssigner(BucketMode bucketMode) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index ca61ab4eb07d..e7b553951f76 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -107,6 +107,7 @@ protected SplitEnumerator buildEn
coreOptions.continuousDiscoveryInterval().toMillis(),
scan,
bucketMode,
- coreOptions.scanSplitMaxPerTask());
+ coreOptions.scanSplitMaxPerTask(),
+ coreOptions.scanShuffleByPartition());
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
index e3e0aabb6f73..a038a2e02a6c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
@@ -94,7 +94,8 @@ public AlignedContinuousFileSplitEnumerator(
StreamTableScan scan,
BucketMode bucketMode,
long alignTimeout,
- int splitPerTaskMax) {
+ int splitPerTaskMax,
+ boolean shuffleByPartition) {
super(
context,
remainSplits,
@@ -102,7 +103,8 @@ public AlignedContinuousFileSplitEnumerator(
discoveryInterval,
scan,
bucketMode,
- splitPerTaskMax);
+ splitPerTaskMax,
+ shuffleByPartition);
this.pendingPlans = new ArrayBlockingQueue<>(MAX_PENDING_PLAN);
this.alignedAssigner = (AlignedSplitAssigner) super.splitAssigner;
this.nextSnapshotId = nextSnapshotId;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index c00b7ea51d26..5a23985911d8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -91,6 +91,7 @@ protected SplitEnumerator buildEn
scan,
bucketMode,
options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
- options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK));
+ options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
+ options.get(CoreOptions.SCAN_SHUFFLE_BY_PARTITION));
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index b0077ec1b391..3a10f9c8d5b3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -878,7 +878,7 @@ public Builder withBucketMode(BucketMode bucketMode) {
public ContinuousFileSplitEnumerator build() {
return new ContinuousFileSplitEnumerator(
- context, initialSplits, null, discoveryInterval, scan, bucketMode, 10);
+ context, initialSplits, null, discoveryInterval, scan, bucketMode, 10, false);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
index c164e00da157..a5edc2804061 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
@@ -245,7 +245,15 @@ public Builder setAlignedTimeout(long timeout) {
public AlignedContinuousFileSplitEnumerator build() {
return new AlignedContinuousFileSplitEnumerator(
- context, initialSplits, null, discoveryInterval, scan, bucketMode, timeout, 10);
+ context,
+ initialSplits,
+ null,
+ discoveryInterval,
+ scan,
+ bucketMode,
+ timeout,
+ 10,
+ false);
}
}
}