Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Add compression for ExternalBuffer. #3146

Merged
merged 4 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public interface MemorySegmentPool extends MemorySegmentSource {
/**
* Get the page size of each page this pool holds.
*
* @return the page size
* @return the page size, the bytes size in one page.
*/
int pageSize();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final List<DataFileMeta> compactAfter;
private final LongCounter seqNumCounter;
private final String fileCompression;
private final String spillCompression;
private SinkWriter sinkWriter;
private final FieldStatsCollector.Factory[] statsCollectors;
private final IOManager ioManager;
Expand All @@ -93,6 +94,7 @@ public AppendOnlyWriter(
boolean useWriteBuffer,
boolean spillable,
String fileCompression,
String spillCompression,
FieldStatsCollector.Factory[] statsCollectors,
MemorySize maxDiskSize) {
this.fileIO = fileIO;
Expand All @@ -109,13 +111,14 @@ public AppendOnlyWriter(
this.compactAfter = new ArrayList<>();
this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
this.fileCompression = fileCompression;
this.spillCompression = spillCompression;
this.ioManager = ioManager;
this.statsCollectors = statsCollectors;
this.maxDiskSize = maxDiskSize;

this.sinkWriter =
useWriteBuffer
? new BufferedSinkWriter(spillable, maxDiskSize)
? new BufferedSinkWriter(spillable, maxDiskSize, spillCompression)
: new DirectSinkWriter();

if (increment != null) {
Expand Down Expand Up @@ -211,7 +214,7 @@ public void toBufferedWriter() throws Exception {
trySyncLatestCompaction(true);

sinkWriter.close();
sinkWriter = new BufferedSinkWriter(true, maxDiskSize);
sinkWriter = new BufferedSinkWriter(true, maxDiskSize, spillCompression);
sinkWriter.setMemoryPool(memorySegmentPool);
}
}
Expand Down Expand Up @@ -378,11 +381,14 @@ private class BufferedSinkWriter implements SinkWriter {

private final MemorySize maxDiskSize;

private final String compression;

private RowBuffer writeBuffer;

private BufferedSinkWriter(boolean spillable, MemorySize maxDiskSize) {
private BufferedSinkWriter(boolean spillable, MemorySize maxDiskSize, String compression) {
this.spillable = spillable;
this.maxDiskSize = maxDiskSize;
this.compression = compression;
}

@Override
Expand Down Expand Up @@ -439,7 +445,8 @@ public void setMemoryPool(MemorySegmentPool memoryPool) {
memoryPool,
new InternalRowSerializer(writeSchema),
spillable,
maxDiskSize);
maxDiskSize,
compression);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ public void open(
coreOptions.writeBufferSize() / 2, coreOptions.pageSize()),
new InternalRowSerializer(table.rowType()),
true,
coreOptions.writeBufferSpillDiskSize());
coreOptions.writeBufferSpillDiskSize(),
coreOptions.spillCompression());
}

public void bootstrapKey(InternalRow value) throws IOException {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.BlockDecompressor;
import org.apache.paimon.data.AbstractPagedInputView;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.memory.Buffer;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.utils.MutableObjectIterator;

import java.io.EOFException;
import java.io.IOException;
Expand Down Expand Up @@ -100,4 +103,34 @@ public List<MemorySegment> close() throws IOException {
public FileIOChannel getChannel() {
return reader;
}

public MutableObjectIterator<BinaryRow> createBinaryRowIterator(
BinaryRowSerializer serializer) {
return new BinaryRowChannelInputViewIterator(serializer);
}

private class BinaryRowChannelInputViewIterator implements MutableObjectIterator<BinaryRow> {

protected final BinaryRowSerializer serializer;

public BinaryRowChannelInputViewIterator(BinaryRowSerializer serializer) {
this.serializer = serializer;
}

@Override
public BinaryRow next(BinaryRow reuse) throws IOException {
try {
return this.serializer.deserializeFromPages(reuse, ChannelReaderInputView.this);
} catch (EOFException e) {
close();
return null;
}
}

@Override
public BinaryRow next() throws IOException {
throw new UnsupportedOperationException(
"This method is disabled due to performance issue!");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ public int close() throws IOException {
return -1;
}

public void closeAndDelete() throws IOException {
try {
close();
} finally {
writer.deleteChannel();
}
}

@Override
protected MemorySegment nextSegment(MemorySegment current, int positionInCurrent)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
package org.apache.paimon.disk;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.memory.Buffer;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.options.MemorySize;
Expand All @@ -49,6 +49,7 @@ public class ExternalBuffer implements RowBuffer {
private final BinaryRowSerializer binaryRowSerializer;
private final InMemoryBuffer inMemoryBuffer;
private final MemorySize maxDiskSize;
private final BlockCompressionFactory compactionFactory;

// The size of each segment
private final int segmentSize;
Expand All @@ -62,11 +63,14 @@ public class ExternalBuffer implements RowBuffer {
IOManager ioManager,
MemorySegmentPool pool,
AbstractRowDataSerializer<?> serializer,
MemorySize maxDiskSize) {
MemorySize maxDiskSize,
String compression) {
this.ioManager = ioManager;
this.pool = pool;
this.maxDiskSize = maxDiskSize;

this.compactionFactory = BlockCompressionFactory.create(compression);

this.binaryRowSerializer =
serializer instanceof BinaryRowSerializer
? (BinaryRowSerializer) serializer.duplicate()
Expand Down Expand Up @@ -155,10 +159,11 @@ private void throwTooBigException(InternalRow row) throws IOException {
private void spill() throws IOException {
FileIOChannel.ID channel = ioManager.createChannel();

BufferFileWriter writer = ioManager.createBufferFileWriter(channel);
ChannelWriterOutputView channelWriterOutputView =
new ChannelWriterOutputView(
ioManager.createBufferFileWriter(channel), compactionFactory, segmentSize);
int numRecordBuffers = inMemoryBuffer.getNumRecordBuffers();
ArrayList<MemorySegment> segments = inMemoryBuffer.getRecordBufferSegments();
long writeBytes;
try {
// spill in memory buffer in zero-copy.
for (int i = 0; i < numRecordBuffers; i++) {
Expand All @@ -167,16 +172,15 @@ private void spill() throws IOException {
i == numRecordBuffers - 1
? inMemoryBuffer.getNumBytesInLastBuffer()
: segment.size();
writer.writeBlock(Buffer.create(segment, bufferSize));
channelWriterOutputView.write(segment, 0, bufferSize);
}
writeBytes = writer.getSize();
LOG.info(
"here spill the reset buffer data with {} records {} bytes",
inMemoryBuffer.size(),
writer.getSize());
writer.close();
channelWriterOutputView.getNumBytes());
channelWriterOutputView.close();
} catch (IOException e) {
writer.closeAndDelete();
channelWriterOutputView.closeAndDelete();
throw e;
}

Expand All @@ -185,7 +189,7 @@ private void spill() throws IOException {
channel,
inMemoryBuffer.getNumRecordBuffers(),
inMemoryBuffer.getNumBytesInLastBuffer(),
writeBytes));
channelWriterOutputView.getNumBytes()));

inMemoryBuffer.reset();
}
Expand Down Expand Up @@ -224,7 +228,7 @@ public class BufferIterator implements RowBufferIterator {
private int currentChannelID = -1;
private BinaryRow row;
private boolean closed;
private BufferFileReaderInputView channelReader;
private ChannelReaderInputView channelReader;

private BufferIterator() {
this.closed = false;
Expand Down Expand Up @@ -303,7 +307,13 @@ private void nextSpilledIterator() throws IOException {

// new reader.
this.channelReader =
new BufferFileReaderInputView(channel.getChannel(), ioManager, segmentSize);
new ChannelReaderInputView(
channel.getChannel(),
ioManager,
compactionFactory,
segmentSize,
channel.getBlockCount());

this.currentIterator = channelReader.createBinaryRowIterator(binaryRowSerializer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ static RowBuffer getBuffer(
MemorySegmentPool memoryPool,
AbstractRowDataSerializer<InternalRow> serializer,
boolean spillable,
MemorySize maxDiskSize) {
MemorySize maxDiskSize,
String compression) {
if (spillable) {
return new ExternalBuffer(ioManager, memoryPool, serializer, maxDiskSize);
return new ExternalBuffer(ioManager, memoryPool, serializer, maxDiskSize, compression);
} else {
return new InMemoryBuffer(memoryPool, serializer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
private final int compactionMaxFileNum;
private final boolean commitForceCompact;
private final String fileCompression;
private final String spillCompression;
private final boolean useWriteBuffer;
private final boolean spillable;
private final MemorySize maxDiskSize;
Expand Down Expand Up @@ -101,6 +102,7 @@ public AppendOnlyFileStoreWrite(
this.commitForceCompact = options.commitForceCompact();
this.skipCompaction = options.writeOnly();
this.fileCompression = options.fileCompression();
this.spillCompression = options.spillCompression();
this.useWriteBuffer = options.useWriteBufferForAppend();
this.spillable = options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode);
this.maxDiskSize = options.writeBufferSpillDiskSize();
Expand Down Expand Up @@ -149,6 +151,7 @@ protected RecordWriter<InternalRow> createWriter(
useWriteBuffer || forceBufferSpill,
spillable || forceBufferSpill,
fileCompression,
spillCompression,
statsCollectors,
maxDiskSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
useWriteBuffer,
spillable,
CoreOptions.FILE_COMPRESSION.defaultValue(),
CoreOptions.SPILL_COMPRESSION.defaultValue(),
StatsCollectorFactories.createStatsFactories(
options, AppendOnlyWriterTest.SCHEMA.getFieldNames()),
MemorySize.MAX_VALUE);
Expand Down
Loading
Loading