From 68f68ddf97408f98faf5cd2b17b182c7e472b37e Mon Sep 17 00:00:00 2001 From: yejunhao Date: Mon, 4 Mar 2024 13:35:04 +0800 Subject: [PATCH 1/3] [bug] Spillable writer with memory preemption has a bug: preempting memory causes the writer to flush its buffer into data files instead of spilling, which should not happen --- .../paimon/append/AppendOnlyWriter.java | 16 +++++- .../apache/paimon/disk/ExternalBuffer.java | 2 +- .../apache/paimon/disk/InMemoryBuffer.java | 5 ++ .../org/apache/paimon/disk/RowBuffer.java | 2 + .../paimon/append/AppendOnlyWriterTest.java | 56 +++++++++++++++++++ 5 files changed, 79 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 803ba475c764..809a18eeec56 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -279,7 +279,11 @@ public long memoryOccupancy() { @Override public void flushMemory() throws Exception { - flush(false, false); + if (sinkWriter.bufferSpillableWriter()) { + sinkWriter.spill(); + } else { + flush(false, false); + } } @VisibleForTesting @@ -310,6 +314,8 @@ private interface SinkWriter { void setMemoryPool(MemorySegmentPool memoryPool); boolean bufferSpillableWriter(); + + void spill() throws IOException; } /** @@ -362,6 +368,9 @@ public void setMemoryPool(MemorySegmentPool memoryPool) { public boolean bufferSpillableWriter() { return false; } + + @Override + public void spill() throws IOException {} } /** @@ -430,5 +439,10 @@ public void setMemoryPool(MemorySegmentPool memoryPool) { public boolean bufferSpillableWriter() { return spillable; } + + @Override + public void spill() throws IOException { + writeBuffer.spill(); + } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java index 24c3611c7e66..f799888c1ff2 
100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java @@ -126,7 +126,7 @@ private void throwTooBigException(InternalRow row) throws IOException { + memorySize()); } - private void spill() throws IOException { + public void spill() throws IOException { FileIOChannel.ID channel = ioManager.createChannel(); BufferFileWriter writer = ioManager.createBufferFileWriter(channel); diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java index d1218ee669ab..04ed809c292c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java @@ -80,6 +80,11 @@ public void reset() { } } + @Override + public void spill() { + throw new UnsupportedOperationException(); + } + private void returnToSegmentPool() { pool.returnAll(this.recordBufferSegments); this.recordBufferSegments.clear(); diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java index c5c881331ff4..b78d213b6622 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java @@ -39,6 +39,8 @@ public interface RowBuffer { void reset(); + void spill() throws IOException; + RowBufferIterator newIterator(); /** Iterator to fetch record from buffer. 
*/ diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index 05f8e883f983..bffd244b1077 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -34,6 +34,7 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.memory.HeapMemorySegmentPool; +import org.apache.paimon.memory.MemoryPoolFactory; import org.apache.paimon.options.Options; import org.apache.paimon.stats.FieldStatsArraySerializer; import org.apache.paimon.types.DataType; @@ -54,6 +55,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Files; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -335,6 +337,60 @@ public void testExternalBufferWorks() throws Exception { writer.close(); } + @Test + public void testSpillWorksAndMoreSmallFilesGenerated() throws Exception { + List<AppendOnlyWriter> writers = new ArrayList<>(); + HeapMemorySegmentPool heapMemorySegmentPool = new HeapMemorySegmentPool(2501024L, 1024); + MemoryPoolFactory memoryPoolFactory = new MemoryPoolFactory(heapMemorySegmentPool); + for (int i = 0; i < 1000; i++) { + AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true); + memoryPoolFactory.addOwners(Arrays.asList(writer)); + memoryPoolFactory.notifyNewOwner(writer); + writers.add(writer); + } + + char[] s = new char[1024]; + Arrays.fill(s, 'a'); + + for (AppendOnlyWriter writer : writers) { + writer.write(row(0, String.valueOf("a"), PART)); + } + + for (AppendOnlyWriter writer : writers) { + writer.write(row(0, String.valueOf(s), PART)); + } + + for (int j = 0; j < 100; j++) { + for (AppendOnlyWriter writer : writers) { + writer.write(row(j, String.valueOf(s), PART)); + writer.write(row(j, String.valueOf(s), PART)); + 
writer.write(row(j, String.valueOf(s), PART)); + writer.write(row(j, String.valueOf(s), PART)); + writer.write(row(j, String.valueOf(s), PART)); + } + } + + writers.forEach( + writer -> { + try { + List<DataFileMeta> fileMetas = + writer.prepareCommit(false).newFilesIncrement().newFiles(); + assertThat(fileMetas.size()).isEqualTo(1); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + writers.forEach( + writer -> { + try { + writer.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + @Test public void testNoBuffer() throws Exception { AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE); From 0d2a50849cfb3451d7d135032bf33e777e04f27e Mon Sep 17 00:00:00 2001 From: yejunhao Date: Mon, 4 Mar 2024 14:03:13 +0800 Subject: [PATCH 2/3] [fix] fix minus --- .../main/java/org/apache/paimon/append/AppendOnlyWriter.java | 4 ++-- .../java/org/apache/paimon/append/AppendOnlyWriterTest.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 809a18eeec56..4f011d9e31c5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -176,8 +176,8 @@ public boolean isCompacting() { return compactManager.isCompacting(); } - private void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction) - throws Exception { + @VisibleForTesting + void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction) throws Exception { long start = System.currentTimeMillis(); List<DataFileMeta> flushedFiles = sinkWriter.flush(); diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index bffd244b1077..5ab6b8537c45 100644 --- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -419,7 +419,7 @@ public void testMultipleFlush() throws Exception { writer.write(row(j, String.valueOf(s), PART)); } - writer.flushMemory(); + writer.flush(false, false); Assertions.assertThat(writer.memoryOccupancy()).isEqualTo(0L); Assertions.assertThat(writer.getWriteBuffer().size()).isEqualTo(0); Assertions.assertThat(writer.getNewFiles().size()).isGreaterThan(0); @@ -430,7 +430,7 @@ public void testMultipleFlush() throws Exception { for (int j = 0; j < 100; j++) { writer.write(row(j, String.valueOf(s), PART)); } - writer.flushMemory(); + writer.flush(false, false); Assertions.assertThat(writer.memoryOccupancy()).isEqualTo(0L); Assertions.assertThat(writer.getWriteBuffer().size()).isEqualTo(0); From 8c2003d3d66a89c2481e14a2c95b936d7ccef0f5 Mon Sep 17 00:00:00 2001 From: yejunhao Date: Mon, 4 Mar 2024 16:05:52 +0800 Subject: [PATCH 3/3] [fix] fix comment --- .../paimon/append/AppendOnlyWriter.java | 21 ++++++++++--------- .../apache/paimon/disk/ExternalBuffer.java | 8 ++++++- .../apache/paimon/disk/InMemoryBuffer.java | 4 ++-- .../org/apache/paimon/disk/RowBuffer.java | 2 +- 4 files changed, 21 insertions(+), 14 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 4f011d9e31c5..0810f023bf64 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -279,9 +279,8 @@ public long memoryOccupancy() { @Override public void flushMemory() throws Exception { - if (sinkWriter.bufferSpillableWriter()) { - sinkWriter.spill(); - } else { + boolean success = sinkWriter.flushMemory(); + if (!success) { flush(false, false); } } @@ -307,6 +306,8 @@ private interface SinkWriter { List<DataFileMeta> 
flush() throws IOException; + boolean flushMemory() throws IOException; + long memoryOccupancy(); void close(); @@ -314,8 +315,6 @@ private interface SinkWriter { void setMemoryPool(MemorySegmentPool memoryPool); boolean bufferSpillableWriter(); - - void spill() throws IOException; } /** @@ -346,6 +345,11 @@ public List<DataFileMeta> flush() throws IOException { return flushedFiles; } + @Override + public boolean flushMemory() throws IOException { + return false; + } + @Override public long memoryOccupancy() { return 0; @@ -368,9 +372,6 @@ public void setMemoryPool(MemorySegmentPool memoryPool) { public boolean bufferSpillableWriter() { return false; } - - @Override - public void spill() throws IOException {} } /** @@ -441,8 +442,8 @@ public boolean bufferSpillableWriter() { } @Override - public void spill() throws IOException { - writeBuffer.spill(); + public boolean flushMemory() throws IOException { + return writeBuffer.flushMemory(); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java index f799888c1ff2..5181a9ed8c72 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java @@ -87,6 +87,12 @@ public void reset() { addCompleted = false; } + @Override + public boolean flushMemory() throws IOException { + spill(); + return true; + } + @Override public boolean put(InternalRow row) throws IOException { checkState(!addCompleted, "This buffer has add completed."); @@ -126,7 +132,7 @@ private void throwTooBigException(InternalRow row) throws IOException { + memorySize()); } - public void spill() throws IOException { + private void spill() throws IOException { FileIOChannel.ID channel = ioManager.createChannel(); BufferFileWriter writer = ioManager.createBufferFileWriter(channel); diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java 
b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java index 04ed809c292c..83c4e423b03c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java @@ -81,8 +81,8 @@ public void reset() { } @Override - public void spill() { - throw new UnsupportedOperationException(); + public boolean flushMemory() throws IOException { + return false; } private void returnToSegmentPool() { diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java index b78d213b6622..7589b45eab9c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java @@ -39,7 +39,7 @@ public interface RowBuffer { void reset(); - void spill() throws IOException; + boolean flushMemory() throws IOException; RowBufferIterator newIterator();