From e00dd415977e744cc1301613c70d57d9ec9dbb69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E7=A5=A5=E5=B9=B3?= Date: Tue, 9 Apr 2024 14:16:39 +0800 Subject: [PATCH 1/3] Balance MonitorFunction data shuffle for partitioned table --- .../source/operator/MonitorFunction.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java index 30ac622ce45c..71de5aec08b3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java @@ -31,6 +31,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -45,11 +46,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.NavigableMap; -import java.util.OptionalLong; -import java.util.TreeMap; +import java.util.*; /** * This is the single (non-parallel) monitoring task, it is responsible for: @@ -230,14 +227,21 @@ public static DataStream buildSource( ReadBuilder readBuilder, long monitorInterval, boolean emitSnapshotWatermark) { + KeySelector keySelector = + split -> { + if (Objects.nonNull(((DataSplit) split).partition())) { + return Math.abs(((DataSplit) split).partition().hashCode()) + + ((DataSplit) split).bucket(); + } else { + return ((DataSplit) split).bucket(); + } + }; return env.addSource( new MonitorFunction(readBuilder, monitorInterval, emitSnapshotWatermark), name + "-Monitor", new JavaTypeInfo<>(Split.class)) .forceNonParallel() - .partitionCustom( - (key, numPartitions) -> key % numPartitions, - split -> ((DataSplit) split).bucket()) + .partitionCustom((key, numPartitions) -> key % numPartitions, keySelector) .transform(name + "-Reader", typeInfo, new ReadOperator(readBuilder)); } } From 58c269b08efb86d4ee6a0c27a6b776a8ba3da1fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E7=A5=A5=E5=B9=B3?= Date: Tue, 9 Apr 2024 14:21:42 +0800 Subject: [PATCH 2/3] format code --- .../paimon/flink/source/operator/MonitorFunction.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java index 71de5aec08b3..0a272468667b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java @@ -46,7 +46,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.List; +import java.util.NavigableMap; +import java.util.OptionalLong; +import java.util.TreeMap; /** * This is the single (non-parallel) monitoring task, it is responsible for: @@ -229,7 +233,7 @@ public static DataStream buildSource( boolean emitSnapshotWatermark) { KeySelector keySelector = split -> { - if (Objects.nonNull(((DataSplit) split).partition())) { + if (((DataSplit) split).partition() != null) { return Math.abs(((DataSplit) split).partition().hashCode()) + ((DataSplit) split).bucket(); } else { From 0ee573cf622c78ea3ffb589bce5a9dd66f80403a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E7=A5=A5=E5=B9=B3?= Date: Tue, 9 Apr 2024 17:22:23 +0800 Subject: [PATCH 3/3] use ChannelComputer.select --- .../source/operator/MonitorFunction.java | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java index 0a272468667b..254e3acd20ee 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.source.operator; import org.apache.paimon.flink.utils.JavaTypeInfo; +import org.apache.paimon.table.sink.ChannelComputer; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.EndOfScanException; import org.apache.paimon.table.source.ReadBuilder; @@ -31,7 +32,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -231,21 +231,18 @@ public static DataStream buildSource( ReadBuilder readBuilder, long monitorInterval, boolean emitSnapshotWatermark) { - KeySelector keySelector = - split -> { - if (((DataSplit) split).partition() != null) { - return Math.abs(((DataSplit) split).partition().hashCode()) - + ((DataSplit) split).bucket(); - } else { - return ((DataSplit) split).bucket(); - } - }; return env.addSource( new MonitorFunction(readBuilder, monitorInterval, emitSnapshotWatermark), name + "-Monitor", new JavaTypeInfo<>(Split.class)) .forceNonParallel() - .partitionCustom((key, numPartitions) -> key % numPartitions, keySelector) + .partitionCustom( + (key, numPartitions) -> + ChannelComputer.select(key.f0, key.f1, numPartitions), + split -> { + DataSplit dataSplit = (DataSplit) split; + return Tuple2.of(dataSplit.partition(), dataSplit.bucket()); + }) .transform(name + "-Reader", typeInfo, new ReadOperator(readBuilder)); } }