Skip to content

Commit

Permalink
[flink] Refactor compactorSink to support extended compact type. (#4569)
Browse files Browse the repository at this point in the history
  • Loading branch information
LinMingQiang authored Nov 22, 2024
1 parent f0c3645 commit c907544
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ private void buildForTraditionalCompaction(
}
CompactorSourceBuilder sourceBuilder =
new CompactorSourceBuilder(identifier.getFullName(), table);
CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);
CompactorSinkBuilder sinkBuilder =
new CompactorSinkBuilder(table).withFullCompaction(!isStreaming);

sourceBuilder.withPartitionPredicate(getPredicate());
DataStreamSource<RowData> source =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ private void buildForTraditionalCompaction(
CompactorSourceBuilder sourceBuilder =
new CompactorSourceBuilder(fullName, table)
.withPartitionIdleTime(partitionIdleTime);
CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);
CompactorSinkBuilder sinkBuilder =
new CompactorSinkBuilder(table).withFullCompaction(!isStreaming);

DataStreamSource<RowData> source =
sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@ public class CompactorSink extends FlinkSink<RowData> {

private static final long serialVersionUID = 1L;

public CompactorSink(FileStoreTable table) {
// Whether each triggered compaction is a full compaction rather than a
// regular one. Callers in this commit pass !isStreaming, i.e. batch jobs
// request full compaction and streaming jobs do not.
private final boolean fullCompaction;

/**
 * Creates a compactor sink for the given table.
 *
 * @param table the file store table whose buckets are compacted by this sink
 * @param fullCompaction whether the write operator created by this sink
 *     should perform full compactions instead of regular ones; forwarded to
 *     {@code StoreCompactOperator}
 */
public CompactorSink(FileStoreTable table, boolean fullCompaction) {
super(table, false); // NOTE(review): meaning of the second flag is defined in FlinkSink — confirm there
this.fullCompaction = fullCompaction;
}

@Override
protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
StoreSinkWrite.Provider writeProvider, String commitUser) {
return new StoreCompactOperator(table, writeProvider, commitUser);
return new StoreCompactOperator(table, writeProvider, commitUser, fullCompaction);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class CompactorSinkBuilder {

private DataStream<RowData> input;

private boolean fullCompaction;

/**
 * Creates a builder that assembles a {@code CompactorSink} for the given table.
 *
 * @param table the file store table to compact; also used later to pick the
 *     sink topology from its bucket mode
 */
public CompactorSinkBuilder(FileStoreTable table) {
this.table = table;
}
Expand All @@ -46,6 +48,11 @@ public CompactorSinkBuilder withInput(DataStream<RowData> input) {
return this;
}

/**
 * Sets whether the built sink should run full compactions instead of regular
 * ones. In this commit callers pass {@code !isStreaming}, so batch compaction
 * jobs request full compaction. Defaults to {@code false} if never called.
 *
 * @param fullCompaction true to request full compaction in the sink
 * @return this builder, for call chaining
 */
public CompactorSinkBuilder withFullCompaction(boolean fullCompaction) {
this.fullCompaction = fullCompaction;
return this;
}

public DataStreamSink<?> build() {
BucketMode bucketMode = table.bucketMode();
switch (bucketMode) {
Expand All @@ -66,6 +73,6 @@ private DataStreamSink<?> buildForBucketAware() {
.orElse(null);
DataStream<RowData> partitioned =
partition(input, new BucketsRowChannelComputer(), parallelism);
return new CompactorSink(table).sinkFrom(partitioned);
return new CompactorSink(table, fullCompaction).sinkFrom(partitioned);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
private final FileStoreTable table;
private final StoreSinkWrite.Provider storeSinkWriteProvider;
private final String initialCommitUser;
private final boolean fullCompaction;

private transient StoreSinkWriteState state;
private transient StoreSinkWrite write;
Expand All @@ -61,14 +62,16 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
public StoreCompactOperator(
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
String initialCommitUser,
boolean fullCompaction) {
super(Options.fromMap(table.options()));
Preconditions.checkArgument(
!table.coreOptions().writeOnly(),
CoreOptions.WRITE_ONLY.key() + " should not be true for StoreCompactOperator.");
this.table = table;
this.storeSinkWriteProvider = storeSinkWriteProvider;
this.initialCommitUser = initialCommitUser;
this.fullCompaction = fullCompaction;
}

@Override
Expand Down Expand Up @@ -136,10 +139,7 @@ protected List<Committable> prepareCommit(boolean waitCompaction, long checkpoin

try {
for (Pair<BinaryRow, Integer> partitionBucket : waitToCompact) {
write.compact(
partitionBucket.getKey(),
partitionBucket.getRight(),
!write.streamingMode());
write.compact(partitionBucket.getKey(), partitionBucket.getRight(), fullCompaction);
}
} catch (Exception e) {
throw new RuntimeException("Exception happens while executing compaction.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void testCompact() throws Exception {
.withContinuousMode(false)
.withPartitionPredicate(predicate)
.build();
new CompactorSinkBuilder(table).withInput(source).build();
new CompactorSinkBuilder(table).withFullCompaction(true).withInput(source).build();
env.execute();

snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
Expand Down Expand Up @@ -182,6 +182,7 @@ public void testCompactParallelism() throws Exception {
String.valueOf(sinkParalellism));
}
}))
.withFullCompaction(false)
.withInput(source)
.build();

Expand Down Expand Up @@ -267,7 +268,8 @@ protected StoreCompactOperator createCompactOperator(FileStoreTable table) {
false,
memoryPool,
metricGroup),
"test");
"test",
true);
}

protected MultiTablesStoreCompactOperator createMultiTablesCompactOperator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public void testCompactExactlyOnce(boolean streamingMode) throws Exception {
getTableDefault(),
(table, commitUser, state, ioManager, memoryPool, metricGroup) ->
compactRememberStoreWrite,
"10086");
"10086",
!streamingMode);

TypeSerializer<Committable> serializer =
new CommittableTypeInfo().createSerializer(new ExecutionConfig());
Expand Down

0 comments on commit c907544

Please sign in to comment.