From 632a57b1ddef2e4f061902ca0ea0ea2fdd04117c Mon Sep 17 00:00:00 2001 From: LinMingQiang Date: Thu, 5 Dec 2024 00:00:06 +0800 Subject: [PATCH 1/2] [flink] Optimizing parallelism for fixed bucket and non-partitioned table. --- .../org/apache/paimon/flink/sink/FlinkSinkBuilder.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index 5703c408243b..7e2781b3e7a9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -265,6 +265,15 @@ protected DataStreamSink buildDynamicBucketSink( } protected DataStreamSink buildForFixedBucket(DataStream input) { + int bucketNums = table.bucketSpec().getNumBuckets(); + if (parallelism == null + && bucketNums < input.getParallelism() + && table.partitionKeys().isEmpty()) { + // For non-partitioned table, if the bucketNums is less than job parallelism. + LOG.warn( + "For non-partitioned table, the writerOperator's parallelism will be set to bucketNums if the bucketNums is less than inputOperator's parallelism."); + parallelism = bucketNums; + } DataStream partitioned = partition( input, From 4c14a2a6f06f253968945b0558c90ced0fb33170 Mon Sep 17 00:00:00 2001 From: LinMingQiang Date: Thu, 5 Dec 2024 00:15:21 +0800 Subject: [PATCH 2/2] [flink] Optimizing parallelism for fixed bucket and non-partitioned table. 
--- .../java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index 7e2781b3e7a9..ecaa5678dd0b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -271,7 +271,8 @@ protected DataStreamSink buildForFixedBucket(DataStream input) { && table.partitionKeys().isEmpty()) { // For non-partitioned table, if the bucketNums is less than job parallelism. LOG.warn( - "For non-partitioned table, the writerOperator's parallelism will be set to bucketNums if the bucketNums is less than inputOperator's parallelism."); + "For non-partitioned table, if bucketNums is less than the parallelism of inputOperator," + + " then the parallelism of writerOperator will be set to bucketNums."); parallelism = bucketNums; } DataStream partitioned =