support ExternalBuffer
Aitozi committed Mar 29, 2024
1 parent 220f631 commit c08348b
Showing 4 changed files with 41 additions and 10 deletions.
AppendOnlyWriter.java
@@ -33,6 +33,7 @@
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
+ import org.apache.paimon.options.MemorySize;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -76,6 +77,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final IOManager ioManager;

private MemorySegmentPool memorySegmentPool;
+ private MemorySize maxDiskSize;

public AppendOnlyWriter(
FileIO fileIO,
@@ -92,7 +94,8 @@ public AppendOnlyWriter(
boolean useWriteBuffer,
boolean spillable,
String fileCompression,
- FieldStatsCollector.Factory[] statsCollectors) {
+ FieldStatsCollector.Factory[] statsCollectors,
+ MemorySize maxDiskSize) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
@@ -109,9 +112,12 @@ public AppendOnlyWriter(
this.fileCompression = fileCompression;
this.ioManager = ioManager;
this.statsCollectors = statsCollectors;
+ this.maxDiskSize = maxDiskSize;

this.sinkWriter =
- useWriteBuffer ? new BufferedSinkWriter(spillable) : new DirectSinkWriter();
+ useWriteBuffer
+         ? new BufferedSinkWriter(spillable, maxDiskSize)
+         : new DirectSinkWriter();

if (increment != null) {
newFiles.addAll(increment.newFilesIncrement().newFiles());
@@ -205,7 +211,7 @@ public void toBufferedWriter() throws Exception {
trySyncLatestCompaction(true);

sinkWriter.close();
- sinkWriter = new BufferedSinkWriter(true);
+ sinkWriter = new BufferedSinkWriter(true, maxDiskSize);
sinkWriter.setMemoryPool(memorySegmentPool);
}
}
@@ -370,10 +376,13 @@ private class BufferedSinkWriter implements SinkWriter {

private final boolean spillable;

+ private final MemorySize maxDiskSize;
+
private RowBuffer writeBuffer;

- private BufferedSinkWriter(boolean spillable) {
+ private BufferedSinkWriter(boolean spillable, MemorySize maxDiskSize) {
this.spillable = spillable;
+ this.maxDiskSize = maxDiskSize;
}

@Override
@@ -429,7 +438,8 @@ public void setMemoryPool(MemorySegmentPool memoryPool) {
ioManager,
memoryPool,
new InternalRowSerializer(writeSchema),
- spillable);
+ spillable,
+ maxDiskSize);
}

@Override
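The AppendOnlyWriter changes above introduce the disk cap as a constructor argument and pass it to every BufferedSinkWriter, including the one created by toBufferedWriter(). A minimal sketch of the write-path pattern this enables follows; the types and method names are hypothetical stand-ins, not code from this commit. The one idea taken from the diff is that a buffer may refuse more rows once its memory and disk budget is exhausted, at which point the caller has to drain it to data files and retry.

    import org.apache.paimon.data.InternalRow;

    import java.io.IOException;

    // Illustration only: a bounded buffer that signals back-pressure through put().
    class BoundedWriteSketch {
        interface BoundedRowBuffer {
            boolean put(InternalRow row) throws IOException;     // false once the budget is exhausted
            void drainToDataFilesAndReset() throws IOException;  // hypothetical flush hook
        }

        static void writeRow(BoundedRowBuffer buffer, InternalRow row) throws IOException {
            if (!buffer.put(row)) {
                // Memory is full and the spill files have reached their cap:
                // persist what is buffered, free the budget, then retry the row.
                buffer.drainToDataFilesAndReset();
                if (!buffer.put(row)) {
                    throw new IOException("a single row does not fit into an empty buffer");
                }
            }
        }
    }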
ExternalBuffer.java
@@ -26,6 +26,7 @@
import org.apache.paimon.memory.Buffer;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentPool;
+ import org.apache.paimon.options.MemorySize;
import org.apache.paimon.utils.MutableObjectIterator;

import org.slf4j.Logger;
@@ -47,6 +48,7 @@ public class ExternalBuffer implements RowBuffer {
private final MemorySegmentPool pool;
private final BinaryRowSerializer binaryRowSerializer;
private final InMemoryBuffer inMemoryBuffer;
+ private final MemorySize maxDiskSize;

// The size of each segment
private final int segmentSize;
@@ -57,9 +59,13 @@
private boolean addCompleted;

ExternalBuffer(
- IOManager ioManager, MemorySegmentPool pool, AbstractRowDataSerializer<?> serializer) {
+ IOManager ioManager,
+ MemorySegmentPool pool,
+ AbstractRowDataSerializer<?> serializer,
+ MemorySize maxDiskSize) {
this.ioManager = ioManager;
this.pool = pool;
+ this.maxDiskSize = maxDiskSize;

this.binaryRowSerializer =
serializer instanceof BinaryRowSerializer
@@ -90,7 +96,16 @@ public void reset() {
@Override
public boolean flushMemory() throws IOException {
spill();
- return true;
+ return getDiskUsage() < maxDiskSize.getBytes();
}
+
+ private long getDiskUsage() {
+ long bytes = 0;
+
+ for (ChannelWithMeta spillChannelID : spilledChannelIDs) {
+ bytes += spillChannelID.getSize();
+ }
+ return bytes;
+ }

@Override
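The substantive change in ExternalBuffer is the flushMemory() contract: the buffer still spills unconditionally, but it now reports whether the spill files remain within the configured disk budget, which lets the caller stop buffering once the budget is gone. A self-contained sketch of that check; only getBytes() appears in the diff, and MemorySize.parse(...) is assumed to be available in the options API:

    import org.apache.paimon.options.MemorySize;

    class DiskBudgetSketch {
        // Mirrors the new return value: true while the spilled bytes stay below the cap,
        // false once the cap is reached and the caller should stop buffering.
        static boolean withinBudget(long spilledBytes, MemorySize maxDiskSize) {
            return spilledBytes < maxDiskSize.getBytes();
        }

        public static void main(String[] args) {
            MemorySize cap = MemorySize.parse("256 mb");        // assumed parse helper
            System.out.println(withinBudget(100L << 20, cap));  // true: 100 MiB is under the cap
            System.out.println(withinBudget(300L << 20, cap));  // false: 300 MiB exceeds the cap
        }
    }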
RowBuffer.java
@@ -22,6 +22,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
import org.apache.paimon.memory.MemorySegmentPool;
+ import org.apache.paimon.options.MemorySize;

import java.io.Closeable;
import java.io.IOException;
@@ -57,9 +58,10 @@ static RowBuffer getBuffer(
IOManager ioManager,
MemorySegmentPool memoryPool,
AbstractRowDataSerializer<InternalRow> serializer,
- boolean spillable) {
+ boolean spillable,
+ MemorySize maxDiskSize) {
if (spillable) {
- return new ExternalBuffer(ioManager, memoryPool, serializer);
+ return new ExternalBuffer(ioManager, memoryPool, serializer, maxDiskSize);
} else {
return new InMemoryBuffer(memoryPool, serializer);
}
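With the extra parameter, a call site that asks for a spillable buffer now also states how much local disk the spilled rows may occupy. A hypothetical call site; the surrounding variables and the 512 MiB figure are assumptions, and MemorySize.ofMebiBytes is assumed to exist alongside the getBytes() used above:

    RowBuffer buffer =
            RowBuffer.getBuffer(
                    ioManager,                      // IOManager that owns the spill files
                    memoryPool,                     // MemorySegmentPool backing the in-memory pages
                    serializer,                     // AbstractRowDataSerializer<InternalRow>
                    true,                           // spillable
                    MemorySize.ofMebiBytes(512));   // new: cap the spill files at 512 MiB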
AppendOnlyFileStoreWrite.java
@@ -32,6 +32,7 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.RowDataRollingFileWriter;
+ import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.table.BucketMode;
@@ -69,6 +70,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
private final String fileCompression;
private final boolean useWriteBuffer;
private final boolean spillable;
+ private final MemorySize maxDiskSize;
private final FieldStatsCollector.Factory[] statsCollectors;

private boolean forceBufferSpill = false;
@@ -101,6 +103,7 @@ public AppendOnlyFileStoreWrite(
this.fileCompression = options.fileCompression();
this.useWriteBuffer = options.useWriteBufferForAppend();
this.spillable = options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode);
+ this.maxDiskSize = options.writeBufferSpillDiskSize();
this.statsCollectors =
StatsCollectorFactories.createStatsFactories(options, rowType.getFieldNames());
}
@@ -146,7 +149,8 @@ protected RecordWriter<InternalRow> createWriter(
useWriteBuffer || forceBufferSpill,
spillable || forceBufferSpill,
fileCompression,
- statsCollectors);
+ statsCollectors,
+ maxDiskSize);
}

public AppendOnlyCompactManager.CompactRewriter compactRewriter(
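AppendOnlyFileStoreWrite supplies the limit: it reads options.writeBufferSpillDiskSize() once and passes the same MemorySize to every AppendOnlyWriter it creates, so the cap applies per writer, with each ExternalBuffer measuring only its own spill channels. A condensed sketch of that wiring; the option and field names come from the diff, while createAppendOnlyWriter(...) is a hypothetical stand-in for the body of createWriter:

    MemorySize maxDiskSize = options.writeBufferSpillDiskSize();   // cap taken from the table options
    RecordWriter<InternalRow> writer =
            createAppendOnlyWriter(
                    useWriteBuffer || forceBufferSpill,            // buffer rows before writing files?
                    spillable || forceBufferSpill,                 // may the buffer spill to disk?
                    maxDiskSize);                                  // disk budget for that writer's buffer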
