diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 4f011d9e31c5..0810f023bf64 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -279,9 +279,8 @@ public long memoryOccupancy() { @Override public void flushMemory() throws Exception { - if (sinkWriter.bufferSpillableWriter()) { - sinkWriter.spill(); - } else { + boolean success = sinkWriter.flushMemory(); + if (!success) { flush(false, false); } } @@ -307,6 +306,8 @@ private interface SinkWriter { List flush() throws IOException; + boolean flushMemory() throws IOException; + long memoryOccupancy(); void close(); @@ -314,8 +315,6 @@ private interface SinkWriter { void setMemoryPool(MemorySegmentPool memoryPool); boolean bufferSpillableWriter(); - - void spill() throws IOException; } /** @@ -346,6 +345,11 @@ public List flush() throws IOException { return flushedFiles; } + @Override + public boolean flushMemory() throws IOException { + return false; + } + @Override public long memoryOccupancy() { return 0; @@ -368,9 +372,6 @@ public void setMemoryPool(MemorySegmentPool memoryPool) { public boolean bufferSpillableWriter() { return false; } - - @Override - public void spill() throws IOException {} } /** @@ -441,8 +442,8 @@ public boolean bufferSpillableWriter() { } @Override - public void spill() throws IOException { - writeBuffer.spill(); + public boolean flushMemory() throws IOException { + return writeBuffer.flushMemory(); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java index f799888c1ff2..5181a9ed8c72 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java @@ -87,6 +87,12 @@ public void reset() { addCompleted = false; } + @Override + public boolean flushMemory() throws IOException { + spill(); + return true; + } + @Override public boolean put(InternalRow row) throws IOException { checkState(!addCompleted, "This buffer has add completed."); @@ -126,7 +132,7 @@ private void throwTooBigException(InternalRow row) throws IOException { + memorySize()); } - public void spill() throws IOException { + private void spill() throws IOException { FileIOChannel.ID channel = ioManager.createChannel(); BufferFileWriter writer = ioManager.createBufferFileWriter(channel); diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java index 04ed809c292c..83c4e423b03c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java @@ -81,8 +81,8 @@ public void reset() { } @Override - public void spill() { - throw new UnsupportedOperationException(); + public boolean flushMemory() throws IOException { + return false; } private void returnToSegmentPool() { diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java index b78d213b6622..7589b45eab9c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java @@ -39,7 +39,7 @@ public interface RowBuffer { void reset(); - void spill() throws IOException; + boolean flushMemory() throws IOException; RowBufferIterator newIterator();