diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java index 2b5bac269cfb..3b8e6908ffc1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ConcatRecordReader.java @@ -24,6 +24,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Queue; @@ -50,6 +51,11 @@ public static RecordReader create(List> readers) throws return readers.size() == 1 ? readers.get(0).get() : new ConcatRecordReader<>(readers); } + public static RecordReader create(ReaderSupplier reader1, ReaderSupplier reader2) + throws IOException { + return create(Arrays.asList(reader1, reader2)); + } + @Nullable @Override public RecordIterator readBatch() throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java index a166a8526aa4..179522610ebc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java @@ -53,7 +53,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Optional; @@ -180,77 +179,78 @@ public RecordReader createReader(DataSplit split) throws IOException { private RecordReader createReaderWithoutOuterProjection(DataSplit split) throws IOException { - ReaderSupplier beforeSupplier = null; - if (split.beforeFiles().size() > 0) { - if (split.isStreaming() || split.beforeDeletionFiles().isPresent()) { - beforeSupplier = - () -> - new ReverseReader( - noMergeRead( - split.partition(), - split.bucket(), - split.beforeFiles(), - split.beforeDeletionFiles().orElse(null), - split.isStreaming())); + if (split.beforeFiles().isEmpty()) { + if (split.isStreaming() || split.deletionFiles().isPresent()) { + return noMergeRead( + split.partition(), + split.bucket(), + split.dataFiles(), + split.deletionFiles().orElse(null), + split.isStreaming()); } else { - beforeSupplier = - () -> - mergeRead( - split.partition(), - split.bucket(), - split.beforeFiles(), - false); + return projectKey( + mergeRead( + split.partition(), + split.bucket(), + split.dataFiles(), + null, + forceKeepDelete)); } - } - - ReaderSupplier dataSupplier; - if (split.isStreaming() || split.deletionFiles().isPresent()) { - dataSupplier = + } else if (split.isStreaming()) { + // streaming concat read + return ConcatRecordReader.create( + () -> + new ReverseReader( + noMergeRead( + split.partition(), + split.bucket(), + split.beforeFiles(), + split.beforeDeletionFiles().orElse(null), + true)), () -> noMergeRead( split.partition(), split.bucket(), split.dataFiles(), split.deletionFiles().orElse(null), - split.isStreaming()); + true)); } else { - dataSupplier = - () -> + // batch diff read + return projectKey( + DiffReader.readDiff( + mergeRead( + split.partition(), + split.bucket(), + split.beforeFiles(), + split.beforeDeletionFiles().orElse(null), + false), mergeRead( split.partition(), split.bucket(), split.dataFiles(), - split.beforeFiles().isEmpty() && forceKeepDelete); - } - - if (split.isStreaming()) { - return beforeSupplier == null - ? dataSupplier.get() - : ConcatRecordReader.create(Arrays.asList(beforeSupplier, dataSupplier)); - } else { - return beforeSupplier == null - ? dataSupplier.get() - : DiffReader.readDiff( - beforeSupplier.get(), - dataSupplier.get(), + split.deletionFiles().orElse(null), + false), keyComparator, userDefinedSeqComparator, mergeSorter, - forceKeepDelete); + forceKeepDelete)); } } private RecordReader mergeRead( - BinaryRow partition, int bucket, List files, boolean keepDelete) + BinaryRow partition, + int bucket, + List files, + @Nullable List deletionFiles, + boolean keepDelete) throws IOException { // Sections are read by SortMergeReader, which sorts and merges records by keys. // So we cannot project keys or else the sorting will be incorrect. + DeletionVector.Factory dvFactory = DeletionVector.factory(fileIO, files, deletionFiles); KeyValueFileReaderFactory overlappedSectionFactory = - readerFactoryBuilder.build( - partition, bucket, DeletionVector.emptyFactory(), false, filtersForKeys); + readerFactoryBuilder.build(partition, bucket, dvFactory, false, filtersForKeys); KeyValueFileReaderFactory nonOverlappedSectionFactory = - readerFactoryBuilder.build( - partition, bucket, DeletionVector.emptyFactory(), false, filtersForAll); + readerFactoryBuilder.build(partition, bucket, dvFactory, false, filtersForAll); List> sectionReaders = new ArrayList<>(); MergeFunctionWrapper mergeFuncWrapper = @@ -274,8 +274,7 @@ private RecordReader mergeRead( reader = new DropDeleteReader(reader); } - // Project results from SortMergeReader using ProjectKeyRecordReader. - return keyProjectedFields == null ? reader : projectKey(reader, keyProjectedFields); + return reader; } private RecordReader noMergeRead( @@ -285,12 +284,11 @@ private RecordReader noMergeRead( @Nullable List deletionFiles, boolean onlyFilterKey) throws IOException { - DeletionVector.Factory dvFactory = DeletionVector.factory(fileIO, files, deletionFiles); KeyValueFileReaderFactory readerFactory = readerFactoryBuilder.build( partition, bucket, - dvFactory, + DeletionVector.factory(fileIO, files, deletionFiles), true, onlyFilterKey ? filtersForKeys : filtersForAll); List> suppliers = new ArrayList<>(); @@ -316,8 +314,11 @@ private Optional changelogFile(DataFileMeta fileMeta) { return Optional.empty(); } - private RecordReader projectKey( - RecordReader reader, int[][] keyProjectedFields) { + private RecordReader projectKey(RecordReader reader) { + if (keyProjectedFields == null) { + return reader; + } + ProjectedRow projectedRow = ProjectedRow.from(keyProjectedFields); return reader.transform(kv -> kv.replaceKey(projectedRow.replaceRow(kv.key()))); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java index 17735b8130db..bb0354eee52e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java @@ -39,7 +39,9 @@ public abstract class KeyValueTableRead extends AbstractDataTableRead protected KeyValueTableRead(KeyValueFileStoreRead read, TableSchema schema) { super(read, schema); - this.read = read; + // We don't need any key fields, the columns that need to be read are already included in + // the value + this.read = read.withKeyProjection(new int[0][]); } @Override @@ -53,10 +55,6 @@ public final RecordReader reader(Split split) throws IOException { return new RowDataRecordReader(read.createReader((DataSplit) split)); } - public final RecordReader kvReader(Split split) throws IOException { - return read.createReader((DataSplit) split); - } - protected abstract RecordReader.RecordIterator rowDataRecordIteratorFromKv( RecordReader.RecordIterator kvRecordIterator); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java index f12d78e80a2a..6e32bb7d3fa5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java @@ -19,9 +19,7 @@ package org.apache.paimon.flink.lookup; import org.apache.paimon.CoreOptions; -import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.data.JoinedRow; import org.apache.paimon.io.SplitsParallelReadUtil; import org.apache.paimon.mergetree.compact.ConcatRecordReader; import org.apache.paimon.options.ConfigOption; @@ -31,11 +29,9 @@ import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; -import org.apache.paimon.table.source.KeyValueTableRead; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.StreamTableScan; -import org.apache.paimon.table.source.TableRead; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FunctionWithIOException; import org.apache.paimon.utils.TypeUtils; @@ -148,25 +144,4 @@ public RecordReader nextBatch(boolean useParallelism) throws Except } return reader; } - - private FunctionWithIOException> - createReaderWithSequenceSupplier() { - return split -> { - TableRead read = readBuilder.newRead(); - if (!(read instanceof KeyValueTableRead)) { - throw new IllegalArgumentException( - "Only KeyValueTableRead supports sequence read, but it is: " + read); - } - - KeyValueTableRead kvRead = (KeyValueTableRead) read; - JoinedRow reused = new JoinedRow(); - return kvRead.kvReader(split) - .transform( - kv -> { - reused.replace(kv.value(), GenericRow.of(kv.sequenceNumber())); - reused.setRowKind(kv.valueKind()); - return reused; - }); - }; - } }