diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index 56ead8a8584e..46a0c09fad30 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -44,6 +44,7 @@ public class FlinkSinkBuilder { @Nullable private Map overwritePartition; @Nullable private LogSinkFunction logSinkFunction; @Nullable private Integer parallelism; + private boolean boundedInput = false; private boolean compactSink = false; public FlinkSinkBuilder(FileStoreTable table) { @@ -79,6 +80,11 @@ public FlinkSinkBuilder withParallelism(@Nullable Integer parallelism) { return this; } + public FlinkSinkBuilder withBoundedInputStream(boolean bounded) { + this.boundedInput = bounded; + return this; + } + public FlinkSinkBuilder forCompact(boolean compactSink) { this.compactSink = compactSink; return this; @@ -142,7 +148,8 @@ private DataStreamSink buildUnawareBucketSink(DataStream input) (AppendOnlyFileStoreTable) table, overwritePartition, logSinkFunction, - parallelism) + parallelism, + boundedInput) .sinkFrom(input); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java index be6628bde95e..3874e2ca7e1a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java @@ -133,6 +133,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { .withLogSinkFunction(logSinkFunction) .withOverwritePartition(overwrite ? staticPartitions : null) .withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM)) + .withBoundedInputStream(context.isBounded()) .build()); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java index 856432582f7c..802a98dcf222 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java @@ -40,16 +40,19 @@ public class UnawareBucketWriteSink extends FileStoreSink { private final boolean enableCompaction; private final AppendOnlyFileStoreTable table; private final Integer parallelism; + private final boolean boundedInput; public UnawareBucketWriteSink( AppendOnlyFileStoreTable table, Map overwritePartitions, LogSinkFunction logSinkFunction, - Integer parallelism) { + Integer parallelism, + boolean boundedInput) { super(table, overwritePartitions, logSinkFunction); this.table = table; this.enableCompaction = !table.coreOptions().writeOnly(); this.parallelism = parallelism; + this.boundedInput = boundedInput; } @Override @@ -63,7 +66,8 @@ public DataStreamSink sinkFrom(DataStream input, String initialC .get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; // if enable compaction, we need to add compaction topology to this job - if (enableCompaction && isStreamingMode) { + if (enableCompaction && isStreamingMode && !boundedInput) { + // if streaming mode with bounded input, we disable compaction topology UnawareBucketCompactionTopoBuilder builder = new UnawareBucketCompactionTopoBuilder( input.getExecutionEnvironment(), table.name(), table);