Skip to content

Commit

Permalink
[flink] Make completely asynchronous in stream compaction job (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
LsomeYeah authored Aug 26, 2024
1 parent c5a950e commit 8a661eb
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,18 @@ public void build() throws Exception {
private void buildForTraditionalCompaction(
StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
throws Exception {
if (isStreaming) {
// for completely asynchronous compaction
HashMap<String, String> dynamicOptions =
new HashMap<String, String>() {
{
put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647");
put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10");
put(CoreOptions.LOOKUP_WAIT.key(), "false");
}
};
table = table.copy(dynamicOptions);
}
CompactorSourceBuilder sourceBuilder =
new CompactorSourceBuilder(identifier.getFullName(), table);
CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,18 @@ private void buildForTraditionalCompaction(
String fullName,
FileStoreTable table,
boolean isStreaming) {
if (isStreaming) {
// for completely asynchronous compaction
HashMap<String, String> dynamicOptions =
new HashMap<String, String>() {
{
put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647");
put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10");
put(CoreOptions.LOOKUP_WAIT.key(), "false");
}
};
table = table.copy(dynamicOptions);
}

CompactorSourceBuilder sourceBuilder =
new CompactorSourceBuilder(fullName, table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ protected DataStreamSink<?> doCommit(
false,
options.get(SINK_COMMITTER_OPERATOR_CHAINING),
commitUser,
createCommitterFactory(),
createCommitterFactory(isStreaming),
createCommittableStateManager(),
options.get(END_INPUT_WATERMARK)))
.setParallelism(written.getParallelism());
Expand All @@ -179,9 +179,14 @@ protected DataStreamSink<?> doCommit(
}

protected Committer.Factory<MultiTableCommittable, WrappedManifestCommittable>
createCommitterFactory() {
createCommitterFactory(boolean isStreaming) {
Map<String, String> dynamicOptions = options.toMap();
dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
if (isStreaming) {
dynamicOptions.put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647");
dynamicOptions.put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10");
dynamicOptions.put(CoreOptions.LOOKUP_WAIT.key(), "false");
}
return context -> new StoreMultiCommitter(catalogLoader, context, true, dynamicOptions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.flink.table.data.RowData;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -215,10 +214,19 @@ private FileStoreTable getTable(Identifier tableId) throws InterruptedException
while (true) {
try {
table = (FileStoreTable) catalog.getTable(tableId);
table =
table.copy(
Collections.singletonMap(
CoreOptions.WRITE_ONLY.key(), "false"));
HashMap<String, String> dynamicOptions =
new HashMap<String, String>() {
{
put(CoreOptions.WRITE_ONLY.key(), "false");
}
};
if (isStreaming) {
dynamicOptions.put(
CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647");
dynamicOptions.put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10");
dynamicOptions.put(CoreOptions.LOOKUP_WAIT.key(), "false");
}
table = table.copy(dynamicOptions);
tables.put(tableId, table);
break;
} catch (Catalog.TableNotExistException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,6 @@ public static Map<String, String> compactOptions(boolean isStreaming) {
}
}

/**
 * Builds the dynamic options used for partition compaction.
 *
 * <p>The returned map associates the scan timestamp, scan timestamp-millis, scan file creation
 * time and scan snapshot id keys with {@code null}, sets the scan mode to
 * {@link CoreOptions.StartupMode#LATEST_FULL} and sets {@code write-only} to {@code "false"}.
 *
 * <p>NOTE(review): the {@code null} values presumably clear those options when the map is
 * applied to a table; confirm against the caller.
 *
 * @return a new mutable map with the options above; entries with {@code null} values are
 *     present in the map
 */
public static Map<String, String> partitionCompactOptions() {
    // A plain HashMap instead of double-brace initialization avoids creating an anonymous
    // HashMap subclass; HashMap permits the null values stored below.
    Map<String, String> compactOptions = new HashMap<>();
    compactOptions.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), null);
    compactOptions.put(CoreOptions.SCAN_TIMESTAMP.key(), null);
    compactOptions.put(CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS.key(), null);
    compactOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), null);
    compactOptions.put(
            CoreOptions.SCAN_MODE.key(), CoreOptions.StartupMode.LATEST_FULL.toString());
    compactOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
    return compactOptions;
}

public static boolean shouldCompactTable(
Identifier tableIdentifier, Pattern includingPattern, Pattern excludingPattern) {
String paimonFullTableName = tableIdentifier.getFullName();
Expand Down

0 comments on commit 8a661eb

Please sign in to comment.