From 8a661eb148a5d75ac3cc2206c73ab963df794094 Mon Sep 17 00:00:00 2001 From: LsomeYeah <94825748+LsomeYeah@users.noreply.github.com> Date: Mon, 26 Aug 2024 17:15:57 +0800 Subject: [PATCH 1/2] [flink] Make completely asynchronous in stream compaction job (#4049) --- .../paimon/flink/action/CompactAction.java | 12 ++++++++++++ .../flink/action/CompactDatabaseAction.java | 12 ++++++++++++ .../flink/sink/CombinedTableCompactorSink.java | 9 +++++++-- .../sink/MultiTablesStoreCompactOperator.java | 18 +++++++++++++----- .../flink/utils/MultiTablesCompactorUtil.java | 14 -------------- 5 files changed, 44 insertions(+), 21 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index 3cb3513dde62..19c209709b5c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -124,6 +124,18 @@ public void build() throws Exception { private void buildForTraditionalCompaction( StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) throws Exception { + if (isStreaming) { + // for completely asynchronous compaction + HashMap dynamicOptions = + new HashMap() { + { + put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647"); + put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10"); + put(CoreOptions.LOOKUP_WAIT.key(), "false"); + } + }; + table = table.copy(dynamicOptions); + } CompactorSourceBuilder sourceBuilder = new CompactorSourceBuilder(identifier.getFullName(), table); CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java index 7c5854e2d453..ef6772e36eed 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java @@ -235,6 +235,18 @@ private void buildForTraditionalCompaction( String fullName, FileStoreTable table, boolean isStreaming) { + if (isStreaming) { + // for completely asynchronous compaction + HashMap dynamicOptions = + new HashMap() { + { + put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647"); + put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10"); + put(CoreOptions.LOOKUP_WAIT.key(), "false"); + } + }; + table = table.copy(dynamicOptions); + } CompactorSourceBuilder sourceBuilder = new CompactorSourceBuilder(fullName, table) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java index 3a8bf1a64e52..87a28091fa30 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java @@ -158,7 +158,7 @@ protected DataStreamSink doCommit( false, options.get(SINK_COMMITTER_OPERATOR_CHAINING), commitUser, - createCommitterFactory(), + createCommitterFactory(isStreaming), createCommittableStateManager(), options.get(END_INPUT_WATERMARK))) .setParallelism(written.getParallelism()); @@ -179,9 +179,14 @@ protected DataStreamSink doCommit( } protected Committer.Factory - createCommitterFactory() { + createCommitterFactory(boolean isStreaming) { Map dynamicOptions = options.toMap(); dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false"); + if (isStreaming) { + dynamicOptions.put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647"); + dynamicOptions.put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10"); + dynamicOptions.put(CoreOptions.LOOKUP_WAIT.key(), "false"); + } return context -> new StoreMultiCommitter(catalogLoader, context, true, dynamicOptions); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java index ccbccaa5fa7c..513b694fb33c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java @@ -36,7 +36,6 @@ import org.apache.flink.table.data.RowData; import java.io.IOException; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -215,10 +214,19 @@ private FileStoreTable getTable(Identifier tableId) throws InterruptedException while (true) { try { table = (FileStoreTable) catalog.getTable(tableId); - table = - table.copy( - Collections.singletonMap( - CoreOptions.WRITE_ONLY.key(), "false")); + HashMap dynamicOptions = + new HashMap() { + { + put(CoreOptions.WRITE_ONLY.key(), "false"); + } + }; + if (isStreaming) { + dynamicOptions.put( + CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647"); + dynamicOptions.put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10"); + dynamicOptions.put(CoreOptions.LOOKUP_WAIT.key(), "false"); + } + table = table.copy(dynamicOptions); tables.put(tableId, table); break; } catch (Catalog.TableNotExistException e) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java index 27264eb80c1a..fc648b4681e6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java @@ -62,20 +62,6 @@ public static Map compactOptions(boolean isStreaming) { } } - public static Map partitionCompactOptions() { - - return new HashMap() { - { - put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), null); - put(CoreOptions.SCAN_TIMESTAMP.key(), null); - put(CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS.key(), null); - put(CoreOptions.SCAN_SNAPSHOT_ID.key(), null); - put(CoreOptions.SCAN_MODE.key(), CoreOptions.StartupMode.LATEST_FULL.toString()); - put(CoreOptions.WRITE_ONLY.key(), "false"); - } - }; - } - public static boolean shouldCompactTable( Identifier tableIdentifier, Pattern includingPattern, Pattern excludingPattern) { String paimonFullTableName = tableIdentifier.getFullName(); From 0c1da9f85c38990705697f89119d3ed65a182954 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Mon, 26 Aug 2024 17:38:21 +0800 Subject: [PATCH 2/2] [hotfix] Changelog operator name to cross-partition-bucket-assigner --- .../apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java index f4da370729d0..26e080c32e83 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java @@ -112,7 +112,7 @@ public DataStreamSink build(DataStream input, @Nullable Integer DataStream> bucketAssigned = partitionByKeyHash .transform( - "dynamic-bucket-assigner", + "cross-partition-bucket-assigner", rowWithBucketType, GlobalIndexAssignerOperator.forRowData(table)) .setParallelism(partitionByKeyHash.getParallelism());