From e6fd09eb9f2a8c9636f261454e3596cbe6b0cb1f Mon Sep 17 00:00:00 2001 From: Jingsong Date: Mon, 11 Mar 2024 17:10:09 +0800 Subject: [PATCH] fix --- .../operation/KeyValueFileStoreRead.java | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java index c959c293114d2..2fadaa1e0afa0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java @@ -78,9 +78,9 @@ public class KeyValueFileStoreRead implements FileStoreRead { @Nullable private int[][] keyProjectedFields; - @Nullable private List filtersForMerge; + @Nullable private List filtersForKeys; - @Nullable private List filtersForNonMerge; + @Nullable private List filtersForAll; @Nullable private int[][] pushdownProjection; @Nullable private int[][] outerProjection; @@ -164,8 +164,8 @@ public FileStoreRead withFilter(Predicate predicate) { // So for sections with overlapping runs, we only push down key filters. // For sections with only one run, as each key only appears once, it is OK to push down // value filters. - filtersForNonMerge = allFilters; - filtersForMerge = pkFilters; + filtersForAll = allFilters; + filtersForKeys = pkFilters; return this; } @@ -191,7 +191,8 @@ private RecordReader createReaderWithoutOuterProjection(DataSplit spli split.partition(), split.bucket(), split.beforeFiles(), - split.beforeDeletionFiles().orElse(null))); + split.beforeDeletionFiles().orElse(null), + split.isStreaming())); } else { beforeSupplier = () -> @@ -211,7 +212,8 @@ private RecordReader createReaderWithoutOuterProjection(DataSplit spli split.partition(), split.bucket(), split.dataFiles(), - split.deletionFiles().orElse(null)); + split.deletionFiles().orElse(null), + split.isStreaming()); } else { dataSupplier = () -> @@ -245,9 +247,9 @@ private RecordReader mergeRead( // Sections are read by SortMergeReader, which sorts and merges records by keys. // So we cannot project keys or else the sorting will be incorrect. KeyValueFileReaderFactory overlappedSectionFactory = - readerFactoryBuilder.build(partition, bucket, false, filtersForMerge); + readerFactoryBuilder.build(partition, bucket, false, filtersForKeys); KeyValueFileReaderFactory nonOverlappedSectionFactory = - readerFactoryBuilder.build(partition, bucket, false, filtersForNonMerge); + readerFactoryBuilder.build(partition, bucket, false, filtersForAll); List> sectionReaders = new ArrayList<>(); MergeFunctionWrapper mergeFuncWrapper = @@ -279,10 +281,12 @@ private RecordReader noMergeRead( BinaryRow partition, int bucket, List files, - @Nullable List deletionFiles) + @Nullable List deletionFiles, + boolean onlyFilterKey) throws IOException { KeyValueFileReaderFactory readerFactory = - readerFactoryBuilder.build(partition, bucket, true, filtersForNonMerge); + readerFactoryBuilder.build( + partition, bucket, true, onlyFilterKey ? filtersForKeys : filtersForAll); List> suppliers = new ArrayList<>(); for (int i = 0; i < files.size(); i++) { DataFileMeta file = files.get(i);