diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/ReaderSupplier.java b/paimon-common/src/main/java/org/apache/paimon/reader/ReaderSupplier.java new file mode 100644 index 000000000000..4664f5aea7e8 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/reader/ReaderSupplier.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.reader; + +import java.io.IOException; + +/** Supplier to get {@link RecordReader}. */ +@FunctionalInterface +public interface ReaderSupplier { + RecordReader get() throws IOException; +} diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/SizedReaderSupplier.java b/paimon-common/src/main/java/org/apache/paimon/reader/SizedReaderSupplier.java new file mode 100644 index 000000000000..e2dfd2864a1a --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/reader/SizedReaderSupplier.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.reader; + +/** Supplier to get {@link RecordReader} with size. */ +public interface SizedReaderSupplier extends ReaderSupplier { + + long estimateSize(); +} diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java index bbab367e413e..31d3749bf832 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValue.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValue.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.types.TinyIntType; @@ -33,6 +34,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.paimon.schema.SystemColumns.LEVEL; import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER; import static org.apache.paimon.schema.SystemColumns.VALUE_KIND; import static org.apache.paimon.utils.Preconditions.checkState; @@ -120,6 +122,15 @@ public static RowType schema(RowType keyType, RowType valueType) { return new RowType(fields); } + public static RowType schemaWithLevel(RowType keyType, RowType valueType) { + RowType.Builder builder = RowType.builder(); + schema(keyType, valueType) + .getFields() + .forEach(f -> builder.field(f.name(), f.type(), f.description())); + builder.field(LEVEL, DataTypes.INT().notNull()); + return builder.build(); + } + /** * Create key-value fields, we need to add a const value to the id of value field to ensure that * they are consistent when compared by field id. For example, there are two table with key diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWithMeta.java b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWithMeta.java index a3cb7b0e3900..5c61cc44c071 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWithMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWithMeta.java @@ -23,17 +23,11 @@ public class ChannelWithMeta { private final FileIOChannel.ID channel; private final int blockCount; - private final int numBytesInLastBlock; private final long numBytes; - public ChannelWithMeta( - FileIOChannel.ID channel, - int blockCount, - int numBytesInLastBlock, - long numEstimatedBytes) { + public ChannelWithMeta(FileIOChannel.ID channel, int blockCount, long numEstimatedBytes) { this.channel = channel; this.blockCount = blockCount; - this.numBytesInLastBlock = numBytesInLastBlock; this.numBytes = numEstimatedBytes; } @@ -45,10 +39,6 @@ public int getBlockCount() { return blockCount; } - public int getNumBytesInLastBlock() { - return numBytesInLastBlock; - } - public long getNumBytes() { return numBytes; } diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java index 6a1838e9bf8d..c2d3cc205178 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/ChannelWriterOutputView.java @@ -25,6 +25,7 @@ import org.apache.paimon.memory.Buffer; import org.apache.paimon.memory.MemorySegment; +import java.io.Closeable; import java.io.IOException; /** @@ -32,7 +33,7 @@ * output stream. The view will compress its data before writing it in blocks to the underlying * channel. */ -public final class ChannelWriterOutputView extends AbstractPagedOutputView { +public final class ChannelWriterOutputView extends AbstractPagedOutputView implements Closeable { private final MemorySegment compressedBuffer; private final BlockCompressor compressor; @@ -60,7 +61,8 @@ public FileIOChannel getChannel() { return writer; } - public int close() throws IOException { + @Override + public void close() throws IOException { if (!writer.isClosed()) { int currentPositionInSegment = getCurrentPositionInSegment(); writeCompressed(currentSegment, currentPositionInSegment); @@ -68,7 +70,6 @@ public int close() throws IOException { this.writeBytes = writer.getSize(); this.writer.close(); } - return -1; } public void closeAndDelete() throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java index 529d63d0ceb5..34c082371205 100644 --- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java @@ -188,7 +188,6 @@ private void spill() throws IOException { new ChannelWithMeta( channel, inMemoryBuffer.getNumRecordBuffers(), - inMemoryBuffer.getNumBytesInLastBuffer(), channelWriterOutputView.getNumBytes())); inMemoryBuffer.reset(); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java index 420613899ad8..f02cdaa3e6f8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeSorter.java @@ -21,31 +21,30 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.SortEngine; import org.apache.paimon.KeyValue; +import org.apache.paimon.compression.BlockCompressionFactory; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.data.JoinedRow; +import org.apache.paimon.data.serializer.BinaryRowSerializer; +import org.apache.paimon.disk.ChannelReaderInputView; +import org.apache.paimon.disk.ChannelReaderInputViewIterator; +import org.apache.paimon.disk.ChannelWithMeta; +import org.apache.paimon.disk.ChannelWriterOutputView; +import org.apache.paimon.disk.FileChannelUtil; +import org.apache.paimon.disk.FileIOChannel; import org.apache.paimon.disk.IOManager; import org.apache.paimon.memory.CachelessSegmentPool; import org.apache.paimon.memory.MemorySegmentPool; -import org.apache.paimon.mergetree.compact.ConcatRecordReader; -import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier; import org.apache.paimon.mergetree.compact.MergeFunctionWrapper; import org.apache.paimon.mergetree.compact.SortMergeReader; import org.apache.paimon.options.MemorySize; +import org.apache.paimon.reader.ReaderSupplier; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.sort.BinaryExternalSortBuffer; -import org.apache.paimon.sort.SortBuffer; -import org.apache.paimon.types.BigIntType; -import org.apache.paimon.types.DataField; -import org.apache.paimon.types.IntType; -import org.apache.paimon.types.RowKind; +import org.apache.paimon.reader.RecordReader.RecordIterator; +import org.apache.paimon.reader.SizedReaderSupplier; import org.apache.paimon.types.RowType; -import org.apache.paimon.types.TinyIntType; import org.apache.paimon.utils.FieldsComparator; import org.apache.paimon.utils.IOUtils; -import org.apache.paimon.utils.MutableObjectIterator; -import org.apache.paimon.utils.OffsetRow; +import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer; import javax.annotation.Nullable; @@ -53,10 +52,8 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; -import java.util.stream.IntStream; -import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER; -import static org.apache.paimon.schema.SystemColumns.VALUE_KIND; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** The merge sorter to sort and merge readers with key overlap. */ public class MergeSorter { @@ -66,9 +63,7 @@ public class MergeSorter { private final SortEngine sortEngine; private final int spillThreshold; - private final int spillSortMaxNumFiles; private final String compression; - private final MemorySize maxDiskSize; private final MemorySegmentPool memoryPool; @@ -81,14 +76,12 @@ public MergeSorter( @Nullable IOManager ioManager) { this.sortEngine = options.sortEngine(); this.spillThreshold = options.sortSpillThreshold(); - this.spillSortMaxNumFiles = options.localSortMaxNumFileHandles(); this.compression = options.spillCompression(); this.keyType = keyType; this.valueType = valueType; this.memoryPool = new CachelessSegmentPool(options.sortSpillBufferSize(), options.pageSize()); this.ioManager = ioManager; - this.maxDiskSize = options.writeBufferSpillDiskSize(); } public MemorySegmentPool memoryPool() { @@ -108,7 +101,7 @@ public void setProjectedValueType(RowType projectedType) { } public RecordReader mergeSort( - List> lazyReaders, + List> lazyReaders, Comparator keyComparator, @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionWrapper mergeFunction) @@ -118,6 +111,16 @@ public RecordReader mergeSort( lazyReaders, keyComparator, userDefinedSeqComparator, mergeFunction); } + return mergeSortNoSpill( + lazyReaders, keyComparator, userDefinedSeqComparator, mergeFunction); + } + + public RecordReader mergeSortNoSpill( + List> lazyReaders, + Comparator keyComparator, + @Nullable FieldsComparator userDefinedSeqComparator, + MergeFunctionWrapper mergeFunction) + throws IOException { List> readers = new ArrayList<>(lazyReaders.size()); for (ReaderSupplier supplier : lazyReaders) { try { @@ -134,192 +137,129 @@ public RecordReader mergeSort( } private RecordReader spillMergeSort( - List> readers, + List> inputReaders, Comparator keyComparator, @Nullable FieldsComparator userDefinedSeqComparator, MergeFunctionWrapper mergeFunction) throws IOException { - ExternalSorterWithLevel sorter = new ExternalSorterWithLevel(userDefinedSeqComparator); - ConcatRecordReader.create(readers).forIOEachRemaining(sorter::put); - sorter.flushMemory(); - - NoReusingMergeIterator iterator = sorter.newIterator(keyComparator, mergeFunction); - return new RecordReader() { - - private boolean read = false; - - @Nullable - @Override - public RecordIterator readBatch() { - if (read) { - return null; - } - - read = true; - return new RecordIterator() { - @Override - public T next() throws IOException { - return iterator.next(); - } - - @Override - public void releaseBatch() {} - }; - } + List> sortedReaders = new ArrayList<>(inputReaders); + sortedReaders.sort(Comparator.comparingLong(SizedReaderSupplier::estimateSize)); + int spillSize = inputReaders.size() - spillThreshold; + + List> readers = + new ArrayList<>(sortedReaders.subList(spillSize, sortedReaders.size())); + for (ReaderSupplier supplier : sortedReaders.subList(0, spillSize)) { + readers.add(spill(supplier)); + } - @Override - public void close() { - sorter.clear(); - } - }; + return mergeSortNoSpill(readers, keyComparator, userDefinedSeqComparator, mergeFunction); } - /** - * Here can not use {@link SortBufferWriteBuffer} for two reasons: - * - *

1.Changelog-producer: full-compaction and lookup need to know the level of the KeyValue. - * - *

2.Changelog-producer: full-compaction and lookup need to store the reference of - * update_before. - */ - private class ExternalSorterWithLevel { - - private final SortBuffer buffer; - - public ExternalSorterWithLevel(@Nullable FieldsComparator userDefinedSeqComparator) { - if (memoryPool.freePages() < 3) { - throw new IllegalArgumentException( - "Write buffer requires a minimum of 3 page memory, please increase write buffer memory size."); - } - - // key fields - IntStream sortFields = IntStream.range(0, keyType.getFieldCount()); - - // user define sequence fields - if (userDefinedSeqComparator != null) { - IntStream udsFields = - IntStream.of(userDefinedSeqComparator.compareFields()) - .map(operand -> operand + keyType.getFieldCount() + 3); - sortFields = IntStream.concat(sortFields, udsFields); + private ReaderSupplier spill(ReaderSupplier readerSupplier) + throws IOException { + checkArgument(ioManager != null); + + FileIOChannel.ID channel = ioManager.createChannel(); + KeyValueWithLevelNoReusingSerializer serializer = + new KeyValueWithLevelNoReusingSerializer(keyType, valueType); + BlockCompressionFactory compressFactory = BlockCompressionFactory.create(compression); + int compressBlock = (int) MemorySize.parse("64 kb").getBytes(); + + ChannelWithMeta channelWithMeta; + ChannelWriterOutputView out = + FileChannelUtil.createOutputView( + ioManager, channel, compressFactory, compressBlock); + try (RecordReader reader = readerSupplier.get(); ) { + RecordIterator batch; + KeyValue record; + while ((batch = reader.readBatch()) != null) { + while ((record = batch.next()) != null) { + serializer.serialize(record, out); + } + batch.releaseBatch(); } - - // sequence field - sortFields = IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount())); - - // row type - List fields = new ArrayList<>(keyType.getFields()); - fields.add(new DataField(0, SEQUENCE_NUMBER, new BigIntType(false))); - fields.add(new DataField(1, VALUE_KIND, new TinyIntType(false))); - fields.add(new DataField(2, "_LEVEL", new IntType(false))); - fields.addAll(valueType.getFields()); - - this.buffer = - BinaryExternalSortBuffer.create( - ioManager, - new RowType(fields), - sortFields.toArray(), - memoryPool, - spillSortMaxNumFiles, - compression, - maxDiskSize); + } finally { + out.close(); + channelWithMeta = + new ChannelWithMeta(channel, out.getBlockCount(), out.getWriteBytes()); } - public boolean put(KeyValue keyValue) throws IOException { - GenericRow meta = new GenericRow(3); - meta.setField(0, keyValue.sequenceNumber()); - meta.setField(1, keyValue.valueKind().toByteValue()); - meta.setField(2, keyValue.level()); - JoinedRow row = - new JoinedRow() - .replace( - new JoinedRow().replace(keyValue.key(), meta), - keyValue.value()); - return buffer.write(row); - } - - public boolean flushMemory() throws IOException { - return buffer.flushMemory(); - } + return new SpilledReaderSupplier( + channelWithMeta, compressFactory, compressBlock, serializer); + } - public void clear() { - buffer.clear(); + private class SpilledReaderSupplier implements ReaderSupplier { + + private final ChannelWithMeta channel; + private final BlockCompressionFactory compressFactory; + private final int compressBlock; + private final KeyValueWithLevelNoReusingSerializer serializer; + + public SpilledReaderSupplier( + ChannelWithMeta channel, + BlockCompressionFactory compressFactory, + int compressBlock, + KeyValueWithLevelNoReusingSerializer serializer) { + this.channel = channel; + this.compressFactory = compressFactory; + this.compressBlock = compressBlock; + this.serializer = serializer; } - public NoReusingMergeIterator newIterator( - Comparator keyComparator, MergeFunctionWrapper mergeFunction) - throws IOException { - return new NoReusingMergeIterator<>( - buffer.sortedIterator(), keyComparator, mergeFunction); + @Override + public RecordReader get() throws IOException { + ChannelReaderInputView view = + FileChannelUtil.createInputView( + ioManager, channel, new ArrayList<>(), compressFactory, compressBlock); + BinaryRowSerializer rowSerializer = new BinaryRowSerializer(serializer.numFields()); + ChannelReaderInputViewIterator iterator = + new ChannelReaderInputViewIterator(view, null, rowSerializer); + return new ChannelReaderReader(view, iterator, serializer); } } - private class NoReusingMergeIterator { - - private final MutableObjectIterator kvIter; - private final Comparator keyComparator; - private final MergeFunctionWrapper mergeFunc; + private static class ChannelReaderReader implements RecordReader { - private KeyValue left; + private final ChannelReaderInputView view; + private final ChannelReaderInputViewIterator iterator; + private final KeyValueWithLevelNoReusingSerializer serializer; - private boolean isEnd; - - private NoReusingMergeIterator( - MutableObjectIterator kvIter, - Comparator keyComparator, - MergeFunctionWrapper mergeFunction) { - this.kvIter = kvIter; - this.keyComparator = keyComparator; - this.mergeFunc = mergeFunction; - this.isEnd = false; + private ChannelReaderReader( + ChannelReaderInputView view, + ChannelReaderInputViewIterator iterator, + KeyValueWithLevelNoReusingSerializer serializer) { + this.view = view; + this.iterator = iterator; + this.serializer = serializer; } - public T next() throws IOException { - if (isEnd) { + private boolean read = false; + + @Override + public RecordIterator readBatch() { + if (read) { return null; } - T result; - do { - mergeFunc.reset(); - InternalRow key = null; - KeyValue keyValue; - while ((keyValue = readOnce()) != null) { - if (key != null && keyComparator.compare(keyValue.key(), key) != 0) { - break; + read = true; + return new RecordIterator() { + @Override + public KeyValue next() throws IOException { + BinaryRow noReuseRow = iterator.next(); + if (noReuseRow == null) { + return null; } - key = keyValue.key(); - mergeFunc.add(keyValue); + return serializer.fromRow(noReuseRow); } - left = keyValue; - if (key == null) { - return null; - } - result = mergeFunc.getResult(); - } while (result == null); - return result; - } - private KeyValue readOnce() throws IOException { - if (left != null) { - KeyValue ret = left; - left = null; - return ret; - } - BinaryRow row = kvIter.next(); - if (row == null) { - isEnd = true; - return null; - } + @Override + public void releaseBatch() {} + }; + } - int keyArity = keyType.getFieldCount(); - int valueArity = valueType.getFieldCount(); - return new KeyValue() - .replace( - new OffsetRow(keyArity, 0).replace(row), - row.getLong(keyArity), - RowKind.fromByteValue(row.getByte(keyArity + 1)), - new OffsetRow(valueArity, keyArity + 3).replace(row)) - .setLevel(row.getInt(keyArity + 2)); + @Override + public void close() throws IOException { + view.getChannel().closeAndDelete(); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java index 766dccca2753..88a9c2593f1e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java @@ -23,9 +23,10 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.FileReaderFactory; import org.apache.paimon.mergetree.compact.ConcatRecordReader; -import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier; import org.apache.paimon.mergetree.compact.MergeFunctionWrapper; +import org.apache.paimon.reader.ReaderSupplier; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.reader.SizedReaderSupplier; import org.apache.paimon.utils.FieldsComparator; import javax.annotation.Nullable; @@ -71,9 +72,20 @@ public static RecordReader readerForSection( MergeFunctionWrapper mergeFunctionWrapper, MergeSorter mergeSorter) throws IOException { - List> readers = new ArrayList<>(); + List> readers = new ArrayList<>(); for (SortedRun run : section) { - readers.add(() -> readerForRun(run, readerFactory)); + readers.add( + new SizedReaderSupplier() { + @Override + public long estimateSize() { + return run.totalSize(); + } + + @Override + public RecordReader get() throws IOException { + return readerForRun(run, readerFactory); + } + }); } return mergeSorter.mergeSort( readers, userKeyComparator, userDefinedSeqComparator, mergeFunctionWrapper); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java index 3b8e6908ffc1..c9f5bebfca8e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java @@ -18,6 +18,7 @@ package org.apache.paimon.mergetree.compact; +import org.apache.paimon.reader.ReaderSupplier; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.utils.Preconditions; @@ -40,14 +41,15 @@ public class ConcatRecordReader implements RecordReader { private RecordReader current; - protected ConcatRecordReader(List> readerFactories) { + protected ConcatRecordReader(List> readerFactories) { readerFactories.forEach( supplier -> Preconditions.checkNotNull(supplier, "Reader factory must not be null.")); this.queue = new LinkedList<>(readerFactories); } - public static RecordReader create(List> readers) throws IOException { + public static RecordReader create(List> readers) + throws IOException { return readers.size() == 1 ? readers.get(0).get() : new ConcatRecordReader<>(readers); } @@ -81,10 +83,4 @@ public void close() throws IOException { current.close(); } } - - /** Supplier to get {@link RecordReader}. */ - @FunctionalInterface - public interface ReaderSupplier { - RecordReader get() throws IOException; - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java index 8002b62f0f01..20bee30c116a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java @@ -33,13 +33,13 @@ import org.apache.paimon.mergetree.MergeTreeReaders; import org.apache.paimon.mergetree.SortedRun; import org.apache.paimon.mergetree.compact.ConcatRecordReader; -import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier; import org.apache.paimon.mergetree.compact.IntervalPartition; import org.apache.paimon.mergetree.compact.MergeFunctionFactory; import org.apache.paimon.mergetree.compact.MergeFunctionFactory.AdjustedProjection; import org.apache.paimon.mergetree.compact.MergeFunctionWrapper; import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.ReaderSupplier; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.DataSplit; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index 7e6e91731d72..777a4588a3a0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -37,6 +37,7 @@ import org.apache.paimon.partition.PartitionUtils; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.EmptyRecordReader; +import org.apache.paimon.reader.ReaderSupplier; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.IndexCastMapping; import org.apache.paimon.schema.SchemaEvolutionUtil; @@ -131,7 +132,7 @@ public RawFileSplitRead withFilter(Predicate predicate) { public RecordReader createReader(DataSplit split) throws IOException { DataFilePathFactory dataFilePathFactory = pathFactory.createDataFilePathFactory(split.partition(), split.bucket()); - List> suppliers = new ArrayList<>(); + List> suppliers = new ArrayList<>(); if (split.beforeFiles().size() > 0) { LOG.info("Ignore split before files: " + split.beforeFiles()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java b/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java index f45960706333..f6350f44ace8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SystemColumns.java @@ -30,6 +30,7 @@ public class SystemColumns { public static final String VALUE_COUNT = "_VALUE_COUNT"; public static final String SEQUENCE_NUMBER = "_SEQUENCE_NUMBER"; public static final String VALUE_KIND = "_VALUE_KIND"; + public static final String LEVEL = "_LEVEL"; public static final List SYSTEM_FIELD_NAMES = Arrays.asList(VALUE_COUNT, SEQUENCE_NUMBER, VALUE_KIND); } diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/AbstractBinaryExternalMerger.java b/paimon-core/src/main/java/org/apache/paimon/sort/AbstractBinaryExternalMerger.java index 56b526313f9a..e1db28c16e23 100644 --- a/paimon-core/src/main/java/org/apache/paimon/sort/AbstractBinaryExternalMerger.java +++ b/paimon-core/src/main/java/org/apache/paimon/sort/AbstractBinaryExternalMerger.java @@ -183,7 +183,7 @@ private ChannelWithMeta mergeChannels(List channelIDs) throws I compressionCodecFactory, compressionBlockSize); writeMergingOutput(mergeIterator, output); - numBytesInLastBlock = output.close(); + output.close(); numBlocksWritten = output.getBlockCount(); } catch (IOException e) { if (output != null) { @@ -202,8 +202,7 @@ private ChannelWithMeta mergeChannels(List channelIDs) throws I } } - return new ChannelWithMeta( - mergedChannelID, numBlocksWritten, numBytesInLastBlock, output.getWriteBytes()); + return new ChannelWithMeta(mergedChannelID, numBlocksWritten, output.getWriteBytes()); } // ------------------------------------------------------------------------------------------- diff --git a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java index c709dcc1a98e..72003408a33f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java @@ -251,7 +251,6 @@ private void spill() throws IOException { channelManager.addChannel(channel); ChannelWriterOutputView output = null; - int bytesInLastBuffer; int blockCount; try { @@ -260,7 +259,7 @@ private void spill() throws IOException { ioManager, channel, compressionCodecFactory, compressionBlockSize); new QuickSort().sort(inMemorySortBuffer); inMemorySortBuffer.writeToOutput(output); - bytesInLastBuffer = output.close(); + output.close(); blockCount = output.getBlockCount(); } catch (IOException e) { if (output != null) { @@ -270,9 +269,7 @@ private void spill() throws IOException { throw e; } - spillChannelIDs.add( - new ChannelWithMeta( - channel, blockCount, bytesInLastBuffer, output.getWriteBytes())); + spillChannelIDs.add(new ChannelWithMeta(channel, blockCount, output.getWriteBytes())); inMemorySortBuffer.clear(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java index 1b2c6299ba0f..7b78d5aede81 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableRead.java @@ -23,6 +23,7 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.mergetree.compact.ConcatRecordReader; import org.apache.paimon.operation.SplitRead; +import org.apache.paimon.reader.ReaderSupplier; import org.apache.paimon.reader.RecordReader; import java.io.IOException; @@ -44,7 +45,7 @@ public interface TableRead { RecordReader createReader(Split split) throws IOException; default RecordReader createReader(List splits) throws IOException { - List> readers = new ArrayList<>(); + List> readers = new ArrayList<>(); for (Split split : splits) { readers.add(() -> createReader(split)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java index 0519da9fd19e..b610ff60b4a1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java @@ -117,7 +117,7 @@ private static RecordReader readDiff( MergeSorter sorter, boolean keepDelete) throws IOException { - return sorter.mergeSort( + return sorter.mergeSortNoSpill( Arrays.asList( () -> wrapLevelToReader(beforeReader, BEFORE_LEVEL), () -> wrapLevelToReader(afterReader, AFTER_LEVEL)), diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/KeyValueWithLevelNoReusingSerializer.java b/paimon-core/src/main/java/org/apache/paimon/utils/KeyValueWithLevelNoReusingSerializer.java new file mode 100644 index 000000000000..456dc43a0a86 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/KeyValueWithLevelNoReusingSerializer.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.KeyValue; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; + +import static org.apache.paimon.data.JoinedRow.join; + +/** Serializer for {@link KeyValue} with Level. */ +public class KeyValueWithLevelNoReusingSerializer extends ObjectSerializer { + + private static final long serialVersionUID = 1L; + + private final int keyArity; + private final int valueArity; + + public KeyValueWithLevelNoReusingSerializer(RowType keyType, RowType valueType) { + super(KeyValue.schemaWithLevel(keyType, valueType)); + + this.keyArity = keyType.getFieldCount(); + this.valueArity = valueType.getFieldCount(); + } + + @Override + public InternalRow toRow(KeyValue kv) { + GenericRow meta = GenericRow.of(kv.sequenceNumber(), kv.valueKind().toByteValue()); + return join(join(join(kv.key(), meta), kv.value()), GenericRow.of(kv.level())); + } + + @Override + public KeyValue fromRow(InternalRow row) { + return new KeyValue() + .replace( + new OffsetRow(keyArity, 0).replace(row), + row.getLong(keyArity), + RowKind.fromByteValue(row.getByte(keyArity + 1)), + new OffsetRow(valueArity, keyArity + 2).replace(row)) + .setLevel(row.getInt(keyArity + 2 + valueArity)); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeSorterTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeSorterTest.java index 557bd2959f9e..b368e60147f8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeSorterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeSorterTest.java @@ -24,10 +24,11 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; -import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier; import org.apache.paimon.mergetree.compact.MergeFunctionWrapper; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.reader.SizedReaderSupplier; import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension; import org.apache.paimon.testutils.junit.parameterized.Parameters; import org.apache.paimon.types.DataTypes; @@ -139,11 +140,11 @@ private void innerTest(FieldsComparator userDefinedSeqComparator) throws Excepti } comparator = comparator.thenComparingLong(KeyValue::sequenceNumber); - List> readers = new ArrayList<>(); + List> readers = new ArrayList<>(); Random rnd = new Random(); List expectedKvs = new ArrayList<>(); Set distinctSeq = new HashSet<>(); - for (int i = 0; i < rnd.nextInt(10) + 3; i++) { + for (int i = 0; i < rnd.nextInt(20) + 3; i++) { List kvs = new ArrayList<>(); Set distinctKeys = new HashSet<>(); for (int j = 0; j < 100; j++) { @@ -172,7 +173,18 @@ private void innerTest(FieldsComparator userDefinedSeqComparator) throws Excepti } expectedKvs.addAll(kvs); kvs.sort(comparator); - readers.add(() -> new IteratorRecordReader<>(kvs.iterator())); + readers.add( + new SizedReaderSupplier() { + @Override + public long estimateSize() { + return kvs.size(); + } + + @Override + public RecordReader get() { + return new IteratorRecordReader<>(kvs.iterator()); + } + }); } expectedKvs.sort(comparator); diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java index 6b4a85c0c563..ca04d8a7bf34 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/ConcatRecordReaderTest.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.SortEngine; import org.apache.paimon.KeyValue; +import org.apache.paimon.reader.ReaderSupplier; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.utils.ReusingTestData; import org.apache.paimon.utils.TestReusingRecordReader; @@ -48,9 +49,7 @@ protected List getExpected(List input) { protected RecordReader createRecordReader( List readers, SortEngine sortEngine) { return new ConcatRecordReader( - readers.stream() - .map(r -> (ConcatRecordReader.ReaderSupplier) () -> r) - .collect(Collectors.toList())); + readers.stream().map(r -> (ReaderSupplier) () -> r).collect(Collectors.toList())); } @Test diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java index ec9197b8b9af..ff67dbc27d04 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java @@ -32,6 +32,7 @@ import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.mergetree.compact.ConcatRecordReader; import org.apache.paimon.options.Options; +import org.apache.paimon.reader.ReaderSupplier; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.schema.Schema; @@ -284,7 +285,7 @@ private void validateChangelog( while (id <= max) { List splits = scan.plan().splits(); if (!splits.isEmpty()) { - List> readers = new ArrayList<>(); + List> readers = new ArrayList<>(); for (Split split : splits) { readers.add(() -> scanTable.newRead().createReader(split)); } @@ -318,7 +319,7 @@ private void validateChangelog( private void validateSnapshot(Snapshot snapshot, List data) throws Exception { List splits = table.newSnapshotReader().withSnapshot(snapshot).read().splits(); - List> readers = new ArrayList<>(); + List> readers = new ArrayList<>(); for (Split split : splits) { readers.add(() -> table.newRead().createReader(split)); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index daf5db716b1f..937005d21d65 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -37,11 +37,11 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.mergetree.compact.ConcatRecordReader; -import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier; import org.apache.paimon.operation.FileStoreTestUtils; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.reader.ReaderSupplier; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.schema.SchemaChange; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java index f5874bed7eac..27789241037b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTableTestBase.java @@ -30,6 +30,7 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.mergetree.compact.ConcatRecordReader; import org.apache.paimon.options.Options; +import org.apache.paimon.reader.ReaderSupplier; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.schema.Schema; @@ -446,7 +447,7 @@ protected static void checkFilterRowCount( protected List getResult( TableRead read, List splits, Function rowDataToString) { try { - List> readers = new ArrayList<>(); + List> readers = new ArrayList<>(); for (Split split : splits) { readers.add(() -> read.createReader(split)); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java index defbe24ecfa3..2ed0d5c9b34b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java @@ -28,6 +28,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.mergetree.compact.ConcatRecordReader; import org.apache.paimon.options.Options; +import org.apache.paimon.reader.ReaderSupplier; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.schema.Schema; @@ -109,7 +110,7 @@ protected BinaryRow binaryRow(int a) { } protected List getResult(TableRead read, List splits) throws Exception { - List> readers = new ArrayList<>(); + List> readers = new ArrayList<>(); for (Split split : splits) { readers.add(() -> read.createReader(split)); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java index ea9256830284..ceb40c1a864f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java @@ -25,6 +25,7 @@ import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.ReaderSupplier; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; @@ -144,7 +145,7 @@ public RecordReader nextBatch(boolean useParallelism) throws Except options.pageSize(), new Options(table.options()).get(LOOKUP_BOOTSTRAP_PARALLELISM)); } else { - List> readers = new ArrayList<>(); + List> readers = new ArrayList<>(); for (Split split : splits) { readers.add(() -> readerSupplier.apply(split)); }