From c08348bc349ec0ffa41d9ef7c0ef13496d62fb82 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Fri, 29 Mar 2024 23:29:40 +0800 Subject: [PATCH] support ExternalBuffer --- .../paimon/append/AppendOnlyWriter.java | 20 ++++++++++++++----- .../apache/paimon/disk/ExternalBuffer.java | 19 ++++++++++++++++-- .../org/apache/paimon/disk/RowBuffer.java | 6 ++++-- .../operation/AppendOnlyFileStoreWrite.java | 6 +++++- 4 files changed, 41 insertions(+), 10 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 71c863fcb016..1ab054531541 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -33,6 +33,7 @@ import org.apache.paimon.io.RowDataRollingFileWriter; import org.apache.paimon.memory.MemoryOwner; import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.options.MemorySize; import org.apache.paimon.statistics.FieldStatsCollector; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; @@ -76,6 +77,7 @@ public class AppendOnlyWriter implements RecordWriter, MemoryOwner private final IOManager ioManager; private MemorySegmentPool memorySegmentPool; + private MemorySize maxDiskSize; public AppendOnlyWriter( FileIO fileIO, @@ -92,7 +94,8 @@ public AppendOnlyWriter( boolean useWriteBuffer, boolean spillable, String fileCompression, - FieldStatsCollector.Factory[] statsCollectors) { + FieldStatsCollector.Factory[] statsCollectors, + MemorySize maxDiskSize) { this.fileIO = fileIO; this.schemaId = schemaId; this.fileFormat = fileFormat; @@ -109,9 +112,12 @@ public AppendOnlyWriter( this.fileCompression = fileCompression; this.ioManager = ioManager; this.statsCollectors = statsCollectors; + this.maxDiskSize = maxDiskSize; this.sinkWriter = - useWriteBuffer ? new BufferedSinkWriter(spillable) : new DirectSinkWriter(); + useWriteBuffer + ? new BufferedSinkWriter(spillable, maxDiskSize) + : new DirectSinkWriter(); if (increment != null) { newFiles.addAll(increment.newFilesIncrement().newFiles()); @@ -205,7 +211,7 @@ public void toBufferedWriter() throws Exception { trySyncLatestCompaction(true); sinkWriter.close(); - sinkWriter = new BufferedSinkWriter(true); + sinkWriter = new BufferedSinkWriter(true, maxDiskSize); sinkWriter.setMemoryPool(memorySegmentPool); } } @@ -370,10 +376,13 @@ private class BufferedSinkWriter implements SinkWriter { private final boolean spillable; + private final MemorySize maxDiskSize; + private RowBuffer writeBuffer; - private BufferedSinkWriter(boolean spillable) { + private BufferedSinkWriter(boolean spillable, MemorySize maxDiskSize) { this.spillable = spillable; + this.maxDiskSize = maxDiskSize; } @Override @@ -429,7 +438,8 @@ public void setMemoryPool(MemorySegmentPool memoryPool) { ioManager, memoryPool, new InternalRowSerializer(writeSchema), - spillable); + spillable, + maxDiskSize); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java index 5181a9ed8c72..f5035d3f80cb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java @@ -26,6 +26,7 @@ import org.apache.paimon.memory.Buffer; import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.options.MemorySize; import org.apache.paimon.utils.MutableObjectIterator; import org.slf4j.Logger; @@ -47,6 +48,7 @@ public class ExternalBuffer implements RowBuffer { private final MemorySegmentPool pool; private final BinaryRowSerializer binaryRowSerializer; private final InMemoryBuffer inMemoryBuffer; + private final MemorySize maxDiskSize; // The size of each segment private final int segmentSize; @@ -57,9 +59,13 @@ public class ExternalBuffer implements RowBuffer { private boolean addCompleted; ExternalBuffer( - IOManager ioManager, MemorySegmentPool pool, AbstractRowDataSerializer serializer) { + IOManager ioManager, + MemorySegmentPool pool, + AbstractRowDataSerializer serializer, + MemorySize maxDiskSize) { this.ioManager = ioManager; this.pool = pool; + this.maxDiskSize = maxDiskSize; this.binaryRowSerializer = serializer instanceof BinaryRowSerializer @@ -90,7 +96,16 @@ public void reset() { @Override public boolean flushMemory() throws IOException { spill(); - return true; + return getDiskUsage() < maxDiskSize.getBytes(); + } + + private long getDiskUsage() { + long bytes = 0; + + for (ChannelWithMeta spillChannelID : spilledChannelIDs) { + bytes += spillChannelID.getSize(); + } + return bytes; } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java index 7589b45eab9c..b4d3aa89fa2a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.AbstractRowDataSerializer; import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.options.MemorySize; import java.io.Closeable; import java.io.IOException; @@ -57,9 +58,10 @@ static RowBuffer getBuffer( IOManager ioManager, MemorySegmentPool memoryPool, AbstractRowDataSerializer serializer, - boolean spillable) { + boolean spillable, + MemorySize maxDiskSize) { if (spillable) { - return new ExternalBuffer(ioManager, memoryPool, serializer); + return new ExternalBuffer(ioManager, memoryPool, serializer, maxDiskSize); } else { return new InMemoryBuffer(memoryPool, serializer); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index fbb51960a06d..dbf73caef2f6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -32,6 +32,7 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.RowDataRollingFileWriter; +import org.apache.paimon.options.MemorySize; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.statistics.FieldStatsCollector; import org.apache.paimon.table.BucketMode; @@ -69,6 +70,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite private final String fileCompression; private final boolean useWriteBuffer; private final boolean spillable; + private final MemorySize maxDiskSize; private final FieldStatsCollector.Factory[] statsCollectors; private boolean forceBufferSpill = false; @@ -101,6 +103,7 @@ public AppendOnlyFileStoreWrite( this.fileCompression = options.fileCompression(); this.useWriteBuffer = options.useWriteBufferForAppend(); this.spillable = options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode); + this.maxDiskSize = options.writeBufferSpillDiskSize(); this.statsCollectors = StatsCollectorFactories.createStatsFactories(options, rowType.getFieldNames()); } @@ -146,7 +149,8 @@ protected RecordWriter createWriter( useWriteBuffer || forceBufferSpill, spillable || forceBufferSpill, fileCompression, - statsCollectors); + statsCollectors, + maxDiskSize); } public AppendOnlyCompactManager.CompactRewriter compactRewriter(