From 8484bb4a25e5b1873cad1716bb6076d6f60913ed Mon Sep 17 00:00:00 2001 From: HunterXHunter Date: Sat, 7 Dec 2024 00:18:51 +0800 Subject: [PATCH] [flink] Optimizing parallelism for fixed bucekt and non-partitioned table (#4643) --- .../org/apache/paimon/flink/sink/FlinkSinkBuilder.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index 5703c408243b..ecaa5678dd0b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -265,6 +265,16 @@ protected DataStreamSink buildDynamicBucketSink( } protected DataStreamSink buildForFixedBucket(DataStream input) { + int bucketNums = table.bucketSpec().getNumBuckets(); + if (parallelism == null + && bucketNums < input.getParallelism() + && table.partitionKeys().isEmpty()) { + // For non-partitioned table, if the bucketNums is less than job parallelism. + LOG.warn( + "For non-partitioned table, if bucketNums is less than the parallelism of inputOperator," + + " then the parallelism of writerOperator will be set to bucketNums."); + parallelism = bucketNums; + } DataStream partitioned = partition( input,