diff --git a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java index 9ea5fa3f62cd..18538b2fae0d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java +++ b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java @@ -36,7 +36,7 @@ public interface MemorySegmentPool extends MemorySegmentSource { /** * Get the page size of each page this pool holds. * - * @return the page size + * @return the page size, i.e. the number of bytes in one page. */ int pageSize(); diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 9566dd37215e..4b00eae1f1b5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -71,6 +71,7 @@ public class AppendOnlyWriter implements RecordWriter, MemoryOwner private final List compactAfter; private final LongCounter seqNumCounter; private final String fileCompression; + private final String spillCompression; private SinkWriter sinkWriter; private final FieldStatsCollector.Factory[] statsCollectors; private final IOManager ioManager; @@ -93,6 +94,7 @@ public AppendOnlyWriter( boolean useWriteBuffer, boolean spillable, String fileCompression, + String spillCompression, FieldStatsCollector.Factory[] statsCollectors, MemorySize maxDiskSize) { this.fileIO = fileIO; @@ -109,13 +111,14 @@ public AppendOnlyWriter( this.compactAfter = new ArrayList<>(); this.seqNumCounter = new LongCounter(maxSequenceNumber + 1); this.fileCompression = fileCompression; + this.spillCompression = spillCompression; this.ioManager = ioManager; this.statsCollectors = statsCollectors; this.maxDiskSize = maxDiskSize; this.sinkWriter = useWriteBuffer - ? new BufferedSinkWriter(spillable, maxDiskSize) + ? 
new BufferedSinkWriter(spillable, maxDiskSize, spillCompression) : new DirectSinkWriter(); if (increment != null) { @@ -211,7 +214,7 @@ public void toBufferedWriter() throws Exception { trySyncLatestCompaction(true); sinkWriter.close(); - sinkWriter = new BufferedSinkWriter(true, maxDiskSize); + sinkWriter = new BufferedSinkWriter(true, maxDiskSize, spillCompression); sinkWriter.setMemoryPool(memorySegmentPool); } } @@ -378,11 +381,14 @@ private class BufferedSinkWriter implements SinkWriter { private final MemorySize maxDiskSize; + private final String compression; + private RowBuffer writeBuffer; - private BufferedSinkWriter(boolean spillable, MemorySize maxDiskSize) { + private BufferedSinkWriter(boolean spillable, MemorySize maxDiskSize, String compression) { this.spillable = spillable; this.maxDiskSize = maxDiskSize; + this.compression = compression; } @Override @@ -439,7 +445,8 @@ public void setMemoryPool(MemorySegmentPool memoryPool) { memoryPool, new InternalRowSerializer(writeSchema), spillable, - maxDiskSize); + maxDiskSize, + compression); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java index c53d055d5743..819937984083 100644 --- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java +++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java @@ -171,7 +171,8 @@ public void open( coreOptions.writeBufferSize() / 2, coreOptions.pageSize()), new InternalRowSerializer(table.rowType()), true, - coreOptions.writeBufferSpillDiskSize()); + coreOptions.writeBufferSpillDiskSize(), + coreOptions.spillCompression()); } public void bootstrapKey(InternalRow value) throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/BufferFileReaderInputView.java b/paimon-core/src/main/java/org/apache/paimon/disk/BufferFileReaderInputView.java 
deleted file mode 100644 index ab91b8944d97..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/disk/BufferFileReaderInputView.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.disk; - -import org.apache.paimon.data.AbstractPagedInputView; -import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.data.serializer.BinaryRowSerializer; -import org.apache.paimon.memory.Buffer; -import org.apache.paimon.memory.MemorySegment; -import org.apache.paimon.utils.MutableObjectIterator; - -import java.io.EOFException; -import java.io.IOException; - -/** An {@link AbstractPagedInputView} which reads blocks from channel without compression. 
*/ -public class BufferFileReaderInputView extends AbstractPagedInputView { - - private final BufferFileReader reader; - private final MemorySegment segment; - - private int currentSegmentLimit; - - public BufferFileReaderInputView(FileIOChannel.ID id, IOManager ioManager, int segmentSize) - throws IOException { - this.reader = ioManager.createBufferFileReader(id); - this.segment = MemorySegment.wrap(new byte[segmentSize]); - } - - @Override - protected MemorySegment nextSegment(MemorySegment current) throws IOException { - if (reader.hasReachedEndOfFile()) { - throw new EOFException(); - } - - Buffer buffer = Buffer.create(segment); - reader.readInto(buffer); - this.currentSegmentLimit = buffer.getSize(); - return segment; - } - - @Override - protected int getLimitForSegment(MemorySegment segment) { - return currentSegmentLimit; - } - - public void close() throws IOException { - reader.close(); - } - - public FileIOChannel getChannel() { - return reader; - } - - public MutableObjectIterator createBinaryRowIterator( - BinaryRowSerializer serializer) { - return new BinaryRowChannelInputViewIterator(serializer); - } - - private class BinaryRowChannelInputViewIterator implements MutableObjectIterator { - - protected final BinaryRowSerializer serializer; - - public BinaryRowChannelInputViewIterator(BinaryRowSerializer serializer) { - this.serializer = serializer; - } - - @Override - public BinaryRow next(BinaryRow reuse) throws IOException { - try { - return this.serializer.deserializeFromPages(reuse, BufferFileReaderInputView.this); - } catch (EOFException e) { - close(); - return null; - } - } - - @Override - public BinaryRow next() throws IOException { - throw new UnsupportedOperationException( - "This method is disabled due to performance issue!"); - } - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java index e0ecaa68cf15..aeb6e08c2afb 100644 
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelReaderInputView.java @@ -21,9 +21,12 @@ import org.apache.paimon.compression.BlockCompressionFactory; import org.apache.paimon.compression.BlockDecompressor; import org.apache.paimon.data.AbstractPagedInputView; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.serializer.BinaryRowSerializer; import org.apache.paimon.io.DataInputView; import org.apache.paimon.memory.Buffer; import org.apache.paimon.memory.MemorySegment; +import org.apache.paimon.utils.MutableObjectIterator; import java.io.EOFException; import java.io.IOException; @@ -100,4 +103,34 @@ public List close() throws IOException { public FileIOChannel getChannel() { return reader; } + + public MutableObjectIterator createBinaryRowIterator( + BinaryRowSerializer serializer) { + return new BinaryRowChannelInputViewIterator(serializer); + } + + private class BinaryRowChannelInputViewIterator implements MutableObjectIterator { + + protected final BinaryRowSerializer serializer; + + public BinaryRowChannelInputViewIterator(BinaryRowSerializer serializer) { + this.serializer = serializer; + } + + @Override + public BinaryRow next(BinaryRow reuse) throws IOException { + try { + return this.serializer.deserializeFromPages(reuse, ChannelReaderInputView.this); + } catch (EOFException e) { + close(); + return null; + } + } + + @Override + public BinaryRow next() throws IOException { + throw new UnsupportedOperationException( + "This method is disabled due to performance issue!"); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java index 98a532cc5770..6a1838e9bf8d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java @@ -71,6 +71,14 @@ public int close() throws IOException { return -1; } + public void closeAndDelete() throws IOException { + try { + close(); + } finally { + writer.deleteChannel(); + } + } + @Override protected MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java index 9261a4a23ac1..529d63d0ceb5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java @@ -19,11 +19,11 @@ package org.apache.paimon.disk; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.compression.BlockCompressionFactory; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.AbstractRowDataSerializer; import org.apache.paimon.data.serializer.BinaryRowSerializer; -import org.apache.paimon.memory.Buffer; import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.options.MemorySize; @@ -49,6 +49,7 @@ public class ExternalBuffer implements RowBuffer { private final BinaryRowSerializer binaryRowSerializer; private final InMemoryBuffer inMemoryBuffer; private final MemorySize maxDiskSize; + private final BlockCompressionFactory compactionFactory; // The size of each segment private final int segmentSize; @@ -62,11 +63,14 @@ public class ExternalBuffer implements RowBuffer { IOManager ioManager, MemorySegmentPool pool, AbstractRowDataSerializer serializer, - MemorySize maxDiskSize) { + MemorySize maxDiskSize, + String compression) { this.ioManager = ioManager; this.pool = pool; this.maxDiskSize = maxDiskSize; + this.compactionFactory = 
BlockCompressionFactory.create(compression); + this.binaryRowSerializer = serializer instanceof BinaryRowSerializer ? (BinaryRowSerializer) serializer.duplicate() @@ -155,10 +159,11 @@ private void throwTooBigException(InternalRow row) throws IOException { private void spill() throws IOException { FileIOChannel.ID channel = ioManager.createChannel(); - BufferFileWriter writer = ioManager.createBufferFileWriter(channel); + ChannelWriterOutputView channelWriterOutputView = + new ChannelWriterOutputView( + ioManager.createBufferFileWriter(channel), compactionFactory, segmentSize); int numRecordBuffers = inMemoryBuffer.getNumRecordBuffers(); ArrayList segments = inMemoryBuffer.getRecordBufferSegments(); - long writeBytes; try { // spill in memory buffer in zero-copy. for (int i = 0; i < numRecordBuffers; i++) { @@ -167,16 +172,15 @@ private void spill() throws IOException { i == numRecordBuffers - 1 ? inMemoryBuffer.getNumBytesInLastBuffer() : segment.size(); - writer.writeBlock(Buffer.create(segment, bufferSize)); + channelWriterOutputView.write(segment, 0, bufferSize); } - writeBytes = writer.getSize(); LOG.info( "here spill the reset buffer data with {} records {} bytes", inMemoryBuffer.size(), - writer.getSize()); - writer.close(); + channelWriterOutputView.getNumBytes()); + channelWriterOutputView.close(); } catch (IOException e) { - writer.closeAndDelete(); + channelWriterOutputView.closeAndDelete(); throw e; } @@ -185,7 +189,7 @@ private void spill() throws IOException { channel, inMemoryBuffer.getNumRecordBuffers(), inMemoryBuffer.getNumBytesInLastBuffer(), - writeBytes)); + channelWriterOutputView.getNumBytes())); inMemoryBuffer.reset(); } @@ -224,7 +228,7 @@ public class BufferIterator implements RowBufferIterator { private int currentChannelID = -1; private BinaryRow row; private boolean closed; - private BufferFileReaderInputView channelReader; + private ChannelReaderInputView channelReader; private BufferIterator() { this.closed = false; @@ -303,7 +307,13 
@@ private void nextSpilledIterator() throws IOException { // new reader. this.channelReader = - new BufferFileReaderInputView(channel.getChannel(), ioManager, segmentSize); + new ChannelReaderInputView( + channel.getChannel(), + ioManager, + compactionFactory, + segmentSize, + channel.getBlockCount()); + this.currentIterator = channelReader.createBinaryRowIterator(binaryRowSerializer); } diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java index b4d3aa89fa2a..3c0a31cd6305 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java @@ -59,9 +59,10 @@ static RowBuffer getBuffer( MemorySegmentPool memoryPool, AbstractRowDataSerializer serializer, boolean spillable, - MemorySize maxDiskSize) { + MemorySize maxDiskSize, + String compression) { if (spillable) { - return new ExternalBuffer(ioManager, memoryPool, serializer, maxDiskSize); + return new ExternalBuffer(ioManager, memoryPool, serializer, maxDiskSize, compression); } else { return new InMemoryBuffer(memoryPool, serializer); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index dbf73caef2f6..f794b160cddd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -68,6 +68,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite private final int compactionMaxFileNum; private final boolean commitForceCompact; private final String fileCompression; + private final String spillCompression; private final boolean useWriteBuffer; private final boolean spillable; private final MemorySize maxDiskSize; @@ -101,6 +102,7 @@ public AppendOnlyFileStoreWrite( 
this.commitForceCompact = options.commitForceCompact(); this.skipCompaction = options.writeOnly(); this.fileCompression = options.fileCompression(); + this.spillCompression = options.spillCompression(); this.useWriteBuffer = options.useWriteBufferForAppend(); this.spillable = options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode); this.maxDiskSize = options.writeBufferSpillDiskSize(); @@ -149,6 +151,7 @@ protected RecordWriter createWriter( useWriteBuffer || forceBufferSpill, spillable || forceBufferSpill, fileCompression, + spillCompression, statsCollectors, maxDiskSize); } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index 2c398d141b0d..87bb14745c61 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -600,6 +600,7 @@ private Pair> createWriter( useWriteBuffer, spillable, CoreOptions.FILE_COMPRESSION.defaultValue(), + CoreOptions.SPILL_COMPRESSION.defaultValue(), StatsCollectorFactories.createStatsFactories( options, AppendOnlyWriterTest.SCHEMA.getFieldNames()), MemorySize.MAX_VALUE); diff --git a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java index 8189519435db..3ea27cfa7211 100644 --- a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java @@ -18,6 +18,7 @@ package org.apache.paimon.disk; +import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryRowWriter; import org.apache.paimon.data.BinaryString; @@ -65,7 +66,8 @@ private ExternalBuffer newBuffer(MemorySize maxDiskSize) { ioManager, new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE, 
DEFAULT_PAGE_SIZE), this.serializer, - maxDiskSize); + maxDiskSize, + CoreOptions.SPILL_COMPRESSION.defaultValue()); } @Test @@ -179,7 +181,8 @@ public void testHugeRecord() { ioManager, new HeapMemorySegmentPool(3 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE), new BinaryRowSerializer(1), - MemorySize.MAX_VALUE); + MemorySize.MAX_VALUE, + CoreOptions.SPILL_COMPRESSION.defaultValue()); assertThatThrownBy(() -> writeHuge(buffer)).isInstanceOf(IOException.class); buffer.reset(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java index 608345696ad4..22826d5bcd2b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java @@ -85,6 +85,7 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception false, false, CoreOptions.FILE_COMPRESSION.defaultValue(), + CoreOptions.SPILL_COMPRESSION.defaultValue(), StatsCollectorFactories.createStatsFactories( options, SCHEMA.getFieldNames()), MemorySize.MAX_VALUE);