diff --git a/docs/content/maintenance/dedicated-compaction.md b/docs/content/maintenance/dedicated-compaction.md
index 25eec7e0cef4..017d2bc36a91 100644
--- a/docs/content/maintenance/dedicated-compaction.md
+++ b/docs/content/maintenance/dedicated-compaction.md
@@ -93,6 +93,7 @@ Run the following command to submit a compaction job for the table.
     --database <database-name> \
     --table <table-name> \
     [--partition <partition-name>] \
+    [--table-conf <table-conf>] \
     [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
 ```
 
@@ -107,6 +108,7 @@ Example: compact table
     --table test_table \
     --partition dt=20221126,hh=08 \
     --partition dt=20221127,hh=09 \
+    --table-conf sink.parallelism=10 \
    --catalog-conf s3.endpoint=https://****.com \
     --catalog-conf s3.access-key=***** \
     --catalog-conf s3.secret-key=*****
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
index d523d0dc8a53..350532dbd032 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 
@@ -25,6 +26,8 @@
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.table.data.RowData;
 
+import java.util.Optional;
+
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
 
 /** Builder for {@link CompactorSink}. */
@@ -56,7 +59,13 @@ public DataStreamSink<?> build() {
     }
 
     private DataStreamSink<?> buildForBucketAware() {
-        DataStream<RowData> partitioned = partition(input, new BucketsRowChannelComputer(), null);
+        Integer parallelism =
+                Optional.ofNullable(
+                                table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key()))
+                        .map(Integer::valueOf)
+                        .orElse(null);
+        DataStream<RowData> partitioned =
+                partition(input, new BucketsRowChannelComputer(), parallelism);
         return new CompactorSink(table).sinkFrom(partitioned);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index 11ad726d66d1..b9571f6b3107 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -21,6 +21,7 @@
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.source.CompactorSourceBuilder;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
@@ -45,6 +46,7 @@
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -54,6 +56,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -134,6 +137,37 @@ public void testCompact() throws Exception {
         }
     }
 
+    @Test
+    public void testCompactParallelism() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        CompactorSourceBuilder sourceBuilder =
+                new CompactorSourceBuilder(tablePath.toString(), table);
+        DataStreamSource<RowData> source =
+                sourceBuilder
+                        .withEnv(env)
+                        .withContinuousMode(false)
+                        .withPartitions(getSpecifiedPartitions())
+                        .build();
+        // Flink requires parallelism >= 1, so shift the random draw away from 0.
+        int sinkParallelism = new Random().nextInt(100) + 1;
+        new CompactorSinkBuilder(
+                        table.copy(
+                                new HashMap<String, String>() {
+                                    {
+                                        put(
+                                                FlinkConnectorOptions.SINK_PARALLELISM.key(),
+                                                String.valueOf(sinkParallelism));
+                                    }
+                                }))
+                .withInput(source)
+                .build();
+
+        Assertions.assertThat(env.getTransformations().get(0).getParallelism())
+                .isEqualTo(sinkParallelism);
+    }
+
     private List<Map<String, String>> getSpecifiedPartitions() {
         Map<String, String> partition1 = new HashMap<>();
         partition1.put("dt", "20221208");
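
A minimal sketch of how a caller is expected to wire the new `sink.parallelism` option through dynamic table options, mirroring the test above. The `table` and `source` variables and the value `"10"` are illustrative, assumed to come from the surrounding compaction job setup:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.table.FileStoreTable;

// Override sink.parallelism on a copy of the table; copying keeps the
// option scoped to this compaction job rather than the stored table schema.
Map<String, String> dynamicOptions = new HashMap<>();
dynamicOptions.put(FlinkConnectorOptions.SINK_PARALLELISM.key(), "10");
FileStoreTable tableWithParallelism = table.copy(dynamicOptions);

// CompactorSinkBuilder reads the option from table.options() and passes it
// to the channel partitioner, as in buildForBucketAware() above.
new CompactorSinkBuilder(tableWithParallelism)
        .withInput(source)
        .build();
```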