Skip to content

Commit

Permalink
[flink] Disable compaction topology when streaming with bounded input…
Browse files Browse the repository at this point in the history
… stream (#2463)
  • Loading branch information
leaves12138 authored and JingsongLi committed Dec 11, 2023
1 parent 1e96efd commit 624a553
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class FlinkSinkBuilder {
@Nullable private Map<String, String> overwritePartition;
@Nullable private LogSinkFunction logSinkFunction;
@Nullable private Integer parallelism;
private boolean boundedInput = false;
private boolean compactSink = false;

public FlinkSinkBuilder(FileStoreTable table) {
Expand Down Expand Up @@ -79,6 +80,11 @@ public FlinkSinkBuilder withParallelism(@Nullable Integer parallelism) {
return this;
}

public FlinkSinkBuilder withBoundedInputStream(boolean bounded) {
this.boundedInput = bounded;
return this;
}

public FlinkSinkBuilder forCompact(boolean compactSink) {
this.compactSink = compactSink;
return this;
Expand Down Expand Up @@ -142,7 +148,8 @@ private DataStreamSink<?> buildUnawareBucketSink(DataStream<InternalRow> input)
(AppendOnlyFileStoreTable) table,
overwritePartition,
logSinkFunction,
parallelism)
parallelism,
boundedInput)
.sinkFrom(input);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
.withLogSinkFunction(logSinkFunction)
.withOverwritePartition(overwrite ? staticPartitions : null)
.withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
.withBoundedInputStream(context.isBounded())
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,19 @@ public class UnawareBucketWriteSink extends FileStoreSink {
private final boolean enableCompaction;
private final AppendOnlyFileStoreTable table;
private final Integer parallelism;
private final boolean boundedInput;

public UnawareBucketWriteSink(
AppendOnlyFileStoreTable table,
Map<String, String> overwritePartitions,
LogSinkFunction logSinkFunction,
Integer parallelism) {
Integer parallelism,
boolean boundedInput) {
super(table, overwritePartitions, logSinkFunction);
this.table = table;
this.enableCompaction = !table.coreOptions().writeOnly();
this.parallelism = parallelism;
this.boundedInput = boundedInput;
}

@Override
Expand All @@ -63,7 +66,8 @@ public DataStreamSink<?> sinkFrom(DataStream<InternalRow> input, String initialC
.get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
// if enable compaction, we need to add compaction topology to this job
if (enableCompaction && isStreamingMode) {
if (enableCompaction && isStreamingMode && !boundedInput) {
// if streaming mode with bounded input, we disable compaction topology
UnawareBucketCompactionTopoBuilder builder =
new UnawareBucketCompactionTopoBuilder(
input.getExecutionEnvironment(), table.name(), table);
Expand Down

0 comments on commit 624a553

Please sign in to comment.