Skip to content

Commit

Permalink
fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 committed Apr 2, 2024
1 parent 6712c40 commit 67cd21c
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final List<DataFileMeta> compactAfter;
private final LongCounter seqNumCounter;
private final String fileCompression;
private final String spillCompression;
private SinkWriter sinkWriter;
private final FieldStatsCollector.Factory[] statsCollectors;
private final IOManager ioManager;
Expand All @@ -93,6 +94,7 @@ public AppendOnlyWriter(
boolean useWriteBuffer,
boolean spillable,
String fileCompression,
String spillCompression,
FieldStatsCollector.Factory[] statsCollectors,
MemorySize maxDiskSize) {
this.fileIO = fileIO;
Expand All @@ -109,13 +111,14 @@ public AppendOnlyWriter(
this.compactAfter = new ArrayList<>();
this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
this.fileCompression = fileCompression;
this.spillCompression = spillCompression;
this.ioManager = ioManager;
this.statsCollectors = statsCollectors;
this.maxDiskSize = maxDiskSize;

this.sinkWriter =
useWriteBuffer
? new BufferedSinkWriter(spillable, maxDiskSize)
? new BufferedSinkWriter(spillable, maxDiskSize, spillCompression)
: new DirectSinkWriter();

if (increment != null) {
Expand Down Expand Up @@ -211,7 +214,7 @@ public void toBufferedWriter() throws Exception {
trySyncLatestCompaction(true);

sinkWriter.close();
sinkWriter = new BufferedSinkWriter(true, maxDiskSize);
sinkWriter = new BufferedSinkWriter(true, maxDiskSize, spillCompression);
sinkWriter.setMemoryPool(memorySegmentPool);
}
}
Expand Down Expand Up @@ -378,11 +381,14 @@ private class BufferedSinkWriter implements SinkWriter {

private final MemorySize maxDiskSize;

private final String compression;

private RowBuffer writeBuffer;

private BufferedSinkWriter(boolean spillable, MemorySize maxDiskSize) {
private BufferedSinkWriter(boolean spillable, MemorySize maxDiskSize, String compression) {
this.spillable = spillable;
this.maxDiskSize = maxDiskSize;
this.compression = compression;
}

@Override
Expand Down Expand Up @@ -439,7 +445,8 @@ public void setMemoryPool(MemorySegmentPool memoryPool) {
memoryPool,
new InternalRowSerializer(writeSchema),
spillable,
maxDiskSize);
maxDiskSize,
compression);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ public void open(
coreOptions.writeBufferSize() / 2, coreOptions.pageSize()),
new InternalRowSerializer(table.rowType()),
true,
coreOptions.writeBufferSpillDiskSize());
coreOptions.writeBufferSpillDiskSize(),
coreOptions.spillCompression());
}

public void bootstrapKey(InternalRow value) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,12 @@ public class ExternalBuffer implements RowBuffer {

private static final Logger LOG = LoggerFactory.getLogger(ExternalBuffer.class);

private static final BlockCompressionFactory compactionFactory =
BlockCompressionFactory.create("LZ4");

private final IOManager ioManager;
private final MemorySegmentPool pool;
private final BinaryRowSerializer binaryRowSerializer;
private final InMemoryBuffer inMemoryBuffer;
private final MemorySize maxDiskSize;
private final BlockCompressionFactory compactionFactory;

// The size of each segment
private final int segmentSize;
Expand All @@ -65,11 +63,14 @@ public class ExternalBuffer implements RowBuffer {
IOManager ioManager,
MemorySegmentPool pool,
AbstractRowDataSerializer<?> serializer,
MemorySize maxDiskSize) {
MemorySize maxDiskSize,
String compression) {
this.ioManager = ioManager;
this.pool = pool;
this.maxDiskSize = maxDiskSize;

this.compactionFactory = BlockCompressionFactory.create(compression);

this.binaryRowSerializer =
serializer instanceof BinaryRowSerializer
? (BinaryRowSerializer) serializer.duplicate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ static RowBuffer getBuffer(
MemorySegmentPool memoryPool,
AbstractRowDataSerializer<InternalRow> serializer,
boolean spillable,
MemorySize maxDiskSize) {
MemorySize maxDiskSize,
String compression) {
if (spillable) {
return new ExternalBuffer(ioManager, memoryPool, serializer, maxDiskSize);
return new ExternalBuffer(ioManager, memoryPool, serializer, maxDiskSize, compression);
} else {
return new InMemoryBuffer(memoryPool, serializer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
private final int compactionMaxFileNum;
private final boolean commitForceCompact;
private final String fileCompression;
private final String spillCompression;
private final boolean useWriteBuffer;
private final boolean spillable;
private final MemorySize maxDiskSize;
Expand Down Expand Up @@ -101,6 +102,7 @@ public AppendOnlyFileStoreWrite(
this.commitForceCompact = options.commitForceCompact();
this.skipCompaction = options.writeOnly();
this.fileCompression = options.fileCompression();
this.spillCompression = options.spillCompression();
this.useWriteBuffer = options.useWriteBufferForAppend();
this.spillable = options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode);
this.maxDiskSize = options.writeBufferSpillDiskSize();
Expand Down Expand Up @@ -149,6 +151,7 @@ protected RecordWriter<InternalRow> createWriter(
useWriteBuffer || forceBufferSpill,
spillable || forceBufferSpill,
fileCompression,
spillCompression,
statsCollectors,
maxDiskSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
useWriteBuffer,
spillable,
CoreOptions.FILE_COMPRESSION.defaultValue(),
CoreOptions.SPILL_COMPRESSION.defaultValue(),
StatsCollectorFactories.createStatsFactories(
options, AppendOnlyWriterTest.SCHEMA.getFieldNames()),
MemorySize.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.disk;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
Expand Down Expand Up @@ -65,7 +66,8 @@ private ExternalBuffer newBuffer(MemorySize maxDiskSize) {
ioManager,
new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE),
this.serializer,
maxDiskSize);
maxDiskSize,
CoreOptions.SPILL_COMPRESSION.defaultValue());
}

@Test
Expand Down Expand Up @@ -179,7 +181,8 @@ public void testHugeRecord() {
ioManager,
new HeapMemorySegmentPool(3 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE),
new BinaryRowSerializer(1),
MemorySize.MAX_VALUE);
MemorySize.MAX_VALUE,
CoreOptions.SPILL_COMPRESSION.defaultValue());
assertThatThrownBy(() -> writeHuge(buffer)).isInstanceOf(IOException.class);
buffer.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception
false,
false,
CoreOptions.FILE_COMPRESSION.defaultValue(),
CoreOptions.SPILL_COMPRESSION.defaultValue(),
StatsCollectorFactories.createStatsFactories(
options, SCHEMA.getFieldNames()),
MemorySize.MAX_VALUE);
Expand Down

0 comments on commit 67cd21c

Please sign in to comment.