diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java index e6c301ded64d..7926fa60a566 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BucketUnawareCompactSource.java @@ -24,6 +24,7 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.EndOfScanException; +import org.apache.paimon.utils.Preconditions; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.configuration.Configuration; @@ -74,6 +75,9 @@ public BucketUnawareCompactSource( @Override public void open(Configuration parameters) throws Exception { compactionCoordinator = new AppendOnlyTableCompactionCoordinator(table, streaming, filter); + Preconditions.checkArgument( + this.getRuntimeContext().getNumberOfParallelSubtasks() == 1, + "Compaction Operator parallelism in paimon MUST be one."); } @Override @@ -120,12 +124,15 @@ public static DataStreamSource buildSource( String tableIdentifier) { final StreamSource sourceOperator = new StreamSource<>(source); - return new DataStreamSource<>( - env, - new CompactionTaskTypeInfo(), - sourceOperator, - false, - COMPACTION_COORDINATOR_NAME + " : " + tableIdentifier, - streaming ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED); + return (DataStreamSource) + new DataStreamSource<>( + env, + new CompactionTaskTypeInfo(), + sourceOperator, + false, + COMPACTION_COORDINATOR_NAME + " : " + tableIdentifier, + streaming ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED) + .setParallelism(1) + .setMaxParallelism(1); } }