From 84529149f8416c9bce76224c945dbbeb1e89f86b Mon Sep 17 00:00:00 2001 From: Aitozi Date: Wed, 27 Mar 2024 20:33:50 +0800 Subject: [PATCH] [core] Add limitation to the buffer spill disk usage --- .../java/org/apache/paimon/CoreOptions.java | 11 +++++++ .../crosspartition/GlobalIndexAssigner.java | 3 +- .../apache/paimon/disk/ChannelWithMeta.java | 6 ++++ .../apache/paimon/lookup/RocksDBState.java | 3 +- .../apache/paimon/mergetree/MergeSorter.java | 6 +++- .../paimon/mergetree/MergeTreeWriter.java | 5 +++ .../mergetree/SortBufferWriteBuffer.java | 5 ++- .../operation/KeyValueFileStoreWrite.java | 1 + .../paimon/sort/BinaryExternalSortBuffer.java | 28 ++++++++++++---- .../paimon/mergetree/MergeTreeTestBase.java | 1 + .../SortBufferWriteBufferTestBase.java | 2 ++ .../sort/BinaryExternalSortBufferTest.java | 33 +++++++++++++++++-- .../paimon/flink/sink/LocalMergeOperator.java | 2 ++ .../paimon/flink/sorter/SortOperator.java | 9 +++-- .../apache/paimon/flink/sorter/SortUtils.java | 3 +- .../paimon/flink/sorter/SortOperatorTest.java | 6 ++-- 16 files changed, 107 insertions(+), 17 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 05e9312373411..002897a67d4aa 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -309,6 +309,13 @@ public class CoreOptions implements Serializable { .withDescription( "Amount of data to build up in memory before converting to a sorted on-disk file."); + public static final ConfigOption WRITE_BUFFER_MAX_DISK_SIZE = + key("write-buffer.max-disk-size") + .memoryType() + .defaultValue(MemorySize.ofMebiBytes(10 * 1024)) + .withDescription( + "The max disk to use for write buffer spill. This only work when the write buffer spill is enabled"); + public static final ConfigOption WRITE_BUFFER_SPILLABLE = key("write-buffer-spillable") .booleanType() @@ -1256,6 +1263,10 @@ public boolean writeBufferSpillable(boolean usingObjectStore, boolean isStreamin return options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore || !isStreaming); } + public MemorySize writeBufferSpillDiskSize() { + return options.get(WRITE_BUFFER_MAX_DISK_SIZE); + } + public boolean useWriteBufferForAppend() { return options.get(WRITE_BUFFER_FOR_APPEND); } diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java index 787fc9e0f95c7..63853eb94c791 100644 --- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java +++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java @@ -291,7 +291,8 @@ private void bulkLoadBootstrapRecords() { coreOptions.writeBufferSize() / 2, coreOptions.pageSize(), coreOptions.localSortMaxNumFileHandles(), - coreOptions.spillCompression()); + coreOptions.spillCompression(), + coreOptions.writeBufferSpillDiskSize()); Function iteratorFunction = sortOrder -> { diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWithMeta.java b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWithMeta.java index f728ad1dd21fc..ab414eeafa0ac 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWithMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWithMeta.java @@ -24,11 +24,13 @@ public class ChannelWithMeta { private final FileIOChannel.ID channel; private final int blockCount; private final int numBytesInLastBlock; + private final long size; public ChannelWithMeta(FileIOChannel.ID channel, int blockCount, int numBytesInLastBlock) { this.channel = channel; this.blockCount = blockCount; this.numBytesInLastBlock = numBytesInLastBlock; + this.size = channel.getPathFile().length(); } public FileIOChannel.ID getChannel() { @@ -42,4 +44,8 @@ public int getBlockCount() { public int getNumBytesInLastBlock() { return numBytesInLastBlock; } + + public long getSize() { + return size; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java index 2e10acb0e1460..c0417b8f1316b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java +++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java @@ -112,7 +112,8 @@ public static BinaryExternalSortBuffer createBulkLoadSorter( options.writeBufferSize() / 2, options.pageSize(), options.localSortMaxNumFileHandles(), - options.spillCompression()); + options.spillCompression(), + options.writeBufferSpillDiskSize()); } /** A class wraps byte[] to implement equals and hashCode. */ diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java index 5e30d16aa4408..0f54b40b681ad 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java @@ -32,6 +32,7 @@ import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier; import org.apache.paimon.mergetree.compact.MergeFunctionWrapper; import org.apache.paimon.mergetree.compact.SortMergeReader; +import org.apache.paimon.options.MemorySize; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.sort.BinaryExternalSortBuffer; import org.apache.paimon.sort.SortBuffer; @@ -67,6 +68,7 @@ public class MergeSorter { private final int spillThreshold; private final int spillSortMaxNumFiles; private final String compression; + private final MemorySize maxDiskSize; private final MemorySegmentPool memoryPool; @@ -86,6 +88,7 @@ public MergeSorter( this.memoryPool = new CachelessSegmentPool(options.sortSpillBufferSize(), options.pageSize()); this.ioManager = ioManager; + this.maxDiskSize = options.writeBufferSpillDiskSize(); } public MemorySegmentPool memoryPool() { @@ -213,7 +216,8 @@ public ExternalSorterWithLevel(@Nullable FieldsComparator userDefinedSeqComparat sortFields.toArray(), memoryPool, spillSortMaxNumFiles, - compression); + compression, + maxDiskSize); } public boolean put(KeyValue keyValue) throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index 537d838d0cbb4..32c6683786299 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -33,6 +33,7 @@ import org.apache.paimon.memory.MemoryOwner; import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.mergetree.compact.MergeFunction; +import org.apache.paimon.options.MemorySize; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.FieldsComparator; @@ -54,6 +55,7 @@ public class MergeTreeWriter implements RecordWriter, MemoryOwner { private final boolean writeBufferSpillable; + private final MemorySize maxDiskSize; private final int sortMaxFan; private final String sortCompression; private final IOManager ioManager; @@ -79,6 +81,7 @@ public class MergeTreeWriter implements RecordWriter, MemoryOwner { public MergeTreeWriter( boolean writeBufferSpillable, + MemorySize maxDiskSize, int sortMaxFan, String sortCompression, IOManager ioManager, @@ -92,6 +95,7 @@ public MergeTreeWriter( @Nullable CommitIncrement increment, @Nullable FieldsComparator userDefinedSeqComparator) { this.writeBufferSpillable = writeBufferSpillable; + this.maxDiskSize = maxDiskSize; this.sortMaxFan = sortMaxFan; this.sortCompression = sortCompression; this.ioManager = ioManager; @@ -141,6 +145,7 @@ public void setMemoryPool(MemorySegmentPool memoryPool) { userDefinedSeqComparator, memoryPool, writeBufferSpillable, + maxDiskSize, sortMaxFan, sortCompression, ioManager); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java index 072c8a0508c63..6406435a09ee9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/SortBufferWriteBuffer.java @@ -33,6 +33,7 @@ import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.mergetree.compact.MergeFunction; import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper; +import org.apache.paimon.options.MemorySize; import org.apache.paimon.sort.BinaryExternalSortBuffer; import org.apache.paimon.sort.BinaryInMemorySortBuffer; import org.apache.paimon.sort.SortBuffer; @@ -66,6 +67,7 @@ public SortBufferWriteBuffer( @Nullable FieldsComparator userDefinedSeqComparator, MemorySegmentPool memoryPool, boolean spillable, + MemorySize maxDiskSize, int sortMaxFan, String compression, IOManager ioManager) { @@ -118,7 +120,8 @@ public SortBufferWriteBuffer( inMemorySortBuffer, ioManager, sortMaxFan, - compression) + compression, + maxDiskSize) : inMemorySortBuffer; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 6f74e67208ded..a4b15323938bf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -195,6 +195,7 @@ protected MergeTreeWriter createWriter( return new MergeTreeWriter( bufferSpillable(), + options.writeBufferSpillDiskSize(), options.localSortMaxNumFileHandles(), options.spillCompression(), ioManager, diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java index f7cf9092859d9..c755d1d2c665b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java @@ -57,6 +57,7 @@ public class BinaryExternalSortBuffer implements SortBuffer { private final FileIOChannel.Enumerator enumerator; private final List spillChannelIDs; + private final MemorySize maxDiskSize; private int numRecords = 0; @@ -67,7 +68,8 @@ public BinaryExternalSortBuffer( BinaryInMemorySortBuffer inMemorySortBuffer, IOManager ioManager, int maxNumFileHandles, - String compression) { + String compression, + MemorySize maxDiskSize) { this.serializer = serializer; this.inMemorySortBuffer = inMemorySortBuffer; this.ioManager = ioManager; @@ -75,6 +77,7 @@ public BinaryExternalSortBuffer( this.maxNumFileHandles = maxNumFileHandles; this.compressionCodecFactory = BlockCompressionFactory.create(compression); this.compressionBlockSize = (int) MemorySize.parse("64 kb").getBytes(); + this.maxDiskSize = maxDiskSize; this.merger = new BinaryExternalMerger( ioManager, @@ -96,14 +99,16 @@ public static BinaryExternalSortBuffer create( long bufferSize, int pageSize, int maxNumFileHandles, - String compression) { + String compression, + MemorySize maxDiskSize) { return create( ioManager, rowType, keyFields, new HeapMemorySegmentPool(bufferSize, pageSize), maxNumFileHandles, - compression); + compression, + maxDiskSize); } public static BinaryExternalSortBuffer create( @@ -112,7 +117,8 @@ public static BinaryExternalSortBuffer create( int[] keyFields, MemorySegmentPool pool, int maxNumFileHandles, - String compression) { + String compression, + MemorySize maxDiskSize) { RecordComparator comparator = newRecordComparator(rowType.getFieldTypes(), keyFields); BinaryInMemorySortBuffer sortBuffer = BinaryInMemorySortBuffer.createBuffer( @@ -127,7 +133,8 @@ public static BinaryExternalSortBuffer create( sortBuffer, ioManager, maxNumFileHandles, - compression); + compression, + maxDiskSize); } @Override @@ -154,7 +161,16 @@ public long getOccupancy() { @Override public boolean flushMemory() throws IOException { spill(); - return true; + return getDiskUsage() < maxDiskSize.getBytes(); + } + + private long getDiskUsage() { + long bytes = 0; + + for (ChannelWithMeta spillChannelID : spillChannelIDs) { + bytes += spillChannelID.getSize(); + } + return bytes; } @VisibleForTesting diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index fef552abdabb6..a1e55f73fc56d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -420,6 +420,7 @@ private MergeTreeWriter createMergeTreeWriter( MergeTreeWriter writer = new MergeTreeWriter( false, + MemorySize.ofKibiBytes(10), 128, "lz4", null, diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java index 0d873507f43af..b71971ab6c1fe 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java @@ -30,6 +30,7 @@ import org.apache.paimon.mergetree.compact.MergeFunctionTestUtils; import org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction; import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction; +import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.sort.BinaryInMemorySortBuffer; import org.apache.paimon.types.BigIntType; @@ -69,6 +70,7 @@ public abstract class SortBufferWriteBufferTestBase { null, new HeapMemorySegmentPool(32 * 1024 * 3L, 32 * 1024), false, + MemorySize.MAX_VALUE, 128, "lz4", null); diff --git a/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java index 503a427d958e1..4c5cc12bf01af 100644 --- a/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java @@ -26,6 +26,7 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.memory.MemorySegmentPool; +import org.apache.paimon.options.MemorySize; import org.apache.paimon.utils.MutableObjectIterator; import org.junit.jupiter.api.AfterEach; @@ -262,11 +263,38 @@ public void testSpillingRandom() throws Exception { sorter.clear(); } + @Test + public void testSpillingMaxDiskSize() throws Exception { + BinaryExternalSortBuffer sorter = createBuffer(128, MemorySize.ofKibiBytes(10)); + int size = 1000_000; + + MockBinaryRowReader reader = new MockBinaryRowReader(size); + sorter.write(reader); + + assertThat(sorter.flushMemory()).isFalse(); + assertThat(sorter.size()).isEqualTo(size); + + MutableObjectIterator iterator = sorter.sortedIterator(); + + BinaryRow next = serializer.createInstance(); + for (int i = 0; i < size; i++) { + next = iterator.next(next); + assertThat(next.getInt(0)).isEqualTo(i); + assertThat(next.getString(1).toString()).isEqualTo(getString(i)); + } + + sorter.clear(); + } + private BinaryExternalSortBuffer createBuffer() { - return createBuffer(128); + return createBuffer(128, MemorySize.MAX_VALUE); } private BinaryExternalSortBuffer createBuffer(int maxNumFileHandles) { + return createBuffer(maxNumFileHandles, MemorySize.MAX_VALUE); + } + + private BinaryExternalSortBuffer createBuffer(int maxNumFileHandles, MemorySize diskSize) { @SuppressWarnings({"unchecked", "rawtypes"}) BinaryInMemorySortBuffer inMemorySortBuffer = BinaryInMemorySortBuffer.createBuffer( @@ -281,7 +309,8 @@ private BinaryExternalSortBuffer createBuffer(int maxNumFileHandles) { inMemorySortBuffer, ioManager, maxNumFileHandles, - "lz4"); + "lz4", + diskSize); } /** Mock reader for binary row. */ diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java index a0529c575ed69..0a5eceb492151 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java @@ -27,6 +27,7 @@ import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.mergetree.SortBufferWriteBuffer; import org.apache.paimon.mergetree.compact.MergeFunction; +import org.apache.paimon.options.MemorySize; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.PrimaryKeyTableUtils; @@ -120,6 +121,7 @@ public List valueFields(TableSchema schema) { new HeapMemorySegmentPool( options.localMergeBufferSize(), options.pageSize()), false, + MemorySize.MAX_VALUE, options.localSortMaxNumFileHandles(), options.spillCompression(), null); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java index 1292c7ce31ce6..331d1058dab5d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortOperator.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.options.MemorySize; import org.apache.paimon.sort.BinaryExternalSortBuffer; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.MutableObjectIterator; @@ -45,6 +46,7 @@ public class SortOperator extends TableStreamOperator private final int spillSortMaxNumFiles; private final String spillCompression; private final int sinkParallelism; + private final MemorySize maxDiskSize; private transient BinaryExternalSortBuffer buffer; private transient IOManager ioManager; @@ -56,7 +58,8 @@ public SortOperator( int pageSize, int spillSortMaxNumFiles, String spillCompression, - int sinkParallelism) { + int sinkParallelism, + MemorySize maxDiskSize) { this.keyType = keyType; this.rowType = rowType; this.maxMemory = maxMemory; @@ -65,6 +68,7 @@ public SortOperator( this.spillSortMaxNumFiles = spillSortMaxNumFiles; this.spillCompression = spillCompression; this.sinkParallelism = sinkParallelism; + this.maxDiskSize = maxDiskSize; } @Override @@ -94,7 +98,8 @@ void initBuffer() { maxMemory, pageSize, spillSortMaxNumFiles, - spillCompression); + spillCompression, + maxDiskSize); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java index c31aaa1a4c785..00a1475afbad0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/SortUtils.java @@ -164,7 +164,8 @@ public Tuple2 map(RowData value) { options.pageSize(), options.localSortMaxNumFileHandles(), options.spillCompression(), - sinkParallelism)) + sinkParallelism, + options.writeBufferSpillDiskSize())) .setParallelism(sinkParallelism) // remove the key column from every row .map( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java index 78db46b424262..09db305799e6f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sorter/SortOperatorTest.java @@ -63,7 +63,8 @@ public void testSort() throws Exception { (int) MemorySize.parse("16 kb").getBytes(), 128, "lz4", - 1) {}; + 1, + MemorySize.MAX_VALUE) {}; OneInputStreamOperatorTestHarness harness = createTestHarness(sortOperator); harness.open(); @@ -111,7 +112,8 @@ public void testCloseSortOperator() throws Exception { (int) MemorySize.parse("16 kb").getBytes(), 128, "lz4", - 1) {}; + 1, + MemorySize.MAX_VALUE) {}; OneInputStreamOperatorTestHarness harness = createTestHarness(sortOperator); harness.open(); File[] files = harness.getEnvironment().getIOManager().getSpillingDirectories();