Skip to content

Commit

Permalink
[flink] Balance MonitorFunction data shuffle for partitioned table (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
wxplovecc authored Apr 9, 2024
1 parent a29c0a6 commit f067979
Showing 1 changed file with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.flink.source.operator;

import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.ReadBuilder;
Expand Down Expand Up @@ -236,8 +237,12 @@ public static DataStream<RowData> buildSource(
new JavaTypeInfo<>(Split.class))
.forceNonParallel()
.partitionCustom(
(key, numPartitions) -> key % numPartitions,
split -> ((DataSplit) split).bucket())
(key, numPartitions) ->
ChannelComputer.select(key.f0, key.f1, numPartitions),
split -> {
DataSplit dataSplit = (DataSplit) split;
return Tuple2.of(dataSplit.partition(), dataSplit.bucket());
})
.transform(name + "-Reader", typeInfo, new ReadOperator(readBuilder));
}
}

0 comments on commit f067979

Please sign in to comment.