diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java index 46d6c66043b9..8f5485dbe66d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java @@ -74,25 +74,13 @@ public FileIndexResult evaluate(@Nullable Predicate predicate) { Set requiredFieldNames = getRequiredNames(predicate); Map> indexReaders = new HashMap<>(); requiredFieldNames.forEach(name -> indexReaders.put(name, reader.readColumnIndex(name))); - return new FileIndexPredicateTest(indexReaders).test(predicate); - } - - public boolean testPredicate(@Nullable Predicate filePredicate) { - if (filePredicate == null) { - return true; - } - - Set requiredFieldNames = getRequiredNames(filePredicate); - - Map> indexReaders = new HashMap<>(); - requiredFieldNames.forEach(name -> indexReaders.put(name, reader.readColumnIndex(name))); - if (!new FileIndexPredicateTest(indexReaders).test(filePredicate).remain()) { + FileIndexResult result = new FileIndexPredicateTest(indexReaders).test(predicate); + if (!result.remain()) { LOG.debug( "One file has been filtered: " + (path == null ? "in scan stage" : path.toString())); - return false; } - return true; + return result; } private Set getRequiredNames(Predicate filePredicate) { diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java index db76a1553b9e..0d3dd7c79ff3 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java @@ -23,21 +23,25 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.reader.RecordReader; +import javax.annotation.Nullable; + /** the context for creating RecordReader {@link RecordReader}. */ public class FormatReaderContext implements FormatReaderFactory.Context { private final FileIO fileIO; private final Path file; private final long fileSize; - private FileIndexResult fileIndexResult; + @Nullable private final FileIndexResult fileIndexResult; public FormatReaderContext(FileIO fileIO, Path file, long fileSize) { + this(fileIO, file, fileSize, null); + } + + public FormatReaderContext( + FileIO fileIO, Path file, long fileSize, @Nullable FileIndexResult fileIndexResult) { this.fileIO = fileIO; this.file = file; this.fileSize = fileSize; - } - - public void setFileIndexResult(FileIndexResult fileIndexResult) { this.fileIndexResult = fileIndexResult; } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexSkipper.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexSkipper.java deleted file mode 100644 index 0c4ac82a05c3..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexSkipper.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.io; - -import org.apache.paimon.fileindex.FileIndexPredicate; -import org.apache.paimon.fs.FileIO; -import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.predicate.PredicateBuilder; -import org.apache.paimon.schema.TableSchema; - -import java.io.IOException; -import java.util.List; -import java.util.stream.Collectors; - -/** File index reader, do the filter in the constructor. */ -public class FileIndexSkipper { - - public static boolean skip( - FileIO fileIO, - TableSchema dataSchema, - List dataFilter, - DataFilePathFactory dataFilePathFactory, - DataFileMeta file) - throws IOException { - if (dataFilter != null && !dataFilter.isEmpty()) { - List indexFiles = - file.extraFiles().stream() - .filter(name -> name.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX)) - .collect(Collectors.toList()); - if (!indexFiles.isEmpty()) { - if (indexFiles.size() > 1) { - throw new RuntimeException( - "Found more than one index file for one data file: " - + String.join(" and ", indexFiles)); - } - // go to file index check - try (FileIndexPredicate predicate = - new FileIndexPredicate( - dataFilePathFactory.toPath(indexFiles.get(0)), - fileIO, - dataSchema.logicalRowType())) { - if (!predicate.testPredicate( - PredicateBuilder.and(dataFilter.toArray(new Predicate[0])))) { - return true; - } - } - } - } - - return false; - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index 50a8c74dcf4a..60b4e7933cb1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -120,7 +120,7 @@ private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry try (FileIndexPredicate predicate = new FileIndexPredicate(embeddedIndexBytes, dataRowType)) { - return predicate.testPredicate(dataPredicate); + return predicate.evaluate(dataPredicate).remain(); } catch (IOException e) { throw new RuntimeException("Exception happens while checking predicate.", e); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index 58b03694d74b..c368d9e510b0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -144,7 +144,7 @@ private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestE id -> fieldValueStatsConverters.convertFilter( entry.file().schemaId(), valueFilter)); - return predicate.testPredicate(dataPredicate); + return predicate.evaluate(dataPredicate).remain(); } catch (IOException e) { throw new RuntimeException("Exception happens while checking fileIndex predicate.", e); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index cc06eb31ee5a..9c612a9f8cf0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -208,8 +208,10 @@ private RecordReader createFileReader( FormatReaderContext formatReaderContext = new FormatReaderContext( - fileIO, dataFilePathFactory.toPath(file.fileName()), file.fileSize()); - formatReaderContext.setFileIndexResult(fileIndexResult); + fileIO, + dataFilePathFactory.toPath(file.fileName()), + file.fileSize(), + fileIndexResult); FileRecordReader fileRecordReader = new FileRecordReader( bulkFormatMapping.getReaderFactory(), diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java index 038c91445b1a..82d19e448878 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java @@ -18,6 +18,7 @@ package org.apache.paimon.format.parquet; +import org.apache.paimon.fileindex.FileIndexResult; import org.apache.paimon.format.SimpleStatsExtractor; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; @@ -48,7 +49,7 @@ public class ParquetUtil { */ public static Pair>, SimpleStatsExtractor.FileInfo> extractColumnStats(FileIO fileIO, Path path) throws IOException { - try (ParquetFileReader reader = getParquetReader(fileIO, path)) { + try (ParquetFileReader reader = getParquetReader(fileIO, path, null)) { ParquetMetadata parquetMetadata = reader.getFooter(); List blockMetaDataList = parquetMetadata.getBlocks(); Map> resultStats = new HashMap<>(); @@ -77,11 +78,12 @@ public class ParquetUtil { * @param path the path of parquet files to be read * @return parquet reader, used for reading footer, status, etc. */ - public static ParquetFileReader getParquetReader(FileIO fileIO, Path path) throws IOException { + public static ParquetFileReader getParquetReader( + FileIO fileIO, Path path, FileIndexResult fileIndexResult) throws IOException { return new ParquetFileReader( ParquetInputFile.fromPath(fileIO, path), ParquetReadOptions.builder().build(), - null); + fileIndexResult); } static void assertStatsClass(