diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index 3cb3513dde62..19c209709b5c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -124,6 +124,18 @@ public void build() throws Exception { private void buildForTraditionalCompaction( StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming) throws Exception { + if (isStreaming) { + // for completely asynchronous compaction + HashMap dynamicOptions = + new HashMap() { + { + put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647"); + put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10"); + put(CoreOptions.LOOKUP_WAIT.key(), "false"); + } + }; + table = table.copy(dynamicOptions); + } CompactorSourceBuilder sourceBuilder = new CompactorSourceBuilder(identifier.getFullName(), table); CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java index 7c5854e2d453..ef6772e36eed 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java @@ -235,6 +235,18 @@ private void buildForTraditionalCompaction( String fullName, FileStoreTable table, boolean isStreaming) { + if (isStreaming) { + // for completely asynchronous compaction + HashMap dynamicOptions = + new HashMap() { + { + put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647"); + put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10"); + put(CoreOptions.LOOKUP_WAIT.key(), "false"); + } + }; + table = table.copy(dynamicOptions); + } CompactorSourceBuilder sourceBuilder = new CompactorSourceBuilder(fullName, table) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java index 3a8bf1a64e52..87a28091fa30 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java @@ -158,7 +158,7 @@ protected DataStreamSink doCommit( false, options.get(SINK_COMMITTER_OPERATOR_CHAINING), commitUser, - createCommitterFactory(), + createCommitterFactory(isStreaming), createCommittableStateManager(), options.get(END_INPUT_WATERMARK))) .setParallelism(written.getParallelism()); @@ -179,9 +179,14 @@ protected DataStreamSink doCommit( } protected Committer.Factory - createCommitterFactory() { + createCommitterFactory(boolean isStreaming) { Map dynamicOptions = options.toMap(); dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false"); + if (isStreaming) { + dynamicOptions.put(CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647"); + dynamicOptions.put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10"); + dynamicOptions.put(CoreOptions.LOOKUP_WAIT.key(), "false"); + } return context -> new StoreMultiCommitter(catalogLoader, context, true, dynamicOptions); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java index ccbccaa5fa7c..513b694fb33c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java @@ -36,7 +36,6 @@ import org.apache.flink.table.data.RowData; import java.io.IOException; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -215,10 +214,19 @@ private FileStoreTable getTable(Identifier tableId) throws InterruptedException while (true) { try { table = (FileStoreTable) catalog.getTable(tableId); - table = - table.copy( - Collections.singletonMap( - CoreOptions.WRITE_ONLY.key(), "false")); + HashMap dynamicOptions = + new HashMap() { + { + put(CoreOptions.WRITE_ONLY.key(), "false"); + } + }; + if (isStreaming) { + dynamicOptions.put( + CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER.key(), "2147483647"); + dynamicOptions.put(CoreOptions.SORT_SPILL_THRESHOLD.key(), "10"); + dynamicOptions.put(CoreOptions.LOOKUP_WAIT.key(), "false"); + } + table = table.copy(dynamicOptions); tables.put(tableId, table); break; } catch (Catalog.TableNotExistException e) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java index 27264eb80c1a..fc648b4681e6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/MultiTablesCompactorUtil.java @@ -62,20 +62,6 @@ public static Map compactOptions(boolean isStreaming) { } } - public static Map partitionCompactOptions() { - - return new HashMap() { - { - put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), null); - put(CoreOptions.SCAN_TIMESTAMP.key(), null); - put(CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS.key(), null); - put(CoreOptions.SCAN_SNAPSHOT_ID.key(), null); - put(CoreOptions.SCAN_MODE.key(), CoreOptions.StartupMode.LATEST_FULL.toString()); - put(CoreOptions.WRITE_ONLY.key(), "false"); - } - }; - } - public static boolean shouldCompactTable( Identifier tableIdentifier, Pattern includingPattern, Pattern excludingPattern) { String paimonFullTableName = tableIdentifier.getFullName();