diff --git a/docs/content/maintenance/write-performance.md b/docs/content/maintenance/write-performance.md index 929f6c17a32e..0129c07a03dc 100644 --- a/docs/content/maintenance/write-performance.md +++ b/docs/content/maintenance/write-performance.md @@ -110,7 +110,7 @@ the threshold. No (none) Integer - The number of sorted runs that trigger the stopping of writes, the default value is 'num-sorted-run.compaction-trigger' + 1. + The number of sorted runs that trigger the stopping of writes, the default value is 'num-sorted-run.compaction-trigger' + 3. diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 48822af1bd8a..468da3f5a5b6 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -405,7 +405,7 @@
num-sorted-run.stop-trigger
(none) Integer - The number of sorted runs that trigger the stopping of writes, the default value is 'num-sorted-run.compaction-trigger' + 1. + The number of sorted runs that trigger the stopping of writes, the default value is 'num-sorted-run.compaction-trigger' + 3.
page-size
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 2440bdcc6090..86bcb91fd018 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -421,7 +421,7 @@ public class CoreOptions implements Serializable { .noDefaultValue() .withDescription( "The number of sorted runs that trigger the stopping of writes," - + " the default value is 'num-sorted-run.compaction-trigger' + 1."); + + " the default value is 'num-sorted-run.compaction-trigger' + 3."); public static final ConfigOption NUM_LEVELS = key("num-levels") @@ -1415,7 +1415,7 @@ public Duration optimizedCompactionInterval() { public int numSortedRunStopTrigger() { Integer stopTrigger = options.get(NUM_SORTED_RUNS_STOP_TRIGGER); if (stopTrigger == null) { - stopTrigger = MathUtils.incrementSafely(numSortedRunCompactionTrigger()); + stopTrigger = MathUtils.addSafely(numSortedRunCompactionTrigger(), 3); } return Math.max(numSortedRunCompactionTrigger(), stopTrigger); } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java index c50646bd266e..ba9994d9b1fd 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/MathUtils.java @@ -101,4 +101,13 @@ public static int incrementSafely(int a) { } return a + 1; } + + /** Safely add the given int value by another int value, ensuring that no overflow occurs. */ + public static int addSafely(int a, int b) { + try { + return Math.addExact(a, b); + } catch (ArithmeticException e) { + return Integer.MAX_VALUE; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index f525c66667aa..17673fc06fa7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -241,10 +241,18 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul @Override public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception { flushWriteBuffer(waitCompaction, false); - trySyncLatestCompaction( - waitCompaction - || commitForceCompact - || compactManager.shouldWaitForPreparingCheckpoint()); + if (commitForceCompact) { + waitCompaction = true; + } + // Decide again whether to wait here. + // For example, in the case of repeated failures in writing, it is possible that Level 0 + // files were successfully committed, but failed to restart during the compaction phase, + // which may result in an increasing number of Level 0 files. This wait can avoid this + // situation. + if (compactManager.shouldWaitForPreparingCheckpoint()) { + waitCompaction = true; + } + trySyncLatestCompaction(waitCompaction); return drainIncrement(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index a1e55f73fc56..3f9bd3e6ea85 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -133,12 +133,15 @@ private SchemaManager createTestingSchemaManager(Path path) { } private void recreateMergeTree(long targetFileSize) { - Options configuration = new Options(); - configuration.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3)); - configuration.set(CoreOptions.PAGE_SIZE, new MemorySize(4096)); - configuration.set(CoreOptions.TARGET_FILE_SIZE, new MemorySize(targetFileSize)); - configuration.set(CoreOptions.SORT_ENGINE, getSortEngine()); - options = new CoreOptions(configuration); + Options options = new Options(); + options.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3)); + options.set(CoreOptions.PAGE_SIZE, new MemorySize(4096)); + options.set(CoreOptions.TARGET_FILE_SIZE, new MemorySize(targetFileSize)); + options.set(CoreOptions.SORT_ENGINE, getSortEngine()); + options.set( + CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER, + options.get(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER) + 1); + this.options = new CoreOptions(options); RowType keyType = new RowType(singletonList(new DataField(0, "k", new IntType()))); RowType valueType = new RowType(singletonList(new DataField(0, "v", new IntType()))); @@ -188,9 +191,9 @@ public List valueFields(TableSchema schema) { valueType, flushingAvro, pathFactoryMap, - options.targetFileSize()); - writerFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, options); - compactWriterFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, options); + this.options.targetFileSize()); + writerFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, this.options); + compactWriterFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, this.options); writer = createMergeTreeWriter(Collections.emptyList()); }