diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java index 30ac622ce45c..254e3acd20ee 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.source.operator; import org.apache.paimon.flink.utils.JavaTypeInfo; +import org.apache.paimon.table.sink.ChannelComputer; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.EndOfScanException; import org.apache.paimon.table.source.ReadBuilder; @@ -236,8 +237,12 @@ public static DataStream buildSource( new JavaTypeInfo<>(Split.class)) .forceNonParallel() .partitionCustom( - (key, numPartitions) -> key % numPartitions, - split -> ((DataSplit) split).bucket()) + (key, numPartitions) -> + ChannelComputer.select(key.f0, key.f1, numPartitions), + split -> { + DataSplit dataSplit = (DataSplit) split; + return Tuple2.of(dataSplit.partition(), dataSplit.bucket()); + }) .transform(name + "-Reader", typeInfo, new ReadOperator(readBuilder)); } }