Skip to content

Commit

Permalink
[core] Add compression for ExternalBuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
yejunhao committed Apr 2, 2024
1 parent f70f9d0 commit cba9057
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public interface MemorySegmentPool extends MemorySegmentSource {
/**
* Get the page size of each page this pool holds.
*
* @return the page size
* @return the page size, the bytes size in one page.
*/
int pageSize();

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.BlockDecompressor;
import org.apache.paimon.data.AbstractPagedInputView;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.memory.Buffer;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.utils.MutableObjectIterator;

import java.io.EOFException;
import java.io.IOException;
Expand Down Expand Up @@ -100,4 +103,34 @@ public List<MemorySegment> close() throws IOException {
public FileIOChannel getChannel() {
return reader;
}

public MutableObjectIterator<BinaryRow> createBinaryRowIterator(
BinaryRowSerializer serializer) {
return new BinaryRowChannelInputViewIterator(serializer);
}

private class BinaryRowChannelInputViewIterator implements MutableObjectIterator<BinaryRow> {

protected final BinaryRowSerializer serializer;

public BinaryRowChannelInputViewIterator(BinaryRowSerializer serializer) {
this.serializer = serializer;
}

@Override
public BinaryRow next(BinaryRow reuse) throws IOException {
try {
return this.serializer.deserializeFromPages(reuse, ChannelReaderInputView.this);
} catch (EOFException e) {
close();
return null;
}
}

@Override
public BinaryRow next() throws IOException {
throw new UnsupportedOperationException(
"This method is disabled due to performance issue!");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ public int close() throws IOException {
return -1;
}

public void closeAndDelete() throws IOException {
try {
close();
} finally {
writer.deleteChannel();
}
}

@Override
protected MemorySegment nextSegment(MemorySegment current, int positionInCurrent)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
package org.apache.paimon.disk;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compression.Lz4BlockCompressionFactory;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.memory.Buffer;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.utils.MutableObjectIterator;
Expand All @@ -43,6 +43,9 @@ public class ExternalBuffer implements RowBuffer {

private static final Logger LOG = LoggerFactory.getLogger(ExternalBuffer.class);

private static final Lz4BlockCompressionFactory compactionFactory =
new Lz4BlockCompressionFactory();

private final IOManager ioManager;
private final MemorySegmentPool pool;
private final BinaryRowSerializer binaryRowSerializer;
Expand Down Expand Up @@ -135,7 +138,9 @@ private void throwTooBigException(InternalRow row) throws IOException {
private void spill() throws IOException {
FileIOChannel.ID channel = ioManager.createChannel();

BufferFileWriter writer = ioManager.createBufferFileWriter(channel);
ChannelWriterOutputView channelWriterOutputView =
new ChannelWriterOutputView(
ioManager.createBufferFileWriter(channel), compactionFactory, segmentSize);
int numRecordBuffers = inMemoryBuffer.getNumRecordBuffers();
ArrayList<MemorySegment> segments = inMemoryBuffer.getRecordBufferSegments();
try {
Expand All @@ -146,15 +151,15 @@ private void spill() throws IOException {
i == numRecordBuffers - 1
? inMemoryBuffer.getNumBytesInLastBuffer()
: segment.size();
writer.writeBlock(Buffer.create(segment, bufferSize));
channelWriterOutputView.write(segment, 0, bufferSize);
}
LOG.info(
"here spill the reset buffer data with {} records {} bytes",
inMemoryBuffer.size(),
writer.getSize());
writer.close();
channelWriterOutputView.getNumBytes());
channelWriterOutputView.close();
} catch (IOException e) {
writer.closeAndDelete();
channelWriterOutputView.closeAndDelete();
throw e;
}

Expand Down Expand Up @@ -201,7 +206,7 @@ public class BufferIterator implements RowBufferIterator {
private int currentChannelID = -1;
private BinaryRow row;
private boolean closed;
private BufferFileReaderInputView channelReader;
private ChannelReaderInputView channelReader;

private BufferIterator() {
this.closed = false;
Expand Down Expand Up @@ -280,7 +285,13 @@ private void nextSpilledIterator() throws IOException {

// new reader.
this.channelReader =
new BufferFileReaderInputView(channel.getChannel(), ioManager, segmentSize);
new ChannelReaderInputView(
channel.getChannel(),
ioManager,
compactionFactory,
segmentSize,
channel.getBlockCount());

this.currentIterator = channelReader.createBinaryRowIterator(binaryRowSerializer);
}

Expand Down

0 comments on commit cba9057

Please sign in to comment.