From 6f4aa51b02f940838fc02da03a9a9cae6ac23e12 Mon Sep 17 00:00:00 2001 From: zhourui999 <40812600+zhourui999@users.noreply.github.com> Date: Wed, 24 Jul 2024 16:42:20 +0800 Subject: [PATCH] [flink] Improve the parallelism of Flink database compaction commit operator (#3789) --- .../flink/sink/cdc/FlinkCdcMultiTableSink.java | 1 + .../flink/sink/CombinedTableCompactorSink.java | 12 +++++++++--- .../sink}/MultiTableCommittableChannelComputer.java | 3 +-- 3 files changed, 11 insertions(+), 5 deletions(-) rename paimon-flink/{paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc => paimon-flink-common/src/main/java/org/apache/paimon/flink/sink}/MultiTableCommittableChannelComputer.java (94%) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java index 535c3d255f5a..55e987c6055f 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java @@ -25,6 +25,7 @@ import org.apache.paimon.flink.sink.FlinkSink; import org.apache.paimon.flink.sink.FlinkStreamPartitioner; import org.apache.paimon.flink.sink.MultiTableCommittable; +import org.apache.paimon.flink.sink.MultiTableCommittableChannelComputer; import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo; import org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager; import org.apache.paimon.flink.sink.StoreMultiCommitter; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java index df5396318378..ad3d57771d79 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java @@ -142,8 +142,15 @@ protected DataStreamSink doCommit( if (streamingCheckpointEnabled) { assertStreamingConfiguration(env); } + + DataStream partitioned = + FlinkStreamPartitioner.partition( + written, + new MultiTableCommittableChannelComputer(), + written.getParallelism()); SingleOutputStreamOperator committed = - written.transform( + partitioned + .transform( GLOBAL_COMMITTER_NAME, new MultiTableCommittableTypeInfo(), new CommitterOperator<>( @@ -154,8 +161,7 @@ protected DataStreamSink doCommit( createCommitterFactory(), createCommittableStateManager(), options.get(END_INPUT_WATERMARK))) - .setParallelism(1) - .setMaxParallelism(1); + .setParallelism(written.getParallelism()); return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableCommittableChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableChannelComputer.java similarity index 94% rename from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableCommittableChannelComputer.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableChannelComputer.java index c12259204922..405c6af271c4 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableCommittableChannelComputer.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCommittableChannelComputer.java @@ -16,9 +16,8 @@ * limitations under the License. */ -package org.apache.paimon.flink.sink.cdc; +package org.apache.paimon.flink.sink; -import org.apache.paimon.flink.sink.MultiTableCommittable; import org.apache.paimon.table.sink.ChannelComputer; import java.util.Objects;