Skip to content

Commit

Permalink
Merge branch 'refs/heads/master' into tc-paimon-0.9
Browse files Browse the repository at this point in the history
  • Loading branch information
wxplovecc committed Aug 26, 2024
2 parents 766f7c4 + 0c1da9f commit 8d7b953
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,18 @@ public void build() throws Exception {
private void buildForTraditionalCompaction(
StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
throws Exception {
if (isStreaming) {
// for completely asynchronous compaction
HashMap<String, String> dynamicOptions =
new HashMap<String, String>() {
{
put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647");
put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10");
put(CoreOptions.LOOKUP_WAIT.key(), "false");
}
};
table = table.copy(dynamicOptions);
}
CompactorSourceBuilder sourceBuilder =
new CompactorSourceBuilder(identifier.getFullName(), table);
CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,18 @@ private void buildForTraditionalCompaction(
String fullName,
FileStoreTable table,
boolean isStreaming) {
if (isStreaming) {
// for completely asynchronous compaction
HashMap<String, String> dynamicOptions =
new HashMap<String, String>() {
{
put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647");
put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10");
put(CoreOptions.LOOKUP_WAIT.key(), "false");
}
};
table = table.copy(dynamicOptions);
}

CompactorSourceBuilder sourceBuilder =
new CompactorSourceBuilder(fullName, table)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ protected DataStreamSink<?> doCommit(
false,
options.get(SINK_COMMITTER_OPERATOR_CHAINING),
commitUser,
createCommitterFactory(),
createCommitterFactory(isStreaming),
createCommittableStateManager(),
options.get(END_INPUT_WATERMARK)))
.setParallelism(written.getParallelism());
Expand All @@ -179,9 +179,14 @@ protected DataStreamSink<?> doCommit(
}

protected Committer.Factory<MultiTableCommittable, WrappedManifestCommittable>
createCommitterFactory() {
createCommitterFactory(boolean isStreaming) {
Map<String, String> dynamicOptions = options.toMap();
dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
if (isStreaming) {
dynamicOptions.put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647");
dynamicOptions.put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10");
dynamicOptions.put(CoreOptions.LOOKUP_WAIT.key(), "false");
}
return context -> new StoreMultiCommitter(catalogLoader, context, true, dynamicOptions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.flink.table.data.RowData;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -215,10 +214,19 @@ private FileStoreTable getTable(Identifier tableId) throws InterruptedException
while (true) {
try {
table = (FileStoreTable) catalog.getTable(tableId);
table =
table.copy(
Collections.singletonMap(
CoreOptions.WRITE_ONLY.key(), "false"));
HashMap<String, String> dynamicOptions =
new HashMap<String, String>() {
{
put(CoreOptions.WRITE_ONLY.key(), "false");
}
};
if (isStreaming) {
dynamicOptions.put(
CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647");
dynamicOptions.put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10");
dynamicOptions.put(CoreOptions.LOOKUP_WAIT.key(), "false");
}
table = table.copy(dynamicOptions);
tables.put(tableId, table);
break;
} catch (Catalog.TableNotExistException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public DataStreamSink<?> build(DataStream<InternalRow> input, @Nullable Integer
DataStream<Tuple2<InternalRow, Integer>> bucketAssigned =
partitionByKeyHash
.transform(
"dynamic-bucket-assigner",
"cross-partition-bucket-assigner",
rowWithBucketType,
GlobalIndexAssignerOperator.forRowData(table))
.setParallelism(partitionByKeyHash.getParallelism());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,6 @@ public static Map<String, String> compactOptions(boolean isStreaming) {
}
}

public static Map<String, String> partitionCompactOptions() {

return new HashMap<String, String>() {
{
put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), null);
put(CoreOptions.SCAN_TIMESTAMP.key(), null);
put(CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS.key(), null);
put(CoreOptions.SCAN_SNAPSHOT_ID.key(), null);
put(CoreOptions.SCAN_MODE.key(), CoreOptions.StartupMode.LATEST_FULL.toString());
put(CoreOptions.WRITE_ONLY.key(), "false");
}
};
}

public static boolean shouldCompactTable(
Identifier tableIdentifier, Pattern includingPattern, Pattern excludingPattern) {
String paimonFullTableName = tableIdentifier.getFullName();
Expand Down

0 comments on commit 8d7b953

Please sign in to comment.