diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index f810c464b58c2..e483e3c19f740 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -24,7 +24,6 @@
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SerializableRunnable;
@@ -48,10 +47,8 @@
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 
@@ -225,13 +222,6 @@ public DataStream doWrite(
                                         commitUser))
                         .setParallelism(parallelism == null ? input.getParallelism() : parallelism);
 
-        boolean writeMCacheEnabled = table.coreOptions().writeManifestCache().getBytes() > 0;
-        boolean hashDynamicMode = table.bucketMode() == BucketMode.HASH_DYNAMIC;
-        if (!isStreaming && (writeMCacheEnabled || hashDynamicMode)) {
-            assertBatchAdaptiveParallelism(
-                    env, written.getParallelism(), writeMCacheEnabled, hashDynamicMode);
-        }
-
         Options options = Options.fromMap(table.options());
         if (options.get(SINK_USE_MANAGED_MEMORY)) {
             declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
@@ -328,26 +318,6 @@ public static void assertBatchAdaptiveParallelism(
         assertBatchAdaptiveParallelism(env, sinkParallelism, msg);
     }
 
-    public static void assertBatchAdaptiveParallelism(
-            StreamExecutionEnvironment env,
-            int sinkParallelism,
-            boolean writeMCacheEnabled,
-            boolean hashDynamicMode) {
-        List messages = new ArrayList<>();
-        if (writeMCacheEnabled) {
-            messages.add("Write Manifest Cache");
-        }
-        if (hashDynamicMode) {
-            messages.add("Dynamic Bucket Mode");
-        }
-        String msg =
-                String.format(
-                        "Paimon Sink with %s does not support Flink's Adaptive Parallelism mode. "
-                                + "Please manually turn it off or set Paimon `sink.parallelism` manually.",
-                        messages);
-        assertBatchAdaptiveParallelism(env, sinkParallelism, msg);
-    }
-
     public static void assertBatchAdaptiveParallelism(
             StreamExecutionEnvironment env, int sinkParallelism, String exceptionMsg) {
         try {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 546f82ec1f843..b2fa2f95ff552 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -44,12 +44,14 @@
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
 import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
 import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
@@ -208,6 +210,7 @@ public FlinkSinkBuilder clusteringIfPossible(
 
     /** Build {@link DataStreamSink}. */
     public DataStreamSink build() {
+        parallelism = checkAndUpdateParallelism(input, parallelism);
         input = trySortInput(input);
         DataStream input = mapToInternalRow(this.input, table.rowType());
         if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
@@ -282,4 +285,51 @@ private DataStream trySortInput(DataStream input) {
         }
         return input;
     }
+
+    /**
+     * Resolves the sink parallelism when Flink's adaptive parallelism cannot be used.
+     *
+     * <p>For a batch job with adaptive parallelism enabled and no explicit sink parallelism
+     * ({@code null} or -1), a table that uses the write manifest cache or dynamic bucket mode
+     * falls back to Flink's {@code CoreOptions.DEFAULT_PARALLELISM} and a warning is logged.
+     *
+     * @param input stream whose execution mode and environment configuration are inspected
+     * @param parallelism configured sink parallelism; {@code null} or -1 means undefined
+     * @return the fallback parallelism if the conflict applies, otherwise {@code parallelism}
+     */
+    private Integer checkAndUpdateParallelism(DataStream<?> input, Integer parallelism) {
+        try {
+            boolean parallelismUndefined = parallelism == null || parallelism == -1;
+            boolean isStreaming = FlinkSink.isStreaming(input);
+            boolean isAdaptiveParallelismEnabled =
+                    AdaptiveParallelism.isEnabled(input.getExecutionEnvironment());
+            boolean writeMCacheEnabled = table.coreOptions().writeManifestCache().getBytes() > 0;
+            boolean hashDynamicMode = table.bucketMode() == BucketMode.HASH_DYNAMIC;
+            // Only batch jobs are affected; streaming jobs keep the configured parallelism.
+            if (parallelismUndefined
+                    && !isStreaming
+                    && isAdaptiveParallelismEnabled
+                    && (writeMCacheEnabled || hashDynamicMode)) {
+                List<String> messages = new ArrayList<>();
+                if (writeMCacheEnabled) {
+                    messages.add("Write Manifest Cache");
+                }
+                if (hashDynamicMode) {
+                    messages.add("Dynamic Bucket Mode");
+                }
+                String msg =
+                        String.format(
+                                "Paimon Sink with %s does not support Flink's Adaptive Parallelism mode. "
+                                        + "Configuring sink parallelism to `%s` instead. You can also set Paimon "
+                                        + "`sink.parallelism` manually to override this configuration.",
+                                messages, DEFAULT_PARALLELISM.key());
+                LOG.warn(msg);
+                return input.getExecutionEnvironment().getConfiguration().get(DEFAULT_PARALLELISM);
+            }
+            return parallelism;
+        } catch (NoClassDefFoundError ignored) {
+            // before 1.17, there is no adaptive parallelism
+            return parallelism;
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index eea8e3a3c2171..c60443a128338 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -53,30 +53,22 @@ protected List ddl() {
     public void testAQEWithWriteManifest() {
         batchSql("ALTER TABLE T SET ('write-manifest-cache' = '1 mb')");
         batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
-        assertThatThrownBy(() -> batchSql("INSERT INTO T SELECT a, b, c FROM T GROUP BY a,b,c"))
-                .hasMessageContaining(
-                        "Paimon Sink with [Write Manifest Cache] does not support Flink's Adaptive Parallelism mode.");
-
-        // work fine
-        batchSql(
-                "INSERT INTO T /*+ OPTIONS('sink.parallelism'='1') */ SELECT a, b, c FROM T GROUP BY a,b,c");
-
-        // work fine too
-        batchSql("ALTER TABLE T SET ('write-manifest-cache' = '0 b')");
         batchSql("INSERT INTO T SELECT a, b, c FROM T GROUP BY a,b,c");
+        assertThat(batchSql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 22, 222),
+                        Row.of(1, 11, 111),
+                        Row.of(2, 22, 222));
     }
 
     @Test
     public void testAQEWithDynamicBucket() {
         batchSql("CREATE TABLE IF NOT EXISTS D_T (a INT PRIMARY KEY NOT ENFORCED, b INT, c INT)");
         batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)");
-        assertThatThrownBy(() -> batchSql("INSERT INTO D_T SELECT a, b, c FROM T GROUP BY a,b,c"))
-                .hasMessageContaining(
-                        "Paimon Sink with [Dynamic Bucket Mode] does not support Flink's Adaptive Parallelism mode.");
-
-        // work fine
-        batchSql(
-                "INSERT INTO D_T /*+ OPTIONS('sink.parallelism'='1') */ SELECT a, b, c FROM T GROUP BY a,b,c");
+        batchSql("INSERT INTO D_T SELECT a, b, c FROM T GROUP BY a,b,c");
+        assertThat(batchSql("SELECT * FROM D_T"))
+                .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222));
     }
 
     @Test