
Commit 037dc54
[core] Remove bounded check for append compact topology (#3948)
JingsongLi authored Aug 13, 2024
1 parent 65df3f9 commit 037dc54
Showing 5 changed files with 7 additions and 29 deletions.
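For callers, the visible API change is that FlinkSinkBuilder no longer exposes inputBounded(boolean); boundedness is now inferred from the stream via FlinkSink.isStreaming. A hedged usage sketch follows — the forRowData/build entry points and the builder constructor are assumed from the surrounding Paimon API, not shown in this diff:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.table.FileStoreTable;

class AppendSinkWiring {
    // Builds the sink for an append-only (bucket-unaware) table.
    static DataStreamSink<?> wire(FileStoreTable table, DataStream<RowData> input) {
        return new FlinkSinkBuilder(table)
                .forRowData(input)
                // Before this commit a caller could add .inputBounded(...);
                // that setter is removed and boundedness is inferred instead.
                .build();
    }
}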
CdcUnawareBucketSink.java
@@ -29,7 +29,7 @@
 public class CdcUnawareBucketSink extends UnawareBucketSink<CdcRecord> {
 
     public CdcUnawareBucketSink(FileStoreTable table, Integer parallelism) {
-        super(table, null, null, parallelism, false);
+        super(table, null, null, parallelism);
     }
 
     @Override
FlinkSinkBuilder.java
@@ -76,7 +76,6 @@ public class FlinkSinkBuilder {
     private DataStream<RowData> input;
     @Nullable protected Map<String, String> overwritePartition;
     @Nullable protected Integer parallelism;
-    private Boolean boundedInput = null;
     @Nullable private TableSortInfo tableSortInfo;
 
     // ============== for extension ==============
@@ -131,15 +130,6 @@ public FlinkSinkBuilder parallelism(int parallelism) {
         return this;
     }
 
-    /**
-     * Set input bounded, if it is bounded, append table sink does not generate a topology for
-     * merging small files.
-     */
-    public FlinkSinkBuilder inputBounded(boolean bounded) {
-        this.boundedInput = bounded;
-        return this;
-    }
-
     /** Clustering the input data if possible. */
     public FlinkSinkBuilder clusteringIfPossible(
             String clusteringColumns,
@@ -152,10 +142,7 @@ public FlinkSinkBuilder clusteringIfPossible(
             return this;
         }
         checkState(input != null, "The input stream should be specified earlier.");
-        if (boundedInput == null) {
-            boundedInput = !FlinkSink.isStreaming(input);
-        }
-        if (!boundedInput || !table.bucketMode().equals(BUCKET_UNAWARE)) {
+        if (FlinkSink.isStreaming(input) || !table.bucketMode().equals(BUCKET_UNAWARE)) {
             LOG.warn(
                     "Clustering is enabled; however, it has been skipped as "
                             + "it only supports the bucket unaware table without primary keys and "
@@ -282,11 +269,7 @@ private DataStreamSink<?> buildUnawareBucketSink(DataStream<InternalRow> input)
         checkArgument(
                 table.primaryKeys().isEmpty(),
                 "Unaware bucket mode only works with append-only table for now.");
-        if (boundedInput == null) {
-            boundedInput = !FlinkSink.isStreaming(input);
-        }
-        return new RowUnawareBucketSink(
-                        table, overwritePartition, logSinkFunction, parallelism, boundedInput)
+        return new RowUnawareBucketSink(table, overwritePartition, logSinkFunction, parallelism)
                 .sinkFrom(input);
     }
 
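The clustering path gets the same treatment: instead of caching a tri-state Boolean boundedInput, the guard now asks the stream directly. A small illustrative gate, not Paimon source — "isStreaming" and "bucketUnaware" stand in for FlinkSink.isStreaming(input) and table.bucketMode().equals(BUCKET_UNAWARE) from the hunk above:

class ClusteringGuard {
    // Clustering only applies to a bounded (batch) input on a bucket-unaware table.
    static boolean clusteringApplies(boolean isStreaming, boolean bucketUnaware) {
        return !isStreaming && bucketUnaware;
    }
}

Deriving the flag on demand removes the null-means-unset state and the chance of a caller forgetting to set it.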
(third changed file)
@@ -135,7 +135,6 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
                         new DataStream<>(
                                 dataStream.getExecutionEnvironment(),
                                 dataStream.getTransformation()))
-                .inputBounded(context.isBounded())
                 .clusteringIfPossible(
                         conf.get(CLUSTERING_COLUMNS),
                         conf.get(CLUSTERING_STRATEGY),
RowUnawareBucketSink.java
@@ -32,9 +32,8 @@ public RowUnawareBucketSink(
             FileStoreTable table,
             Map<String, String> overwritePartitions,
             LogSinkFunction logSinkFunction,
-            Integer parallelism,
-            boolean boundedInput) {
-        super(table, overwritePartitions, logSinkFunction, parallelism, boundedInput);
+            Integer parallelism) {
+        super(table, overwritePartitions, logSinkFunction, parallelism);
     }
 
     @Override
UnawareBucketSink.java
@@ -42,19 +42,16 @@ public abstract class UnawareBucketSink<T> extends FlinkWriteSink<T> {
     protected final LogSinkFunction logSinkFunction;
 
     @Nullable protected final Integer parallelism;
-    protected final boolean boundedInput;
 
     public UnawareBucketSink(
             FileStoreTable table,
             @Nullable Map<String, String> overwritePartitions,
             LogSinkFunction logSinkFunction,
-            @Nullable Integer parallelism,
-            boolean boundedInput) {
+            @Nullable Integer parallelism) {
         super(table, overwritePartitions);
         this.table = table;
         this.logSinkFunction = logSinkFunction;
         this.parallelism = parallelism;
-        this.boundedInput = boundedInput;
     }
 
     @Override
@@ -69,7 +66,7 @@ public DataStream<Committable> doWrite(
                         .get(ExecutionOptions.RUNTIME_MODE)
                 == RuntimeExecutionMode.STREAMING;
         // if enable compaction, we need to add compaction topology to this job
-        if (enableCompaction && isStreamingMode && !boundedInput) {
+        if (enableCompaction && isStreamingMode) {
             written =
                     written.transform(
                             "Compact Coordinator: " + table.name(),
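The net behavioral change lives in this last hunk. A minimal, self-contained before/after sketch of the gate (illustrative only; the boolean names mirror the diff):

// Illustrative sketch of the gating rule changed by this commit.
public class CompactTopologyGate {

    // Before #3948: a bounded input suppressed the compact topology,
    // even when the job ran in STREAMING mode.
    static boolean before(boolean enableCompaction, boolean isStreamingMode, boolean boundedInput) {
        return enableCompaction && isStreamingMode && !boundedInput;
    }

    // After #3948: only the runtime execution mode matters.
    static boolean after(boolean enableCompaction, boolean isStreamingMode) {
        return enableCompaction && isStreamingMode;
    }

    public static void main(String[] args) {
        // A streaming job with a bounded source: previously no compaction
        // topology, now it is added.
        System.out.println(before(true, true, true)); // false
        System.out.println(after(true, true));        // true
    }
}

CdcUnawareBucketSink, which always passed false for boundedInput, keeps its behavior and simply drops the parameter.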
