Skip to content

Commit

Permalink
[core] Add limitation to the buffer spill disk usage
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitozi committed Mar 28, 2024
1 parent f70f9d0 commit 8452914
Show file tree
Hide file tree
Showing 16 changed files with 107 additions and 17 deletions.
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,13 @@ public class CoreOptions implements Serializable {
.withDescription(
"Amount of data to build up in memory before converting to a sorted on-disk file.");

public static final ConfigOption<MemorySize> WRITE_BUFFER_MAX_DISK_SIZE =
key("write-buffer.max-disk-size")
.memoryType()
.defaultValue(MemorySize.ofMebiBytes(10 * 1024))
.withDescription(
"The max disk to use for write buffer spill. This only work when the write buffer spill is enabled");

public static final ConfigOption<Boolean> WRITE_BUFFER_SPILLABLE =
key("write-buffer-spillable")
.booleanType()
Expand Down Expand Up @@ -1256,6 +1263,10 @@ public boolean writeBufferSpillable(boolean usingObjectStore, boolean isStreamin
return options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore || !isStreaming);
}

public MemorySize writeBufferSpillDiskSize() {
return options.get(WRITE_BUFFER_MAX_DISK_SIZE);
}

public boolean useWriteBufferForAppend() {
return options.get(WRITE_BUFFER_FOR_APPEND);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ private void bulkLoadBootstrapRecords() {
coreOptions.writeBufferSize() / 2,
coreOptions.pageSize(),
coreOptions.localSortMaxNumFileHandles(),
coreOptions.spillCompression());
coreOptions.spillCompression(),
coreOptions.writeBufferSpillDiskSize());

Function<SortOrder, RowIterator> iteratorFunction =
sortOrder -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ public class ChannelWithMeta {
private final FileIOChannel.ID channel;
private final int blockCount;
private final int numBytesInLastBlock;
private final long size;

public ChannelWithMeta(FileIOChannel.ID channel, int blockCount, int numBytesInLastBlock) {
this.channel = channel;
this.blockCount = blockCount;
this.numBytesInLastBlock = numBytesInLastBlock;
this.size = channel.getPathFile().length();
}

public FileIOChannel.ID getChannel() {
Expand All @@ -42,4 +44,8 @@ public int getBlockCount() {
public int getNumBytesInLastBlock() {
return numBytesInLastBlock;
}

public long getSize() {
return size;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public static BinaryExternalSortBuffer createBulkLoadSorter(
options.writeBufferSize() / 2,
options.pageSize(),
options.localSortMaxNumFileHandles(),
options.spillCompression());
options.spillCompression(),
options.writeBufferSpillDiskSize());
}

/** A class wraps byte[] to implement equals and hashCode. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
import org.apache.paimon.mergetree.compact.SortMergeReader;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.sort.SortBuffer;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class MergeSorter {
private final int spillThreshold;
private final int spillSortMaxNumFiles;
private final String compression;
private final MemorySize maxDiskSize;

private final MemorySegmentPool memoryPool;

Expand All @@ -86,6 +88,7 @@ public MergeSorter(
this.memoryPool =
new CachelessSegmentPool(options.sortSpillBufferSize(), options.pageSize());
this.ioManager = ioManager;
this.maxDiskSize = options.writeBufferSpillDiskSize();
}

public MemorySegmentPool memoryPool() {
Expand Down Expand Up @@ -213,7 +216,8 @@ public ExternalSorterWithLevel(@Nullable FieldsComparator userDefinedSeqComparat
sortFields.toArray(),
memoryPool,
spillSortMaxNumFiles,
compression);
compression,
maxDiskSize);
}

public boolean put(KeyValue keyValue) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FieldsComparator;
Expand All @@ -54,6 +55,7 @@
public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {

private final boolean writeBufferSpillable;
private final MemorySize maxDiskSize;
private final int sortMaxFan;
private final String sortCompression;
private final IOManager ioManager;
Expand All @@ -79,6 +81,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {

public MergeTreeWriter(
boolean writeBufferSpillable,
MemorySize maxDiskSize,
int sortMaxFan,
String sortCompression,
IOManager ioManager,
Expand All @@ -92,6 +95,7 @@ public MergeTreeWriter(
@Nullable CommitIncrement increment,
@Nullable FieldsComparator userDefinedSeqComparator) {
this.writeBufferSpillable = writeBufferSpillable;
this.maxDiskSize = maxDiskSize;
this.sortMaxFan = sortMaxFan;
this.sortCompression = sortCompression;
this.ioManager = ioManager;
Expand Down Expand Up @@ -141,6 +145,7 @@ public void setMemoryPool(MemorySegmentPool memoryPool) {
userDefinedSeqComparator,
memoryPool,
writeBufferSpillable,
maxDiskSize,
sortMaxFan,
sortCompression,
ioManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.sort.BinaryInMemorySortBuffer;
import org.apache.paimon.sort.SortBuffer;
Expand Down Expand Up @@ -66,6 +67,7 @@ public SortBufferWriteBuffer(
@Nullable FieldsComparator userDefinedSeqComparator,
MemorySegmentPool memoryPool,
boolean spillable,
MemorySize maxDiskSize,
int sortMaxFan,
String compression,
IOManager ioManager) {
Expand Down Expand Up @@ -118,7 +120,8 @@ public SortBufferWriteBuffer(
inMemorySortBuffer,
ioManager,
sortMaxFan,
compression)
compression,
maxDiskSize)
: inMemorySortBuffer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ protected MergeTreeWriter createWriter(

return new MergeTreeWriter(
bufferSpillable(),
options.writeBufferSpillDiskSize(),
options.localSortMaxNumFileHandles(),
options.spillCompression(),
ioManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class BinaryExternalSortBuffer implements SortBuffer {

private final FileIOChannel.Enumerator enumerator;
private final List<ChannelWithMeta> spillChannelIDs;
private final MemorySize maxDiskSize;

private int numRecords = 0;

Expand All @@ -67,14 +68,16 @@ public BinaryExternalSortBuffer(
BinaryInMemorySortBuffer inMemorySortBuffer,
IOManager ioManager,
int maxNumFileHandles,
String compression) {
String compression,
MemorySize maxDiskSize) {
this.serializer = serializer;
this.inMemorySortBuffer = inMemorySortBuffer;
this.ioManager = ioManager;
this.channelManager = new SpillChannelManager();
this.maxNumFileHandles = maxNumFileHandles;
this.compressionCodecFactory = BlockCompressionFactory.create(compression);
this.compressionBlockSize = (int) MemorySize.parse("64 kb").getBytes();
this.maxDiskSize = maxDiskSize;
this.merger =
new BinaryExternalMerger(
ioManager,
Expand All @@ -96,14 +99,16 @@ public static BinaryExternalSortBuffer create(
long bufferSize,
int pageSize,
int maxNumFileHandles,
String compression) {
String compression,
MemorySize maxDiskSize) {
return create(
ioManager,
rowType,
keyFields,
new HeapMemorySegmentPool(bufferSize, pageSize),
maxNumFileHandles,
compression);
compression,
maxDiskSize);
}

public static BinaryExternalSortBuffer create(
Expand All @@ -112,7 +117,8 @@ public static BinaryExternalSortBuffer create(
int[] keyFields,
MemorySegmentPool pool,
int maxNumFileHandles,
String compression) {
String compression,
MemorySize maxDiskSize) {
RecordComparator comparator = newRecordComparator(rowType.getFieldTypes(), keyFields);
BinaryInMemorySortBuffer sortBuffer =
BinaryInMemorySortBuffer.createBuffer(
Expand All @@ -127,7 +133,8 @@ public static BinaryExternalSortBuffer create(
sortBuffer,
ioManager,
maxNumFileHandles,
compression);
compression,
maxDiskSize);
}

@Override
Expand All @@ -154,7 +161,16 @@ public long getOccupancy() {
@Override
public boolean flushMemory() throws IOException {
spill();
return true;
return getDiskUsage() < maxDiskSize.getBytes();
}

private long getDiskUsage() {
long bytes = 0;

for (ChannelWithMeta spillChannelID : spillChannelIDs) {
bytes += spillChannelID.getSize();
}
return bytes;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ private MergeTreeWriter createMergeTreeWriter(
MergeTreeWriter writer =
new MergeTreeWriter(
false,
MemorySize.ofKibiBytes(10),
128,
"lz4",
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.paimon.mergetree.compact.MergeFunctionTestUtils;
import org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction;
import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.sort.BinaryInMemorySortBuffer;
import org.apache.paimon.types.BigIntType;
Expand Down Expand Up @@ -69,6 +70,7 @@ public abstract class SortBufferWriteBufferTestBase {
null,
new HeapMemorySegmentPool(32 * 1024 * 3L, 32 * 1024),
false,
MemorySize.MAX_VALUE,
128,
"lz4",
null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.utils.MutableObjectIterator;

import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -262,11 +263,38 @@ public void testSpillingRandom() throws Exception {
sorter.clear();
}

@Test
public void testSpillingMaxDiskSize() throws Exception {
BinaryExternalSortBuffer sorter = createBuffer(128, MemorySize.ofKibiBytes(10));
int size = 1000_000;

MockBinaryRowReader reader = new MockBinaryRowReader(size);
sorter.write(reader);

assertThat(sorter.flushMemory()).isFalse();
assertThat(sorter.size()).isEqualTo(size);

MutableObjectIterator<BinaryRow> iterator = sorter.sortedIterator();

BinaryRow next = serializer.createInstance();
for (int i = 0; i < size; i++) {
next = iterator.next(next);
assertThat(next.getInt(0)).isEqualTo(i);
assertThat(next.getString(1).toString()).isEqualTo(getString(i));
}

sorter.clear();
}

private BinaryExternalSortBuffer createBuffer() {
return createBuffer(128);
return createBuffer(128, MemorySize.MAX_VALUE);
}

private BinaryExternalSortBuffer createBuffer(int maxNumFileHandles) {
return createBuffer(maxNumFileHandles, MemorySize.MAX_VALUE);
}

private BinaryExternalSortBuffer createBuffer(int maxNumFileHandles, MemorySize diskSize) {
@SuppressWarnings({"unchecked", "rawtypes"})
BinaryInMemorySortBuffer inMemorySortBuffer =
BinaryInMemorySortBuffer.createBuffer(
Expand All @@ -281,7 +309,8 @@ private BinaryExternalSortBuffer createBuffer(int maxNumFileHandles) {
inMemorySortBuffer,
ioManager,
maxNumFileHandles,
"lz4");
"lz4",
diskSize);
}

/** Mock reader for binary row. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.mergetree.SortBufferWriteBuffer;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.PrimaryKeyTableUtils;
Expand Down Expand Up @@ -120,6 +121,7 @@ public List<DataField> valueFields(TableSchema schema) {
new HeapMemorySegmentPool(
options.localMergeBufferSize(), options.pageSize()),
false,
MemorySize.MAX_VALUE,
options.localSortMaxNumFileHandles(),
options.spillCompression(),
null);
Expand Down
Loading

0 comments on commit 8452914

Please sign in to comment.