[core] Remove bounded check for append compact topology #3948

Merged

This change removes the boundedInput flag from the append (unaware-bucket) sink path: UnawareBucketSink now attaches the small-file compact topology whenever the job runs in STREAMING mode, regardless of whether the input is bounded, and FlinkSinkBuilder.inputBounded(...) is removed along with its callers.
CdcUnawareBucketSink.java
@@ -29,7 +29,7 @@
 public class CdcUnawareBucketSink extends UnawareBucketSink<CdcRecord> {
 
     public CdcUnawareBucketSink(FileStoreTable table, Integer parallelism) {
-        super(table, null, null, parallelism, false);
+        super(table, null, null, parallelism);
     }
 
     @Override
FlinkSinkBuilder.java
@@ -76,7 +76,6 @@ public class FlinkSinkBuilder {
     private DataStream<RowData> input;
     @Nullable protected Map<String, String> overwritePartition;
     @Nullable protected Integer parallelism;
-    private Boolean boundedInput = null;
     @Nullable private TableSortInfo tableSortInfo;
 
     // ============== for extension ==============
@@ -131,15 +130,6 @@ public FlinkSinkBuilder parallelism(int parallelism) {
         return this;
     }
 
-    /**
-     * Set input bounded, if it is bounded, append table sink does not generate a topology for
-     * merging small files.
-     */
-    public FlinkSinkBuilder inputBounded(boolean bounded) {
-        this.boundedInput = bounded;
-        return this;
-    }
-
     /** Clustering the input data if possible. */
     public FlinkSinkBuilder clusteringIfPossible(
             String clusteringColumns,
@@ -152,10 +142,7 @@ public FlinkSinkBuilder clusteringIfPossible(
             return this;
         }
         checkState(input != null, "The input stream should be specified earlier.");
-        if (boundedInput == null) {
-            boundedInput = !FlinkSink.isStreaming(input);
-        }
-        if (!boundedInput || !table.bucketMode().equals(BUCKET_UNAWARE)) {
+        if (FlinkSink.isStreaming(input) || !table.bucketMode().equals(BUCKET_UNAWARE)) {
             LOG.warn(
                     "Clustering is enabled; however, it has been skipped as "
                             + "it only supports the bucket unaware table without primary keys and "
@@ -282,11 +269,7 @@ private DataStreamSink<?> buildUnawareBucketSink(DataStream<InternalRow> input)
         checkArgument(
                 table.primaryKeys().isEmpty(),
                 "Unaware bucket mode only works with append-only table for now.");
-        if (boundedInput == null) {
-            boundedInput = !FlinkSink.isStreaming(input);
-        }
-        return new RowUnawareBucketSink(
-                table, overwritePartition, logSinkFunction, parallelism, boundedInput)
+        return new RowUnawareBucketSink(table, overwritePartition, logSinkFunction, parallelism)
                 .sinkFrom(input);
     }
 
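With inputBounded(...) removed, batch and streaming jobs construct the sink identically. A minimal usage sketch, not part of this PR — it assumes a FileStoreTable `table` and a DataStream<RowData> `input` already exist, and builder method names other than parallelism(...) are assumptions about FlinkSinkBuilder's public API:

DataStreamSink<?> sink =
        new FlinkSinkBuilder(table)   // table in unaware-bucket (append) mode
                .forRowData(input)    // the stream to write
                .parallelism(2)       // optional explicit sink parallelism
                .build();             // compaction wiring is decided internally
                                      // from the runtime mode, see UnawareBucketSink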
@@ -135,7 +135,6 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
                                 new DataStream<>(
                                         dataStream.getExecutionEnvironment(),
                                         dataStream.getTransformation()))
-                        .inputBounded(context.isBounded())
                         .clusteringIfPossible(
                                 conf.get(CLUSTERING_COLUMNS),
                                 conf.get(CLUSTERING_STRATEGY),
RowUnawareBucketSink.java
@@ -32,9 +32,8 @@ public RowUnawareBucketSink(
             FileStoreTable table,
             Map<String, String> overwritePartitions,
             LogSinkFunction logSinkFunction,
-            Integer parallelism,
-            boolean boundedInput) {
-        super(table, overwritePartitions, logSinkFunction, parallelism, boundedInput);
+            Integer parallelism) {
+        super(table, overwritePartitions, logSinkFunction, parallelism);
     }
 
     @Override
UnawareBucketSink.java
@@ -42,19 +42,16 @@ public abstract class UnawareBucketSink<T> extends FlinkWriteSink<T> {
     protected final LogSinkFunction logSinkFunction;
 
     @Nullable protected final Integer parallelism;
-    protected final boolean boundedInput;
 
     public UnawareBucketSink(
             FileStoreTable table,
             @Nullable Map<String, String> overwritePartitions,
             LogSinkFunction logSinkFunction,
-            @Nullable Integer parallelism,
-            boolean boundedInput) {
+            @Nullable Integer parallelism) {
         super(table, overwritePartitions);
         this.table = table;
         this.logSinkFunction = logSinkFunction;
         this.parallelism = parallelism;
-        this.boundedInput = boundedInput;
     }
 
     @Override
@@ -69,7 +66,7 @@ public DataStream<Committable> doWrite(
                         .get(ExecutionOptions.RUNTIME_MODE)
                 == RuntimeExecutionMode.STREAMING;
         // if enable compaction, we need to add compaction topology to this job
-        if (enableCompaction && isStreamingMode && !boundedInput) {
+        if (enableCompaction && isStreamingMode) {
             written =
                     written.transform(
                             "Compact Coordinator: " + table.name(),
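The STREAMING-mode test that now gates the compact topology (together with enableCompaction) is the one visible in the context lines above. A sketch of that check as a standalone helper, mirroring the FlinkSink.isStreaming(...) call referenced in FlinkSinkBuilder — the exact signature is an assumption:

// Returns true only when the Flink job runs in STREAMING execution mode;
// after this PR, streaming jobs over bounded inputs also get the
// Compact Coordinator/Worker operators.
public static boolean isStreaming(DataStream<?> input) {
    return input.getExecutionEnvironment()
                    .getConfiguration()
                    .get(ExecutionOptions.RUNTIME_MODE)
            == RuntimeExecutionMode.STREAMING;
}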