diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 803ba475c764..0810f023bf64 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -176,8 +176,8 @@ public boolean isCompacting() { return compactManager.isCompacting(); } - private void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction) - throws Exception { + @VisibleForTesting + void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction) throws Exception { long start = System.currentTimeMillis(); List flushedFiles = sinkWriter.flush(); @@ -279,7 +279,10 @@ public long memoryOccupancy() { @Override public void flushMemory() throws Exception { - flush(false, false); + boolean success = sinkWriter.flushMemory(); + if (!success) { + flush(false, false); + } } @VisibleForTesting @@ -303,6 +306,8 @@ private interface SinkWriter { List flush() throws IOException; + boolean flushMemory() throws IOException; + long memoryOccupancy(); void close(); @@ -340,6 +345,11 @@ public List flush() throws IOException { return flushedFiles; } + @Override + public boolean flushMemory() throws IOException { + return false; + } + @Override public long memoryOccupancy() { return 0; @@ -430,5 +440,10 @@ public void setMemoryPool(MemorySegmentPool memoryPool) { public boolean bufferSpillableWriter() { return spillable; } + + @Override + public boolean flushMemory() throws IOException { + return writeBuffer.flushMemory(); + } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java index 24c3611c7e66..5181a9ed8c72 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java @@ -87,6 +87,12 @@ public void reset() { addCompleted = false; } + @Override + public boolean flushMemory() throws IOException { + spill(); + return true; + } + @Override public boolean put(InternalRow row) throws IOException { checkState(!addCompleted, "This buffer has add completed."); diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java index d1218ee669ab..83c4e423b03c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java @@ -80,6 +80,11 @@ public void reset() { } } + @Override + public boolean flushMemory() throws IOException { + return false; + } + private void returnToSegmentPool() { pool.returnAll(this.recordBufferSegments); this.recordBufferSegments.clear(); diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java index c5c881331ff4..7589b45eab9c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java @@ -39,6 +39,8 @@ public interface RowBuffer { void reset(); + boolean flushMemory() throws IOException; + RowBufferIterator newIterator(); /** Iterator to fetch record from buffer. */ diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index 05f8e883f983..5ab6b8537c45 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -34,6 +34,7 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.memory.HeapMemorySegmentPool; +import org.apache.paimon.memory.MemoryPoolFactory; import org.apache.paimon.options.Options; import org.apache.paimon.stats.FieldStatsArraySerializer; import org.apache.paimon.types.DataType; @@ -54,6 +55,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -335,6 +337,60 @@ public void testExternalBufferWorks() throws Exception { writer.close(); } + @Test + public void testSpillWorksAndMoreSmallFilesGenerated() throws Exception { + List writers = new ArrayList<>(); + HeapMemorySegmentPool heapMemorySegmentPool = new HeapMemorySegmentPool(2501024L, 1024); + MemoryPoolFactory memoryPoolFactory = new MemoryPoolFactory(heapMemorySegmentPool); + for (int i = 0; i < 1000; i++) { + AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true); + memoryPoolFactory.addOwners(Arrays.asList(writer)); + memoryPoolFactory.notifyNewOwner(writer); + writers.add(writer); + } + + char[] s = new char[1024]; + Arrays.fill(s, 'a'); + + for (AppendOnlyWriter writer : writers) { + writer.write(row(0, String.valueOf("a"), PART)); + } + + for (AppendOnlyWriter writer : writers) { + writer.write(row(0, String.valueOf(s), PART)); + } + + for (int j = 0; j < 100; j++) { + for (AppendOnlyWriter writer : writers) { + writer.write(row(j, String.valueOf(s), PART)); + writer.write(row(j, String.valueOf(s), PART)); + writer.write(row(j, String.valueOf(s), PART)); + writer.write(row(j, String.valueOf(s), PART)); + writer.write(row(j, String.valueOf(s), PART)); + } + } + + writers.forEach( + writer -> { + try { + List fileMetas = + writer.prepareCommit(false).newFilesIncrement().newFiles(); + assertThat(fileMetas.size()).isEqualTo(1); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + writers.forEach( + writer -> { + try { + writer.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + @Test public void testNoBuffer() throws Exception { AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE); @@ -363,7 +419,7 @@ public void testMultipleFlush() throws Exception { writer.write(row(j, String.valueOf(s), PART)); } - writer.flushMemory(); + writer.flush(false, false); Assertions.assertThat(writer.memoryOccupancy()).isEqualTo(0L); Assertions.assertThat(writer.getWriteBuffer().size()).isEqualTo(0); Assertions.assertThat(writer.getNewFiles().size()).isGreaterThan(0); @@ -374,7 +430,7 @@ public void testMultipleFlush() throws Exception { for (int j = 0; j < 100; j++) { writer.write(row(j, String.valueOf(s), PART)); } - writer.flushMemory(); + writer.flush(false, false); Assertions.assertThat(writer.memoryOccupancy()).isEqualTo(0L); Assertions.assertThat(writer.getWriteBuffer().size()).isEqualTo(0);