diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketSink.java
index 5cd3d6085f5a..c60772d10b16 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketSink.java
@@ -29,7 +29,7 @@
 public class CdcUnawareBucketSink extends UnawareBucketSink<CdcRecord> {
 
     public CdcUnawareBucketSink(FileStoreTable table, Integer parallelism) {
-        super(table, null, null, parallelism, false);
+        super(table, null, null, parallelism);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index dd73bd5590b6..546f82ec1f84 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -76,7 +76,6 @@ public class FlinkSinkBuilder {
     private DataStream<RowData> input;
     @Nullable protected Map<String, String> overwritePartition;
     @Nullable protected Integer parallelism;
-    private Boolean boundedInput = null;
     @Nullable private TableSortInfo tableSortInfo;
 
     // ============== for extension ==============
@@ -131,15 +130,6 @@ public FlinkSinkBuilder parallelism(int parallelism) {
         return this;
     }
 
-    /**
-     * Set input bounded, if it is bounded, append table sink does not generate a topology for
-     * merging small files.
-     */
-    public FlinkSinkBuilder inputBounded(boolean bounded) {
-        this.boundedInput = bounded;
-        return this;
-    }
-
     /** Clustering the input data if possible. */
     public FlinkSinkBuilder clusteringIfPossible(
             String clusteringColumns,
@@ -152,10 +142,7 @@ public FlinkSinkBuilder clusteringIfPossible(
             return this;
         }
         checkState(input != null, "The input stream should be specified earlier.");
-        if (boundedInput == null) {
-            boundedInput = !FlinkSink.isStreaming(input);
-        }
-        if (!boundedInput || !table.bucketMode().equals(BUCKET_UNAWARE)) {
+        if (FlinkSink.isStreaming(input) || !table.bucketMode().equals(BUCKET_UNAWARE)) {
             LOG.warn(
                     "Clustering is enabled; however, it has been skipped as "
                             + "it only supports the bucket unaware table without primary keys and "
@@ -282,11 +269,7 @@ private DataStreamSink<?> buildUnawareBucketSink(DataStream<InternalRow> input)
         checkArgument(
                 table.primaryKeys().isEmpty(),
                 "Unaware bucket mode only works with append-only table for now.");
-        if (boundedInput == null) {
-            boundedInput = !FlinkSink.isStreaming(input);
-        }
-        return new RowUnawareBucketSink(
-                        table, overwritePartition, logSinkFunction, parallelism, boundedInput)
+        return new RowUnawareBucketSink(table, overwritePartition, logSinkFunction, parallelism)
                 .sinkFrom(input);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index ebe1c4c51cd9..e719b9a77af3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -135,7 +135,6 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
                                 new DataStream<>(
                                         dataStream.getExecutionEnvironment(),
                                         dataStream.getTransformation()))
-                        .inputBounded(context.isBounded())
                         .clusteringIfPossible(
                                 conf.get(CLUSTERING_COLUMNS),
                                 conf.get(CLUSTERING_STRATEGY),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
index d7b8c76cfc40..b670b905d587 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
@@ -32,9 +32,8 @@ public RowUnawareBucketSink(
             FileStoreTable table,
             Map<String, String> overwritePartitions,
             LogSinkFunction logSinkFunction,
-            Integer parallelism,
-            boolean boundedInput) {
-        super(table, overwritePartitions, logSinkFunction, parallelism, boundedInput);
+            Integer parallelism) {
+        super(table, overwritePartitions, logSinkFunction, parallelism);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
index c4ac9990668a..7c79d53b61c4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
@@ -42,19 +42,16 @@ public abstract class UnawareBucketSink<T> extends FlinkWriteSink<T> {
 
     protected final LogSinkFunction logSinkFunction;
     @Nullable protected final Integer parallelism;
-    protected final boolean boundedInput;
 
     public UnawareBucketSink(
             FileStoreTable table,
             @Nullable Map<String, String> overwritePartitions,
             LogSinkFunction logSinkFunction,
-            @Nullable Integer parallelism,
-            boolean boundedInput) {
+            @Nullable Integer parallelism) {
         super(table, overwritePartitions);
         this.table = table;
         this.logSinkFunction = logSinkFunction;
         this.parallelism = parallelism;
-        this.boundedInput = boundedInput;
     }
 
     @Override
@@ -69,7 +66,7 @@ public DataStream<Committable> doWrite(
                                 .get(ExecutionOptions.RUNTIME_MODE)
                         == RuntimeExecutionMode.STREAMING;
         // if enable compaction, we need to add compaction topology to this job
-        if (enableCompaction && isStreamingMode && !boundedInput) {
+        if (enableCompaction && isStreamingMode) {
             written =
                     written.transform(
                             "Compact Coordinator: " + table.name(),
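
Migration note: with `inputBounded(...)` removed, the append-table write path now derives boundedness from the job's runtime execution mode alone (`FlinkSink.isStreaming` / `ExecutionOptions.RUNTIME_MODE` above). Below is a minimal caller sketch of the migration, assuming the builder's `FlinkSinkBuilder(table)`, `forRowData(...)` and `build()` entry points, which are outside this diff; the class and method names in the sketch are placeholders, not part of this change.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.table.FileStoreTable;

/** Placeholder caller, for illustration only. */
public class BoundedWriteMigration {

    public static void writeBounded(FileStoreTable table, DataStream<RowData> input) {
        // Before: new FlinkSinkBuilder(table).forRowData(input).inputBounded(true).build();
        // After: the bounded behaviour follows the job's runtime mode, so run the job in
        // BATCH mode (equivalently, execution.runtime-mode=BATCH) instead of calling the
        // removed inputBounded(true). Shown inline for brevity; in practice configure the
        // mode when the environment is created, before the pipeline is built.
        StreamExecutionEnvironment env = input.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        new FlinkSinkBuilder(table)
                .forRowData(input)
                .parallelism(4) // builder method retained by this change
                .build();
    }
}
```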