Skip to content

Commit

Permalink
[core] Add compression for ExternalBuffer. (#3146)
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 authored Apr 3, 2024
1 parent e4ca384 commit 6757f56
Show file tree
Hide file tree
Showing 12 changed files with 90 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public interface MemorySegmentPool extends MemorySegmentSource {
/**
* Get the page size of each page this pool holds.
*
* @return the page size
* @return the page size, the bytes size in one page.
*/
int pageSize();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final List<DataFileMeta> compactAfter;
private final LongCounter seqNumCounter;
private final String fileCompression;
private final String spillCompression;
private SinkWriter sinkWriter;
private final FieldStatsCollector.Factory[] statsCollectors;
private final IOManager ioManager;
Expand All @@ -93,6 +94,7 @@ public AppendOnlyWriter(
boolean useWriteBuffer,
boolean spillable,
String fileCompression,
String spillCompression,
FieldStatsCollector.Factory[] statsCollectors,
MemorySize maxDiskSize) {
this.fileIO = fileIO;
Expand All @@ -109,13 +111,14 @@ public AppendOnlyWriter(
this.compactAfter = new ArrayList<>();
this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
this.fileCompression = fileCompression;
this.spillCompression = spillCompression;
this.ioManager = ioManager;
this.statsCollectors = statsCollectors;
this.maxDiskSize = maxDiskSize;

this.sinkWriter =
useWriteBuffer
? new BufferedSinkWriter(spillable, maxDiskSize)
? new BufferedSinkWriter(spillable, maxDiskSize, spillCompression)
: new DirectSinkWriter();

if (increment != null) {
Expand Down Expand Up @@ -211,7 +214,7 @@ public void toBufferedWriter() throws Exception {
trySyncLatestCompaction(true);

sinkWriter.close();
sinkWriter = new BufferedSinkWriter(true, maxDiskSize);
sinkWriter = new BufferedSinkWriter(true, maxDiskSize, spillCompression);
sinkWriter.setMemoryPool(memorySegmentPool);
}
}
Expand Down Expand Up @@ -378,11 +381,14 @@ private class BufferedSinkWriter implements SinkWriter {

private final MemorySize maxDiskSize;

private final String compression;

private RowBuffer writeBuffer;

private BufferedSinkWriter(boolean spillable, MemorySize maxDiskSize) {
private BufferedSinkWriter(boolean spillable, MemorySize maxDiskSize, String compression) {
this.spillable = spillable;
this.maxDiskSize = maxDiskSize;
this.compression = compression;
}

@Override
Expand Down Expand Up @@ -439,7 +445,8 @@ public void setMemoryPool(MemorySegmentPool memoryPool) {
memoryPool,
new InternalRowSerializer(writeSchema),
spillable,
maxDiskSize);
maxDiskSize,
compression);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ public void open(
coreOptions.writeBufferSize() / 2, coreOptions.pageSize()),
new InternalRowSerializer(table.rowType()),
true,
coreOptions.writeBufferSpillDiskSize());
coreOptions.writeBufferSpillDiskSize(),
coreOptions.spillCompression());
}

public void bootstrapKey(InternalRow value) throws IOException {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.BlockDecompressor;
import org.apache.paimon.data.AbstractPagedInputView;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.memory.Buffer;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.utils.MutableObjectIterator;

import java.io.EOFException;
import java.io.IOException;
Expand Down Expand Up @@ -100,4 +103,34 @@ public List<MemorySegment> close() throws IOException {
public FileIOChannel getChannel() {
return reader;
}

public MutableObjectIterator<BinaryRow> createBinaryRowIterator(
BinaryRowSerializer serializer) {
return new BinaryRowChannelInputViewIterator(serializer);
}

private class BinaryRowChannelInputViewIterator implements MutableObjectIterator<BinaryRow> {

protected final BinaryRowSerializer serializer;

public BinaryRowChannelInputViewIterator(BinaryRowSerializer serializer) {
this.serializer = serializer;
}

@Override
public BinaryRow next(BinaryRow reuse) throws IOException {
try {
return this.serializer.deserializeFromPages(reuse, ChannelReaderInputView.this);
} catch (EOFException e) {
close();
return null;
}
}

@Override
public BinaryRow next() throws IOException {
throw new UnsupportedOperationException(
"This method is disabled due to performance issue!");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ public int close() throws IOException {
return -1;
}

public void closeAndDelete() throws IOException {
try {
close();
} finally {
writer.deleteChannel();
}
}

@Override
protected MemorySegment nextSegment(MemorySegment current, int positionInCurrent)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
package org.apache.paimon.disk;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.memory.Buffer;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.options.MemorySize;
Expand All @@ -49,6 +49,7 @@ public class ExternalBuffer implements RowBuffer {
private final BinaryRowSerializer binaryRowSerializer;
private final InMemoryBuffer inMemoryBuffer;
private final MemorySize maxDiskSize;
private final BlockCompressionFactory compactionFactory;

// The size of each segment
private final int segmentSize;
Expand All @@ -62,11 +63,14 @@ public class ExternalBuffer implements RowBuffer {
IOManager ioManager,
MemorySegmentPool pool,
AbstractRowDataSerializer<?> serializer,
MemorySize maxDiskSize) {
MemorySize maxDiskSize,
String compression) {
this.ioManager = ioManager;
this.pool = pool;
this.maxDiskSize = maxDiskSize;

this.compactionFactory = BlockCompressionFactory.create(compression);

this.binaryRowSerializer =
serializer instanceof BinaryRowSerializer
? (BinaryRowSerializer) serializer.duplicate()
Expand Down Expand Up @@ -155,10 +159,11 @@ private void throwTooBigException(InternalRow row) throws IOException {
private void spill() throws IOException {
FileIOChannel.ID channel = ioManager.createChannel();

BufferFileWriter writer = ioManager.createBufferFileWriter(channel);
ChannelWriterOutputView channelWriterOutputView =
new ChannelWriterOutputView(
ioManager.createBufferFileWriter(channel), compactionFactory, segmentSize);
int numRecordBuffers = inMemoryBuffer.getNumRecordBuffers();
ArrayList<MemorySegment> segments = inMemoryBuffer.getRecordBufferSegments();
long writeBytes;
try {
// spill in memory buffer in zero-copy.
for (int i = 0; i < numRecordBuffers; i++) {
Expand All @@ -167,16 +172,15 @@ private void spill() throws IOException {
i == numRecordBuffers - 1
? inMemoryBuffer.getNumBytesInLastBuffer()
: segment.size();
writer.writeBlock(Buffer.create(segment, bufferSize));
channelWriterOutputView.write(segment, 0, bufferSize);
}
writeBytes = writer.getSize();
LOG.info(
"here spill the reset buffer data with {} records {} bytes",
inMemoryBuffer.size(),
writer.getSize());
writer.close();
channelWriterOutputView.getNumBytes());
channelWriterOutputView.close();
} catch (IOException e) {
writer.closeAndDelete();
channelWriterOutputView.closeAndDelete();
throw e;
}

Expand All @@ -185,7 +189,7 @@ private void spill() throws IOException {
channel,
inMemoryBuffer.getNumRecordBuffers(),
inMemoryBuffer.getNumBytesInLastBuffer(),
writeBytes));
channelWriterOutputView.getNumBytes()));

inMemoryBuffer.reset();
}
Expand Down Expand Up @@ -224,7 +228,7 @@ public class BufferIterator implements RowBufferIterator {
private int currentChannelID = -1;
private BinaryRow row;
private boolean closed;
private BufferFileReaderInputView channelReader;
private ChannelReaderInputView channelReader;

private BufferIterator() {
this.closed = false;
Expand Down Expand Up @@ -303,7 +307,13 @@ private void nextSpilledIterator() throws IOException {

// new reader.
this.channelReader =
new BufferFileReaderInputView(channel.getChannel(), ioManager, segmentSize);
new ChannelReaderInputView(
channel.getChannel(),
ioManager,
compactionFactory,
segmentSize,
channel.getBlockCount());

this.currentIterator = channelReader.createBinaryRowIterator(binaryRowSerializer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ static RowBuffer getBuffer(
MemorySegmentPool memoryPool,
AbstractRowDataSerializer<InternalRow> serializer,
boolean spillable,
MemorySize maxDiskSize) {
MemorySize maxDiskSize,
String compression) {
if (spillable) {
return new ExternalBuffer(ioManager, memoryPool, serializer, maxDiskSize);
return new ExternalBuffer(ioManager, memoryPool, serializer, maxDiskSize, compression);
} else {
return new InMemoryBuffer(memoryPool, serializer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
private final int compactionMaxFileNum;
private final boolean commitForceCompact;
private final String fileCompression;
private final String spillCompression;
private final boolean useWriteBuffer;
private final boolean spillable;
private final MemorySize maxDiskSize;
Expand Down Expand Up @@ -101,6 +102,7 @@ public AppendOnlyFileStoreWrite(
this.commitForceCompact = options.commitForceCompact();
this.skipCompaction = options.writeOnly();
this.fileCompression = options.fileCompression();
this.spillCompression = options.spillCompression();
this.useWriteBuffer = options.useWriteBufferForAppend();
this.spillable = options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode);
this.maxDiskSize = options.writeBufferSpillDiskSize();
Expand Down Expand Up @@ -149,6 +151,7 @@ protected RecordWriter<InternalRow> createWriter(
useWriteBuffer || forceBufferSpill,
spillable || forceBufferSpill,
fileCompression,
spillCompression,
statsCollectors,
maxDiskSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
useWriteBuffer,
spillable,
CoreOptions.FILE_COMPRESSION.defaultValue(),
CoreOptions.SPILL_COMPRESSION.defaultValue(),
StatsCollectorFactories.createStatsFactories(
options, AppendOnlyWriterTest.SCHEMA.getFieldNames()),
MemorySize.MAX_VALUE);
Expand Down
Loading

0 comments on commit 6757f56

Please sign in to comment.