diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index f525c66667aaf..17673fc06fa7e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -241,10 +241,18 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul @Override public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception { flushWriteBuffer(waitCompaction, false); - trySyncLatestCompaction( - waitCompaction - || commitForceCompact - || compactManager.shouldWaitForPreparingCheckpoint()); + if (commitForceCompact) { + waitCompaction = true; + } + // Decide again whether to wait here. + // For example, in the case of repeated failures in writing, it is possible that Level 0 + // files were successfully committed, but failed to restart during the compaction phase, + // which may result in an increasing number of Level 0 files. This wait can avoid this + // situation. + if (compactManager.shouldWaitForPreparingCheckpoint()) { + waitCompaction = true; + } + trySyncLatestCompaction(waitCompaction); return drainIncrement(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index a1e55f73fc56d..3f9bd3e6ea851 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -133,12 +133,15 @@ private SchemaManager createTestingSchemaManager(Path path) { } private void recreateMergeTree(long targetFileSize) { - Options configuration = new Options(); - configuration.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3)); - configuration.set(CoreOptions.PAGE_SIZE, new MemorySize(4096)); - configuration.set(CoreOptions.TARGET_FILE_SIZE, new MemorySize(targetFileSize)); - configuration.set(CoreOptions.SORT_ENGINE, getSortEngine()); - options = new CoreOptions(configuration); + Options options = new Options(); + options.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3)); + options.set(CoreOptions.PAGE_SIZE, new MemorySize(4096)); + options.set(CoreOptions.TARGET_FILE_SIZE, new MemorySize(targetFileSize)); + options.set(CoreOptions.SORT_ENGINE, getSortEngine()); + options.set( + CoreOptions.NUM_SORTED_RUNS_STOP_TRIGGER, + options.get(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER) + 1); + this.options = new CoreOptions(options); RowType keyType = new RowType(singletonList(new DataField(0, "k", new IntType()))); RowType valueType = new RowType(singletonList(new DataField(0, "v", new IntType()))); @@ -188,9 +191,9 @@ public List valueFields(TableSchema schema) { valueType, flushingAvro, pathFactoryMap, - options.targetFileSize()); - writerFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, options); - compactWriterFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, options); + this.options.targetFileSize()); + writerFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, this.options); + compactWriterFactory = writerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0, this.options); writer = createMergeTreeWriter(Collections.emptyList()); }