From 7766f69b8738d8df8e07f87e150653b13f95552f Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Wed, 11 Dec 2024 10:34:58 +0800 Subject: [PATCH] [core] support file index filter push down --- ...leIndexFilterFallbackPredicateVisitor.java | 64 +++++++++ .../FileIndexFilterPushDownAnalyzer.java | 103 +++++++++++++++ .../FileIndexFilterPushDownVisitor.java | 72 ++++++++++ .../paimon/fileindex/FileIndexOptions.java | 45 +++++++ .../paimon/fileindex/FileIndexResult.java | 9 ++ .../apache/paimon/fileindex/FileIndexer.java | 2 + .../fileindex/bitmap/BitmapFileIndex.java | 41 ++++++ .../fileindex/bitmap/BitmapIndexResult.java | 17 ++- .../bloomfilter/BloomFilterFileIndex.java | 8 ++ .../bsi/BitSliceIndexBitmapFileIndex.java | 67 ++++++++++ .../paimon/predicate/PredicateBuilder.java | 14 ++ .../paimon/reader/FileRecordReader.java | 23 ++++ ...dexFilterFallbackPredicateVisitorTest.java | 112 ++++++++++++++++ .../FileIndexFilterPushDownVisitorTest.java | 123 ++++++++++++++++++ .../paimon/io/KeyValueFileReaderFactory.java | 2 +- .../paimon/operation/MergeFileSplitRead.java | 9 ++ .../paimon/operation/RawFileSplitRead.java | 33 ++++- .../apache/paimon/operation/SplitRead.java | 8 ++ .../table/AppendOnlyFileStoreTable.java | 17 +++ .../table/PrimaryKeyFileStoreTable.java | 11 ++ .../java/org/apache/paimon/table/Table.java | 7 + .../paimon/table/source/InnerTableRead.java | 7 + .../table/source/KeyValueTableRead.java | 11 ++ .../paimon/table/source/ReadBuilder.java | 5 + .../paimon/table/source/ReadBuilderImpl.java | 11 ++ .../splitread/IncrementalDiffSplitRead.java | 6 + .../paimon/utils/BulkFormatMapping.java | 10 +- .../table/AppendOnlyFileStoreTableTest.java | 90 +++++++++++++ .../table/PrimaryKeyFileStoreTableTest.java | 73 +++++++++++ .../paimon/flink/source/DataTableSource.java | 4 + .../flink/lookup/LookupCompactDiffRead.java | 8 ++ .../flink/source/BaseDataTableSource.java | 8 ++ .../paimon/flink/source/DataTableSource.java | 4 + .../flink/source/FlinkSourceBuilder.java | 9 ++ .../paimon/flink/source/FlinkTableSource.java | 15 ++- .../flink/source/FilterPushDownITCase.java | 61 ++++++++- .../FileStoreTableStatisticsTestBase.java | 3 + .../PrimaryKeyTableStatisticsTest.java | 1 + 38 files changed, 1102 insertions(+), 11 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterFallbackPredicateVisitor.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterPushDownAnalyzer.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterPushDownVisitor.java create mode 100644 paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFilterFallbackPredicateVisitorTest.java create mode 100644 paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFilterPushDownVisitorTest.java diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterFallbackPredicateVisitor.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterFallbackPredicateVisitor.java new file mode 100644 index 000000000000..89b284f5cf5c --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterFallbackPredicateVisitor.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fileindex; + +import org.apache.paimon.predicate.CompoundPredicate; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateVisitor; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +/** Visit the predicate and extract the file index fallback predicate. */ +public class FileIndexFilterFallbackPredicateVisitor + implements PredicateVisitor> { + + private final Set fields; + + public FileIndexFilterFallbackPredicateVisitor(Set fields) { + this.fields = fields; + } + + @Override + public Optional visit(LeafPredicate predicate) { + if (fields.contains(predicate.fieldRef())) { + return Optional.empty(); + } + return Optional.of(predicate); + } + + @Override + public Optional visit(CompoundPredicate predicate) { + List converted = new ArrayList<>(); + for (Predicate child : predicate.children()) { + child.visit(this).ifPresent(converted::add); + } + if (converted.isEmpty()) { + return Optional.empty(); + } + if (converted.size() == 1) { + return Optional.of(converted.get(0)); + } + return Optional.of(new CompoundPredicate(predicate.function(), converted)); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterPushDownAnalyzer.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterPushDownAnalyzer.java new file mode 100644 index 000000000000..c92cd9d34766 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterPushDownAnalyzer.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fileindex; + +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.FunctionVisitor; + +import java.util.List; + +/** Visit the predicate and check if file index can be filter push down. */ +public abstract class FileIndexFilterPushDownAnalyzer implements FunctionVisitor { + + @Override + public Boolean visitIsNotNull(FieldRef fieldRef) { + return false; + } + + @Override + public Boolean visitIsNull(FieldRef fieldRef) { + return false; + } + + @Override + public Boolean visitStartsWith(FieldRef fieldRef, Object literal) { + return false; + } + + @Override + public Boolean visitEndsWith(FieldRef fieldRef, Object literal) { + return false; + } + + @Override + public Boolean visitContains(FieldRef fieldRef, Object literal) { + return false; + } + + @Override + public Boolean visitLessThan(FieldRef fieldRef, Object literal) { + return false; + } + + @Override + public Boolean visitGreaterOrEqual(FieldRef fieldRef, Object literal) { + return false; + } + + @Override + public Boolean visitNotEqual(FieldRef fieldRef, Object literal) { + return false; + } + + @Override + public Boolean visitLessOrEqual(FieldRef fieldRef, Object literal) { + return false; + } + + @Override + public Boolean visitEqual(FieldRef fieldRef, Object literal) { + return false; + } + + @Override + public Boolean visitGreaterThan(FieldRef fieldRef, Object literal) { + return false; + } + + @Override + public Boolean visitIn(FieldRef fieldRef, List literals) { + return false; + } + + @Override + public Boolean visitNotIn(FieldRef fieldRef, List literals) { + return false; + } + + @Override + public Boolean visitAnd(List children) { + throw new UnsupportedOperationException("Should not invoke this"); + } + + @Override + public Boolean visitOr(List children) { + throw new UnsupportedOperationException("Should not invoke this"); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterPushDownVisitor.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterPushDownVisitor.java new file mode 100644 index 000000000000..d50979603c77 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexFilterPushDownVisitor.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fileindex; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.predicate.CompoundPredicate; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateVisitor; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** Visit the predicate and check if index can push down the predicate. */ +public class FileIndexFilterPushDownVisitor implements PredicateVisitor { + + private final Map> analyzers; + + public FileIndexFilterPushDownVisitor() { + this(Collections.emptyMap()); + } + + public FileIndexFilterPushDownVisitor( + Map> analyzers) { + this.analyzers = analyzers; + } + + @Override + public Boolean visit(LeafPredicate predicate) { + List analyzers = + this.analyzers.getOrDefault(predicate.fieldName(), Collections.emptyList()); + for (FileIndexFilterPushDownAnalyzer analyzer : analyzers) { + if (analyzer.visit(predicate)) { + return true; + } + } + return false; + } + + @Override + public Boolean visit(CompoundPredicate predicate) { + for (Predicate child : predicate.children()) { + Boolean matched = child.visit(this); + if (!matched) { + return false; + } + } + return true; + } + + @VisibleForTesting + public Map> getAnalyzers() { + return analyzers; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java index 81002c291637..ae63fd953803 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexOptions.java @@ -20,10 +20,16 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.StringUtils; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -167,6 +173,45 @@ public Options getMapTopLevelOptions(String column, String indexType) { + indexType)); } + public FileIndexFilterPushDownVisitor createFilterPushDownPredicateVisitor(RowType rowType) { + Map> analyzers = new HashMap<>(); + for (Map.Entry> entry : indexTypeOptions.entrySet()) { + Column column = entry.getKey(); + for (Map.Entry typeEntry : entry.getValue().entrySet()) { + String key; + FileIndexFilterPushDownAnalyzer analyzer; + DataField field = rowType.getField(column.columnName); + Options options = typeEntry.getValue(); + if (column.isNestedColumn) { + if (field.type().getTypeRoot() != DataTypeRoot.MAP) { + throw new IllegalArgumentException( + "Column " + + column.columnName + + " is nested column, but is not map type. Only should map type yet."); + } + MapType mapType = (MapType) field.type(); + Options mapTopLevelOptions = + getMapTopLevelOptions(column.columnName, typeEntry.getKey()); + key = FileIndexCommon.toMapKey(column.columnName, column.nestedColumnName); + analyzer = + FileIndexer.create( + typeEntry.getKey(), + mapType.getValueType(), + new Options( + mapTopLevelOptions.toMap(), options.toMap())) + .createFilterPushDownAnalyzer(); + } else { + key = column.columnName; + analyzer = + FileIndexer.create(typeEntry.getKey(), field.type(), options) + .createFilterPushDownAnalyzer(); + } + analyzers.computeIfAbsent(key, k -> new ArrayList<>()).add(analyzer); + } + } + return new FileIndexFilterPushDownVisitor(analyzers); + } + public boolean isEmpty() { return indexTypeOptions.isEmpty(); } diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexResult.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexResult.java index 3661a9b59a39..15af1ab1993c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexResult.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexResult.java @@ -18,6 +18,11 @@ package org.apache.paimon.fileindex; +import org.apache.paimon.predicate.FieldRef; + +import java.util.Collections; +import java.util.Set; + /** File index result to decide whether filter a file. */ public interface FileIndexResult { @@ -59,6 +64,10 @@ public FileIndexResult or(FileIndexResult fileIndexResult) { boolean remain(); + default Set applyIndexes() { + return Collections.emptySet(); + } + default FileIndexResult and(FileIndexResult fileIndexResult) { if (fileIndexResult.remain()) { return this; diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java index d6c51286a519..f31c49ebee99 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexer.java @@ -34,6 +34,8 @@ public interface FileIndexer { FileIndexReader createReader(SeekableInputStream inputStream, int start, int length); + FileIndexFilterPushDownAnalyzer createFilterPushDownAnalyzer(); + static FileIndexer create(String type, DataType dataType, Options options) { FileIndexerFactory fileIndexerFactory = FileIndexerFactoryUtils.load(type); return fileIndexerFactory.create(dataType, options); diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java index 4020302c565c..641aadd02c4b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java @@ -19,6 +19,7 @@ package org.apache.paimon.fileindex.bitmap; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.fileindex.FileIndexFilterPushDownAnalyzer; import org.apache.paimon.fileindex.FileIndexReader; import org.apache.paimon.fileindex.FileIndexResult; import org.apache.paimon.fileindex.FileIndexWriter; @@ -71,6 +72,11 @@ public FileIndexReader createReader( } } + @Override + public FileIndexFilterPushDownAnalyzer createFilterPushDownAnalyzer() { + return new FilterPushDownAnalyzer(); + } + private static class Writer extends FileIndexWriter { private final DataType dataType; @@ -189,6 +195,7 @@ public FileIndexResult visitNotEqual(FieldRef fieldRef, Object literal) { @Override public FileIndexResult visitIn(FieldRef fieldRef, List literals) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> { readInternalMeta(fieldRef.type()); return getInListResultBitmap(literals); @@ -198,6 +205,7 @@ public FileIndexResult visitIn(FieldRef fieldRef, List literals) { @Override public FileIndexResult visitNotIn(FieldRef fieldRef, List literals) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> { readInternalMeta(fieldRef.type()); RoaringBitmap32 bitmap = getInListResultBitmap(literals); @@ -270,6 +278,39 @@ private void readInternalMeta(DataType dataType) { } } + private static class FilterPushDownAnalyzer extends FileIndexFilterPushDownAnalyzer { + + @Override + public Boolean visitEqual(FieldRef fieldRef, Object literal) { + return true; + } + + @Override + public Boolean visitNotEqual(FieldRef fieldRef, Object literal) { + return true; + } + + @Override + public Boolean visitIn(FieldRef fieldRef, List literals) { + return true; + } + + @Override + public Boolean visitNotIn(FieldRef fieldRef, List literals) { + return true; + } + + @Override + public Boolean visitIsNull(FieldRef fieldRef) { + return true; + } + + @Override + public Boolean visitIsNotNull(FieldRef fieldRef) { + return true; + } + } + // Currently, it is mainly used to convert timestamps to long public static Function getValueMapper(DataType dataType) { return dataType.accept( diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java index 8d572ff254fc..6289962d2439 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapIndexResult.java @@ -19,16 +19,22 @@ package org.apache.paimon.fileindex.bitmap; import org.apache.paimon.fileindex.FileIndexResult; +import org.apache.paimon.predicate.FieldRef; import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.RoaringBitmap32; +import java.util.HashSet; +import java.util.Set; import java.util.function.Supplier; /** bitmap file index result. */ public class BitmapIndexResult extends LazyField implements FileIndexResult { - public BitmapIndexResult(Supplier supplier) { + private final Set fields; + + public BitmapIndexResult(Set fields, Supplier supplier) { super(supplier); + this.fields = new HashSet<>(fields); } @Override @@ -36,10 +42,17 @@ public boolean remain() { return !get().isEmpty(); } + @Override + public Set applyIndexes() { + return fields; + } + @Override public FileIndexResult and(FileIndexResult fileIndexResult) { if (fileIndexResult instanceof BitmapIndexResult) { + fields.addAll(fileIndexResult.applyIndexes()); return new BitmapIndexResult( + fields, () -> RoaringBitmap32.and(get(), ((BitmapIndexResult) fileIndexResult).get())); } return FileIndexResult.super.and(fileIndexResult); @@ -48,7 +61,9 @@ public FileIndexResult and(FileIndexResult fileIndexResult) { @Override public FileIndexResult or(FileIndexResult fileIndexResult) { if (fileIndexResult instanceof BitmapIndexResult) { + fields.addAll(fileIndexResult.applyIndexes()); return new BitmapIndexResult( + fields, () -> RoaringBitmap32.or(get(), ((BitmapIndexResult) fileIndexResult).get())); } return FileIndexResult.super.and(fileIndexResult); diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java index 3c9dcadba3ec..00d145c7d31c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bloomfilter/BloomFilterFileIndex.java @@ -18,6 +18,7 @@ package org.apache.paimon.fileindex.bloomfilter; +import org.apache.paimon.fileindex.FileIndexFilterPushDownAnalyzer; import org.apache.paimon.fileindex.FileIndexReader; import org.apache.paimon.fileindex.FileIndexResult; import org.apache.paimon.fileindex.FileIndexWriter; @@ -80,6 +81,11 @@ public FileIndexReader createReader(SeekableInputStream inputStream, int start, } } + @Override + public FileIndexFilterPushDownAnalyzer createFilterPushDownAnalyzer() { + return new FilterPushDownAnalyzer(); + } + private static class Writer extends FileIndexWriter { private final BloomFilter64 filter; @@ -133,4 +139,6 @@ public FileIndexResult visitEqual(FieldRef fieldRef, Object key) { return key == null || filter.testHash(hashFunction.hash(key)) ? REMAIN : SKIP; } } + + private static class FilterPushDownAnalyzer extends FileIndexFilterPushDownAnalyzer {} } diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndex.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndex.java index df6b8d897ca1..243ba22c0d7e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndex.java +++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bsi/BitSliceIndexBitmapFileIndex.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.Decimal; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.fileindex.FileIndexFilterPushDownAnalyzer; import org.apache.paimon.fileindex.FileIndexReader; import org.apache.paimon.fileindex.FileIndexResult; import org.apache.paimon.fileindex.FileIndexWriter; @@ -102,6 +103,11 @@ public FileIndexReader createReader(SeekableInputStream inputStream, int start, } } + @Override + public FileIndexFilterPushDownAnalyzer createFilterPushDownAnalyzer() { + return new FilterPushDownAnalyzer(); + } + private static class Writer extends FileIndexWriter { private final Function valueMapper; @@ -209,6 +215,7 @@ public Reader( @Override public FileIndexResult visitIsNull(FieldRef fieldRef) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> { RoaringBitmap32 bitmap = RoaringBitmap32.or(positive.isNotNull(), negative.isNotNull()); @@ -220,6 +227,7 @@ public FileIndexResult visitIsNull(FieldRef fieldRef) { @Override public FileIndexResult visitIsNotNull(FieldRef fieldRef) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> RoaringBitmap32.or(positive.isNotNull(), negative.isNotNull())); } @@ -236,6 +244,7 @@ public FileIndexResult visitNotEqual(FieldRef fieldRef, Object literal) { @Override public FileIndexResult visitIn(FieldRef fieldRef, List literals) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> literals.stream() .map(valueMapper) @@ -255,6 +264,7 @@ public FileIndexResult visitIn(FieldRef fieldRef, List literals) { @Override public FileIndexResult visitNotIn(FieldRef fieldRef, List literals) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> { RoaringBitmap32 ebm = RoaringBitmap32.or(positive.isNotNull(), negative.isNotNull()); @@ -279,6 +289,7 @@ public FileIndexResult visitNotIn(FieldRef fieldRef, List literals) { @Override public FileIndexResult visitLessThan(FieldRef fieldRef, Object literal) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> { Long value = valueMapper.apply(literal); if (value < 0) { @@ -292,6 +303,7 @@ public FileIndexResult visitLessThan(FieldRef fieldRef, Object literal) { @Override public FileIndexResult visitLessOrEqual(FieldRef fieldRef, Object literal) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> { Long value = valueMapper.apply(literal); if (value < 0) { @@ -305,6 +317,7 @@ public FileIndexResult visitLessOrEqual(FieldRef fieldRef, Object literal) { @Override public FileIndexResult visitGreaterThan(FieldRef fieldRef, Object literal) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> { Long value = valueMapper.apply(literal); if (value < 0) { @@ -319,6 +332,7 @@ public FileIndexResult visitGreaterThan(FieldRef fieldRef, Object literal) { @Override public FileIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object literal) { return new BitmapIndexResult( + Collections.singleton(fieldRef), () -> { Long value = valueMapper.apply(literal); if (value < 0) { @@ -331,6 +345,59 @@ public FileIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object literal) { } } + private static class FilterPushDownAnalyzer extends FileIndexFilterPushDownAnalyzer { + + @Override + public Boolean visitIsNull(FieldRef fieldRef) { + return true; + } + + @Override + public Boolean visitIsNotNull(FieldRef fieldRef) { + return true; + } + + @Override + public Boolean visitEqual(FieldRef fieldRef, Object literal) { + return true; + } + + @Override + public Boolean visitNotEqual(FieldRef fieldRef, Object literal) { + return true; + } + + @Override + public Boolean visitIn(FieldRef fieldRef, List literals) { + return true; + } + + @Override + public Boolean visitNotIn(FieldRef fieldRef, List literals) { + return true; + } + + @Override + public Boolean visitLessThan(FieldRef fieldRef, Object literal) { + return true; + } + + @Override + public Boolean visitLessOrEqual(FieldRef fieldRef, Object literal) { + return true; + } + + @Override + public Boolean visitGreaterThan(FieldRef fieldRef, Object literal) { + return true; + } + + @Override + public Boolean visitGreaterOrEqual(FieldRef fieldRef, Object literal) { + return true; + } + } + public static Function getValueMapper(DataType dataType) { return dataType.accept( new DataTypeDefaultVisitor>() { diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java index f3d0b42bddea..c47f2313953f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java @@ -390,6 +390,20 @@ public static List excludePredicateWithFields( .collect(Collectors.toList()); } + public static List trimPredicate( + @Nullable List predicates, @Nullable Predicate exclude) { + if (predicates == null || predicates.isEmpty() || exclude == null) { + return predicates; + } + List result = new ArrayList<>(); + for (Predicate predicate : predicates) { + if (!predicate.equals(exclude)) { + result.add(predicate); + } + } + return result; + } + @Nullable public static Predicate partition( Map map, RowType rowType, String defaultPartValue) { diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordReader.java index 4d5356edf275..189930816f1f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordReader.java @@ -18,6 +18,8 @@ package org.apache.paimon.reader; +import org.apache.paimon.utils.Filter; + import javax.annotation.Nullable; import java.io.IOException; @@ -28,4 +30,25 @@ public interface FileRecordReader extends RecordReader { @Override @Nullable FileRecordIterator readBatch() throws IOException; + + @Override + default FileRecordReader filter(Filter filter) { + FileRecordReader thisReader = this; + return new FileRecordReader() { + @Nullable + @Override + public FileRecordIterator readBatch() throws IOException { + FileRecordIterator iterator = thisReader.readBatch(); + if (iterator == null) { + return null; + } + return iterator.filter(filter); + } + + @Override + public void close() throws IOException { + thisReader.close(); + } + }; + } } diff --git a/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFilterFallbackPredicateVisitorTest.java b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFilterFallbackPredicateVisitorTest.java new file mode 100644 index 000000000000..9751fd624918 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFilterFallbackPredicateVisitorTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fileindex; + +import org.apache.paimon.predicate.CompoundPredicate; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link FileIndexFilterFallbackPredicateVisitor}. */ +public class FileIndexFilterFallbackPredicateVisitorTest { + + @Test + public void testVisitLeafPredicate() { + RowType rowType = + new RowType( + Arrays.asList( + new DataField(0, "a", DataTypes.STRING()), + new DataField(1, "b", DataTypes.STRING()))); + Set indexFields = Collections.singleton(new FieldRef(0, "a", DataTypes.STRING())); + FileIndexFilterFallbackPredicateVisitor visitor = + new FileIndexFilterFallbackPredicateVisitor(indexFields); + + Predicate p1 = new PredicateBuilder(rowType).equal(0, "a"); + Optional r1 = visitor.visit((LeafPredicate) p1); + assertThat(r1.isPresent()).isFalse(); + + Predicate p2 = new PredicateBuilder(rowType).equal(1, "b"); + Optional r2 = visitor.visit((LeafPredicate) p2); + assertThat(r2).isPresent(); + assertThat(r2.get()).isEqualTo(p2); + } + + @Test + public void testVisitCompoundPredicate() { + RowType rowType = + new RowType( + Arrays.asList( + new DataField(0, "a", DataTypes.INT()), + new DataField(1, "b", DataTypes.STRING()), + new DataField(2, "c", DataTypes.INT()))); + + Predicate predicate = + PredicateBuilder.and( + new PredicateBuilder(rowType).greaterThan(0, 1), + new PredicateBuilder(rowType).equal(1, "b"), + new PredicateBuilder(rowType).lessThan(2, 2)); + + Set indexFields = + new HashSet<>( + Arrays.asList( + new FieldRef(0, "a", DataTypes.INT()), + new FieldRef(2, "c", DataTypes.INT()))); + FileIndexFilterFallbackPredicateVisitor v1 = + new FileIndexFilterFallbackPredicateVisitor(indexFields); + Optional r1 = v1.visit((CompoundPredicate) predicate); + assertThat(r1).isPresent(); + assertThat(r1.get()).isEqualTo(new PredicateBuilder(rowType).equal(1, "b")); + + FileIndexFilterFallbackPredicateVisitor v2 = + new FileIndexFilterFallbackPredicateVisitor( + new HashSet<>( + Collections.singletonList(new FieldRef(0, "a", DataTypes.INT())))); + Optional r2 = v2.visit((CompoundPredicate) predicate); + assertThat(r2).isPresent(); + assertThat(r2.get()) + .isEqualTo( + PredicateBuilder.and( + new PredicateBuilder(rowType).equal(1, "b"), + new PredicateBuilder(rowType).lessThan(2, 2))); + + FileIndexFilterFallbackPredicateVisitor v3 = + new FileIndexFilterFallbackPredicateVisitor( + new HashSet<>( + Arrays.asList( + new FieldRef(0, "a", DataTypes.INT()), + new FieldRef(1, "b", DataTypes.STRING()), + new FieldRef(2, "c", DataTypes.INT())))); + Optional r3 = v3.visit((CompoundPredicate) predicate); + assertThat(r3.isPresent()).isFalse(); + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFilterPushDownVisitorTest.java b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFilterPushDownVisitorTest.java new file mode 100644 index 000000000000..b6235641bc91 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/fileindex/FileIndexFilterPushDownVisitorTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.fileindex; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndexFactory; +import org.apache.paimon.fileindex.bsi.BitSliceIndexBitmapFileIndexFactory; +import org.apache.paimon.predicate.Equal; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link FileIndexOptions}. */ +public class FileIndexFilterPushDownVisitorTest { + + private RowType rowType; + private FileIndexFilterPushDownVisitor visitor; + + @BeforeEach + public void setup() { + this.rowType = + RowType.builder() + .field("a", DataTypes.INT()) + .field("b", DataTypes.INT()) + .field("c", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())) + .build(); + Map properties = new HashMap<>(); + properties.put( + FileIndexOptions.FILE_INDEX + + "." + + BloomFilterFileIndexFactory.BLOOM_FILTER + + "." + + CoreOptions.COLUMNS, + "a,c[hello]"); + properties.put( + FileIndexOptions.FILE_INDEX + + "." + + BloomFilterFileIndexFactory.BLOOM_FILTER + + "." + + "c[hello].fpp", + "200"); + properties.put( + FileIndexOptions.FILE_INDEX + + "." + + BitSliceIndexBitmapFileIndexFactory.BSI_INDEX + + "." + + CoreOptions.COLUMNS, + "a,b"); + FileIndexOptions fileIndexOptions = CoreOptions.fromMap(properties).indexColumnsOptions(); + this.visitor = fileIndexOptions.createFilterPushDownPredicateVisitor(rowType); + } + + @Test + public void testCreateFilterPushDownPredicateVisitor() { + Map> analyzers = visitor.getAnalyzers(); + assertThat(analyzers.size()).isEqualTo(3); + assertThat(analyzers.get("a").size()).isEqualTo(2); + assertThat(analyzers.get("b").size()).isEqualTo(1); + assertThat(analyzers.get("c[hello]").size()).isEqualTo(1); + } + + @Test + public void testVisitLeafPredicate() { + // test column a with bloom-filter and bsi index + assertThat(visitor.visit((LeafPredicate) new PredicateBuilder(rowType).equal(0, 1))) + .isTrue(); + assertThat(visitor.visit((LeafPredicate) new PredicateBuilder(rowType).lessThan(0, 1))) + .isTrue(); + assertThat(visitor.visit((LeafPredicate) new PredicateBuilder(rowType).greaterThan(0, 1))) + .isTrue(); + assertThat(visitor.visit((LeafPredicate) new PredicateBuilder(rowType).startsWith(0, 1))) + .isFalse(); + + // test column b with bsi index + assertThat(visitor.visit((LeafPredicate) new PredicateBuilder(rowType).equal(1, 1))) + .isTrue(); + assertThat(visitor.visit((LeafPredicate) new PredicateBuilder(rowType).lessThan(1, 1))) + .isTrue(); + assertThat(visitor.visit((LeafPredicate) new PredicateBuilder(rowType).greaterThan(1, 1))) + .isTrue(); + assertThat(visitor.visit((LeafPredicate) new PredicateBuilder(rowType).startsWith(0, 1))) + .isFalse(); + + // test map column c[hello] with bloom-filter index + assertThat( + visitor.visit( + new LeafPredicate( + Equal.INSTANCE, + DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()), + 2, + "c[hello]", + Collections.singletonList(BinaryString.fromString("a"))))) + .isFalse(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 7d3acd729c55..4857588d1691 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -277,7 +277,7 @@ public KeyValueFileReaderFactory build( finalReadKeyType, readValueType, new BulkFormatMappingBuilder( - formatDiscover, readTableFields, fieldsExtractor, filters), + formatDiscover, readTableFields, fieldsExtractor, filters, null), pathFactory.createDataFilePathFactory(partition, bucket), options.fileReaderAsyncThreshold().getBytes(), partition, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java index 23a3a576e4a6..65c9df314eea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java @@ -239,6 +239,15 @@ public MergeFileSplitRead withFilter(Predicate predicate) { return this; } + @Override + public MergeFileSplitRead withIndexFilter(Predicate predicate) { + if (predicate == null) { + return this; + } + throw new UnsupportedOperationException( + "index should not be pushed down in the LSM merging reader"); + } + @Override public RecordReader createReader(DataSplit split) throws IOException { if (!split.beforeFiles().isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index 46977457c4be..ccd3f0786835 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -23,6 +23,7 @@ import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.fileindex.FileIndexFilterFallbackPredicateVisitor; import org.apache.paimon.fileindex.FileIndexResult; import org.apache.paimon.fileindex.bitmap.ApplyBitmapIndexRecordReader; import org.apache.paimon.fileindex.bitmap.BitmapIndexResult; @@ -36,6 +37,7 @@ import org.apache.paimon.io.FileIndexEvaluator; import org.apache.paimon.mergetree.compact.ConcatRecordReader; import org.apache.paimon.partition.PartitionUtils; +import org.apache.paimon.predicate.FieldRef; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.EmptyFileRecordReader; import org.apache.paimon.reader.FileRecordReader; @@ -58,9 +60,12 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.function.Supplier; import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; @@ -80,6 +85,7 @@ public class RawFileSplitRead implements SplitRead { private RowType readRowType; @Nullable private List filters; + @Nullable private Predicate indexFilter; public RawFileSplitRead( FileIO fileIO, @@ -123,6 +129,12 @@ public RawFileSplitRead withFilter(Predicate predicate) { return this; } + @Override + public SplitRead withIndexFilter(@Nullable Predicate indexFilter) { + this.indexFilter = indexFilter; + return this; + } + @Override public RecordReader createReader(DataSplit split) throws IOException { if (split.beforeFiles().size() > 0) { @@ -152,7 +164,7 @@ public RecordReader createReader( List readTableFields = readRowType.getFields(); BulkFormatMappingBuilder bulkFormatMappingBuilder = new BulkFormatMappingBuilder( - formatDiscover, readTableFields, TableSchema::fields, filters); + formatDiscover, readTableFields, TableSchema::fields, filters, indexFilter); for (int i = 0; i < files.size(); i++) { DataFileMeta file = files.get(i); @@ -229,10 +241,27 @@ private FileRecordReader createFileReader( fileRecordReader, (BitmapIndexResult) fileIndexResult); } + Set fields = + fileIndexResult == null ? Collections.emptySet() : fileIndexResult.applyIndexes(); + if (indexFilter != null) { + Optional fallbackPredicate = + indexFilter.visit(new FileIndexFilterFallbackPredicateVisitor(fields)); + if (fallbackPredicate.isPresent()) { + Predicate fallback = fallbackPredicate.get(); + LOG.warn( + "The file index of {} was not found at runtime, filter ({}) fallback to use paimon predicate." + + " You can use the `rewrite_file_index` procedure to ensure the integrity of the file index.", + file.fileName(), + fallback); + fileRecordReader = fileRecordReader.filter(fallback::test); + } + } + DeletionVector deletionVector = dvFactory == null ? null : dvFactory.get(); if (deletionVector != null && !deletionVector.isEmpty()) { - return new ApplyDeletionVectorReader(fileRecordReader, deletionVector); + fileRecordReader = new ApplyDeletionVectorReader(fileRecordReader, deletionVector); } + return fileRecordReader; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java index 1722aa53ed9f..5e1c5fee07aa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java @@ -44,6 +44,8 @@ public interface SplitRead { SplitRead withFilter(@Nullable Predicate predicate); + SplitRead withIndexFilter(@Nullable Predicate predicate); + /** Create a {@link RecordReader} from split. */ RecordReader createReader(DataSplit split) throws IOException; @@ -74,6 +76,12 @@ public SplitRead withFilter(@Nullable Predicate predicate) { return this; } + @Override + public SplitRead withIndexFilter(@Nullable Predicate predicate) { + read.withIndexFilter(predicate); + return this; + } + @Override public RecordReader createReader(DataSplit split) throws IOException { return convertedFactory.apply(split); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 103fa64050aa..8f020776f45d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -21,6 +21,7 @@ import org.apache.paimon.AppendOnlyFileStore; import org.apache.paimon.CoreOptions; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.fileindex.FileIndexFilterPushDownVisitor; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.iceberg.AppendOnlyIcebergCommitCallback; @@ -69,6 +70,16 @@ class AppendOnlyFileStoreTable extends AbstractFileStoreTable { super(fileIO, path, tableSchema, catalogEnvironment); } + @Override + public FileIndexFilterPushDownVisitor fileIndexFilterPushDownVisitor() { + CoreOptions options = coreOptions(); + if (options.fileIndexReadEnabled()) { + return options.indexColumnsOptions() + .createFilterPushDownPredicateVisitor(schema().logicalRowType()); + } + return super.fileIndexFilterPushDownVisitor(); + } + @Override public AppendOnlyFileStore store() { if (lazyStore == null) { @@ -120,6 +131,12 @@ protected InnerTableRead innerWithFilter(Predicate predicate) { return this; } + @Override + public InnerTableRead withIndexFilter(Predicate indexPredicate) { + read.withIndexFilter(indexPredicate); + return this; + } + @Override public void applyReadType(RowType readType) { read.withReadType(readType); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index 516ae766cef8..7ed9d6617cef 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; import org.apache.paimon.KeyValueFileStore; +import org.apache.paimon.fileindex.FileIndexFilterPushDownVisitor; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.iceberg.IcebergOptions; @@ -191,4 +192,14 @@ protected List createCommitCallbacks(String commitUser) { return callbacks; } + + @Override + public FileIndexFilterPushDownVisitor fileIndexFilterPushDownVisitor() { + CoreOptions options = coreOptions(); + if (options.fileIndexReadEnabled() && options.deletionVectorsEnabled()) { + return options.indexColumnsOptions() + .createFilterPushDownPredicateVisitor(schema().logicalRowType()); + } + return super.fileIndexFilterPushDownVisitor(); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 7ed7ba48a8eb..e61a84741f4b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -21,6 +21,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.Experimental; import org.apache.paimon.annotation.Public; +import org.apache.paimon.fileindex.FileIndexFilterPushDownVisitor; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFileMeta; @@ -109,6 +110,12 @@ default String uuid() { @Experimental SimpleFileReader indexManifestFileReader(); + /** Visitor to check if index can push the predicate down. */ + @Experimental + default FileIndexFilterPushDownVisitor fileIndexFilterPushDownVisitor() { + return new FileIndexFilterPushDownVisitor(); + } + /** Rollback table's state to a specific snapshot. */ @Experimental void rollbackTo(long snapshotId); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java index a8f189067466..ba6defa8856e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java @@ -36,6 +36,13 @@ default InnerTableRead withFilter(List predicates) { InnerTableRead withFilter(Predicate predicate); + default InnerTableRead withIndexFilter(Predicate predicate) { + if (predicate == null) { + return this; + } + throw new UnsupportedOperationException(); + } + /** Use {@link #withReadType(RowType)} instead. */ @Deprecated default InnerTableRead withProjection(int[] projection) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java index c62f2118df6c..e19d23223cf5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java @@ -52,6 +52,7 @@ public final class KeyValueTableRead extends AbstractDataTableRead { @Nullable private RowType readType = null; private boolean forceKeepDelete = false; private Predicate predicate = null; + private Predicate indexPredicate = null; private IOManager ioManager = null; public KeyValueTableRead( @@ -84,6 +85,9 @@ private void assignValues(SplitRead read) { if (readType != null) { read = read.withReadType(readType); } + if (indexPredicate != null) { + read = read.withIndexFilter(indexPredicate); + } read.withFilter(predicate).withIOManager(ioManager); } @@ -107,6 +111,13 @@ protected InnerTableRead innerWithFilter(Predicate predicate) { return this; } + @Override + public InnerTableRead withIndexFilter(Predicate indexPredicate) { + initialized().forEach(r -> r.withIndexFilter(indexPredicate)); + this.indexPredicate = indexPredicate; + return this; + } + @Override public TableRead withIOManager(IOManager ioManager) { initialized().forEach(r -> r.withIOManager(ioManager)); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java index 0c1386ce441d..67148a3791de 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java @@ -98,6 +98,11 @@ default ReadBuilder withFilter(List predicates) { */ ReadBuilder withFilter(Predicate predicate); + /** + * Push index filters, when index is empty, fallback to use it to do filter in reader instead. + */ + ReadBuilder withIndexFilter(Predicate predicate); + /** Push partition filter. */ ReadBuilder withPartitionFilter(Map partitionSpec); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java index 95bfe6f24bc7..18322067b54d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java @@ -40,6 +40,8 @@ public class ReadBuilderImpl implements ReadBuilder { private Predicate filter; + private Predicate indexFilter; + private Integer limit = null; private Integer shardIndexOfThisSubtask; @@ -81,6 +83,12 @@ public ReadBuilder withFilter(Predicate filter) { return this; } + @Override + public ReadBuilder withIndexFilter(Predicate indexFilter) { + this.indexFilter = indexFilter; + return this; + } + @Override public ReadBuilder withPartitionFilter(Map partitionSpec) { this.partitionSpec = partitionSpec; @@ -176,6 +184,9 @@ public TableRead newRead() { if (readType != null) { read.withReadType(readType); } + if (indexFilter != null) { + read.withIndexFilter(indexFilter); + } return read; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java index 2e236b1dffc0..acf4be5c4489 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/splitread/IncrementalDiffSplitRead.java @@ -83,6 +83,12 @@ public SplitRead withFilter(@Nullable Predicate predicate) { return this; } + @Override + public SplitRead withIndexFilter(@Nullable Predicate predicate) { + mergeRead.withIndexFilter(predicate); + return this; + } + @Override public RecordReader createReader(DataSplit split) throws IOException { RecordReader reader = diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java index 58ef924df178..c34f0ae7f2a8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java @@ -43,6 +43,7 @@ import java.util.function.Function; import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields; +import static org.apache.paimon.predicate.PredicateBuilder.trimPredicate; import static org.apache.paimon.table.SpecialFields.KEY_FIELD_ID_START; /** Class with index mapping and bulk format. */ @@ -129,16 +130,19 @@ public static class BulkFormatMappingBuilder { private final FileFormatDiscover formatDiscover; private final List readTableFields; private final Function> fieldsExtractor; + @Nullable private final Predicate indexFilter; @Nullable private final List filters; public BulkFormatMappingBuilder( FileFormatDiscover formatDiscover, List readTableFields, Function> fieldsExtractor, - @Nullable List filters) { + @Nullable List filters, + @Nullable Predicate indexFilter) { this.formatDiscover = formatDiscover; this.readTableFields = readTableFields; this.fieldsExtractor = fieldsExtractor; + this.indexFilter = indexFilter; this.filters = filters; } @@ -186,6 +190,8 @@ public BulkFormatMapping build( // build read filters List readFilters = readFilters(filters, tableSchema, dataSchema); + // Skip pushing down index filters to format reader. + List formatFilters = trimPredicate(readFilters, indexFilter); return new BulkFormatMapping( indexCastMapping.getIndexMapping(), @@ -194,7 +200,7 @@ public BulkFormatMapping build( partitionMapping, formatDiscover .discover(formatIdentifier) - .createReaderFactory(readRowType, readFilters), + .createReaderFactory(readRowType, formatFilters), dataSchema, readFilters); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java index 01d4e89af95d..e71222edea5e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java @@ -81,6 +81,7 @@ import static org.apache.paimon.CoreOptions.DATA_FILE_PATH_DIRECTORY; import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD; import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE; +import static org.apache.paimon.CoreOptions.WRITE_ONLY; import static org.apache.paimon.io.DataFileTestUtils.row; import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucket; import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucketKeyHashCode; @@ -722,6 +723,95 @@ public void testBSIAndBitmapIndexInDisk() throws Exception { }); } + @Test + public void testFileIndexFilterPushDownFallback() throws Exception { + RowType rowType = + RowType.builder() + .field("id", DataTypes.INT()) + .field("event", DataTypes.STRING()) + .field("price", DataTypes.BIGINT()) + .build(); + // in unaware-bucket mode, we split files into splits all the time + FileStoreTable table = + createUnawareBucketFileStoreTable( + rowType, + options -> { + options.set(METADATA_STATS_MODE, "NONE"); + options.set(WRITE_ONLY, true); + options.set(FILE_INDEX_IN_MANIFEST_THRESHOLD.key(), "1 B"); + }); + + // write some records without file index + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + write.write(GenericRow.of(1, BinaryString.fromString("A"), 1L)); + write.write(GenericRow.of(1, BinaryString.fromString("B"), 2L)); + write.write(GenericRow.of(1, BinaryString.fromString("C"), 3L)); + commit.commit(0, write.prepareCommit(true, 0)); + write.close(); + + // add bitmap index and write some records + Map properties = new HashMap<>(); + properties.put( + FileIndexOptions.FILE_INDEX + + "." + + BitmapFileIndexFactory.BITMAP_INDEX + + "." + + CoreOptions.COLUMNS, + "event"); + table = table.copy(properties); + write = table.newWrite(commitUser); + write.write(GenericRow.of(1, BinaryString.fromString("A"), 1L)); + write.write(GenericRow.of(1, BinaryString.fromString("B"), 2L)); + write.write(GenericRow.of(1, BinaryString.fromString("C"), 3L)); + commit.commit(1, write.prepareCommit(true, 1)); + write.close(); + + // add bsi index and write some records + properties.put( + FileIndexOptions.FILE_INDEX + + "." + + BitSliceIndexBitmapFileIndexFactory.BSI_INDEX + + "." + + CoreOptions.COLUMNS, + "price"); + table = table.copy(properties); + write = table.newWrite(commitUser); + write.write(GenericRow.of(1, BinaryString.fromString("A"), 1L)); + write.write(GenericRow.of(1, BinaryString.fromString("B"), 2L)); + write.write(GenericRow.of(1, BinaryString.fromString("C"), 3L)); + commit.commit(2, write.prepareCommit(true, 2)); + write.close(); + commit.close(); + + Function TO_STRING = + row -> row.getInt(0) + "," + row.getString(1).toString() + "," + row.getLong(2); + Predicate predicate = + PredicateBuilder.and( + new PredicateBuilder(rowType).equal(1, BinaryString.fromString("A")), + new PredicateBuilder(rowType).equal(2, 1L)); + TableScan.Plan plan = table.newScan().withFilter(predicate).plan(); + + // test read without index filter, should not fallback + List a1 = new ArrayList<>(); + RecordReader r1 = + table.newRead().withFilter(predicate).createReader(plan.splits()); + r1.forEachRemaining(row -> a1.add(TO_STRING.apply(row))); + assertThat(a1.size()).isEqualTo(5); + assertThat(String.join("|", a1)).isEqualTo("1,A,1|1,B,2|1,C,3|1,A,1|1,A,1"); + + // test read with index filter, should fallback + List a2 = new ArrayList<>(); + RecordReader r2 = + table.newRead() + .withFilter(predicate) + .withIndexFilter(predicate) + .createReader(plan.splits()); + r2.forEachRemaining(row -> a2.add(TO_STRING.apply(row))); + assertThat(a2.size()).isEqualTo(3); + assertThat(String.join("|", a2)).isEqualTo("1,A,1|1,A,1|1,A,1"); + } + @Test public void testWithShardAppendTable() throws Exception { FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET, -1)); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index fa635e2ab666..781b76aefca0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -27,6 +27,8 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.disk.IOManagerImpl; +import org.apache.paimon.fileindex.FileIndexOptions; +import org.apache.paimon.fileindex.bitmap.BitmapFileIndexFactory; import org.apache.paimon.fs.FileIOFinder; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.BundleRecords; @@ -36,6 +38,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.SchemaUtils; @@ -103,6 +106,7 @@ import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD; import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE; import static org.apache.paimon.CoreOptions.MERGE_ENGINE; +import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE; import static org.apache.paimon.CoreOptions.MergeEngine; import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE; import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE; @@ -1049,6 +1053,75 @@ public void testDeletionVectorsWithBitmapFileIndexInMeta() throws Exception { "1|5|100|binary|varbinary|mapKey:mapVal|multiset")); } + @Test + public void testDeletionVectorsWithFileIndexFilterPushDownFallback() throws Exception { + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(BUCKET, 1); + conf.set(METADATA_STATS_MODE, "NONE"); + conf.set(DELETION_VECTORS_ENABLED, true); + conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1)); + conf.set(FILE_INDEX_IN_MANIFEST_THRESHOLD, MemorySize.ofBytes(1)); + }); + + StreamTableWrite write = + table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString())); + StreamTableCommit commit = table.newCommit(commitUser); + + // write some records without index + write.write(rowData(1, 1, 300L)); + write.write(rowData(1, 2, 400L)); + write.write(rowData(1, 3, 100L)); + write.write(rowData(1, 6, 300L)); + commit.commit(0, write.prepareCommit(true, 0)); + write.close(); + + // add bitmap index and write some records + Map properties = new HashMap<>(); + properties.put( + FileIndexOptions.FILE_INDEX + + "." + + BitmapFileIndexFactory.BITMAP_INDEX + + "." + + CoreOptions.COLUMNS, + "b"); + table = table.copy(properties); + write = table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString())); + write.write(rowData(1, 5, 300L)); + write.write(rowData(1, 7, 200L)); + commit.commit(1, write.prepareCommit(true, 1)); + + write.write(rowData(1, 5, 100L)); + commit.commit(2, write.prepareCommit(true, 2)); + write.close(); + commit.close(); + + Predicate predicate = new PredicateBuilder(ROW_TYPE).equal(2, 100L); + List splits = toSplits(table.newSnapshotReader().read().dataSplits()); + + Function TO_STRING = + row -> row.getInt(0) + "," + row.getInt(1) + "," + row.getLong(2); + + // test read without index filter, should not fallback + List a1 = new ArrayList<>(); + RecordReader r1 = table.newRead().withFilter(predicate).createReader(splits); + r1.forEachRemaining(row -> a1.add(TO_STRING.apply(row))); + assertThat(a1.size()).isEqualTo(5); + assertThat(String.join("|", a1)).isEqualTo("1,1,300|1,2,400|1,3,100|1,6,300|1,5,100"); + + // test read with index filter, should fallback + List a2 = new ArrayList<>(); + RecordReader r2 = + table.newRead() + .withFilter(predicate) + .withIndexFilter(predicate) + .createReader(splits); + r2.forEachRemaining(row -> a2.add(TO_STRING.apply(row))); + assertThat(a2.size()).isEqualTo(2); + assertThat(String.join("|", a2)).isEqualTo("1,3,100|1,5,100"); + } + @Test public void testWithShardFirstRow() throws Exception { FileStoreTable table = diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java index ee00d41832cd..09720de44512 100644 --- a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java +++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java @@ -51,6 +51,7 @@ public DataTableSource( null, null, null, + null, false); } @@ -61,6 +62,7 @@ public DataTableSource( DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory, @Nullable Predicate predicate, + @Nullable Predicate indexPredicate, @Nullable int[][] projectFields, @Nullable Long limit, @Nullable WatermarkStrategy watermarkStrategy, @@ -72,6 +74,7 @@ public DataTableSource( context, logStoreTableFactory, predicate, + indexPredicate, projectFields, limit, watermarkStrategy, @@ -87,6 +90,7 @@ public DataTableSource copy() { context, logStoreTableFactory, predicate, + indexPredicate, projectFields, limit, watermarkStrategy, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java index 2a140adc3240..73a7e0ed6189 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java @@ -72,6 +72,14 @@ protected InnerTableRead innerWithFilter(Predicate predicate) { return this; } + @Override + public InnerTableRead withIndexFilter(Predicate indexPredicate) { + if (indexPredicate == null) { + return this; + } + throw new UnsupportedOperationException("index should not be pushed down in the lookup"); + } + @Override public InnerTableRead forceKeepDelete() { fullPhaseMergeRead.forceKeepDelete(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java index 5dbbdcedd82a..a75c2b0ec022 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java @@ -107,6 +107,7 @@ public BaseDataTableSource( DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory, @Nullable Predicate predicate, + @Nullable Predicate indexPredicate, @Nullable int[][] projectFields, @Nullable Long limit, @Nullable WatermarkStrategy watermarkStrategy, @@ -117,6 +118,7 @@ public BaseDataTableSource( this.context = context; this.logStoreTableFactory = logStoreTableFactory; this.predicate = predicate; + this.indexPredicate = indexPredicate; this.projectFields = projectFields; this.limit = limit; this.watermarkStrategy = watermarkStrategy; @@ -198,6 +200,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .logSourceProvider(logSourceProvider) .projection(projectFields) .predicate(predicate) + .indexPredicate(indexPredicate) .limit(limit) .watermarkStrategy(watermarkStrategy) .dynamicPartitionFilteringFields(dynamicPartitionFilteringFields()); @@ -324,6 +327,11 @@ public boolean applyAggregates( return false; } + // todo: support apply aggregate by indexes + if (indexPredicate != null) { + return false; + } + if (!aggregateExpressions .get(0) .getFunctionDefinition() diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java index 53a1b5f63083..4721b91e5ad2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java @@ -70,6 +70,7 @@ public DataTableSource( null, null, null, + null, false); } @@ -80,6 +81,7 @@ public DataTableSource( DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory, @Nullable Predicate predicate, + @Nullable Predicate indexPredicate, @Nullable int[][] projectFields, @Nullable Long limit, @Nullable WatermarkStrategy watermarkStrategy, @@ -92,6 +94,7 @@ public DataTableSource( context, logStoreTableFactory, predicate, + indexPredicate, projectFields, limit, watermarkStrategy, @@ -108,6 +111,7 @@ public DataTableSource copy() { context, logStoreTableFactory, predicate, + indexPredicate, projectFields, limit, watermarkStrategy, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java index e864ec050045..b92479763281 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java @@ -85,6 +85,7 @@ public class FlinkSourceBuilder { private StreamExecutionEnvironment env; @Nullable private int[][] projectedFields; @Nullable private Predicate predicate; + @Nullable private Predicate indexPredicate; @Nullable private LogSourceProvider logSourceProvider; @Nullable private Integer parallelism; @Nullable private Long limit; @@ -133,6 +134,11 @@ public FlinkSourceBuilder predicate(Predicate predicate) { return this; } + public FlinkSourceBuilder indexPredicate(Predicate indexPredicate) { + this.indexPredicate = indexPredicate; + return this; + } + public FlinkSourceBuilder limit(@Nullable Long limit) { this.limit = limit; return this; @@ -177,6 +183,9 @@ private ReadBuilder createReadBuilder() { if (limit != null) { readBuilder.withLimit(limit.intValue()); } + if (indexPredicate != null) { + readBuilder.withIndexFilter(indexPredicate); + } return readBuilder.dropStats(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java index 12b579589d0f..45fa28002ae2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.source; import org.apache.paimon.CoreOptions; +import org.apache.paimon.fileindex.FileIndexFilterPushDownVisitor; import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.LogicalTypeConversion; import org.apache.paimon.flink.PredicateConverter; @@ -68,6 +69,7 @@ public abstract class FlinkTableSource protected final Table table; @Nullable protected Predicate predicate; + @Nullable protected Predicate indexPredicate; @Nullable protected int[][] projectFields; @Nullable protected Long limit; protected SplitStatistics splitStatistics; @@ -96,9 +98,12 @@ public Result applyFilters(List filters) { // of query will be wrong. List unConsumedFilters = new ArrayList<>(); List consumedFilters = new ArrayList<>(); + List indexConverted = new ArrayList<>(); List converted = new ArrayList<>(); PredicateVisitor onlyPartFieldsVisitor = new PartitionPredicateVisitor(partitionKeys); + FileIndexFilterPushDownVisitor indexPushDownVisitor = + table.fileIndexFilterPushDownVisitor(); for (ResolvedExpression filter : filters) { Optional predicateOptional = PredicateConverter.convert(rowType, filter); @@ -107,15 +112,21 @@ public Result applyFilters(List filters) { unConsumedFilters.add(filter); } else { Predicate p = predicateOptional.get(); - if (isStreaming() || !p.visit(onlyPartFieldsVisitor)) { + if (isStreaming()) { unConsumedFilters.add(filter); - } else { + } else if (p.visit(onlyPartFieldsVisitor)) { + consumedFilters.add(filter); + } else if (p.visit(indexPushDownVisitor)) { consumedFilters.add(filter); + indexConverted.add(p); + } else { + unConsumedFilters.add(filter); } converted.add(p); } } predicate = converted.isEmpty() ? null : PredicateBuilder.and(converted); + indexPredicate = indexConverted.isEmpty() ? null : PredicateBuilder.and(indexConverted); LOG.info("Consumed filters: {} of {}", consumedFilters, filters); return Result.of(filters, unConsumedFilters); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FilterPushDownITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FilterPushDownITCase.java index a7bb919f3876..f32376bf6a1c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FilterPushDownITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FilterPushDownITCase.java @@ -21,8 +21,6 @@ import org.apache.paimon.flink.CatalogITCaseBase; import org.apache.paimon.utils.BlockingIterator; -import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; - import org.apache.flink.table.api.ExplainFormat; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; @@ -43,7 +41,15 @@ public class FilterPushDownITCase extends CatalogITCaseBase { @Override public List ddl() { - return ImmutableList.of("CREATE TABLE T (" + "a INT, b INT, c STRING) PARTITIONED BY (a);"); + return Arrays.asList( + "CREATE TABLE T (" + "a INT, b INT, c STRING) PARTITIONED BY (a);", + "CREATE TABLE T2 (a INT, b INT, c STRING)" + + " PARTITIONED BY (a)" + + " WITH (" + + "'file-index.bitmap.columns'='b'," + + "'file-index.bloom-filter.columns'='c'," + + "'file-index.read.enabled'='true'" + + ");"); } @BeforeEach @@ -51,6 +57,7 @@ public List ddl() { public void before() throws IOException { super.before(); batchSql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2'), (2, 3, '3'), (3, 3, '3')"); + batchSql("INSERT INTO T2 VALUES (1, 1, '1'), (1, 2, '2'), (2, 3, '3'), (3, 3, '3')"); } @Test @@ -63,6 +70,54 @@ public void testPartitionConditionConsuming_OnePartitionCondition() { Row.ofKind(RowKind.INSERT, 1, 1, "1")); } + @Test + public void testPartitionAndIndexConditionConsuming() { + // bitmap index push down + String s1 = "SELECT * FROM T2 where a = 1 AND b = 1 limit 1"; + assertPlanAndResult( + s1, + "+- Limit(offset=[0], fetch=[1], global=[false])\n" + + "+- TableSourceScan(table=[[PAIMON, default, T2, filter=[and(=(a, 1), =(b, 1))], project=[c], limit=[1]]], fields=[c])", + Row.ofKind(RowKind.INSERT, 1, 1, "1")); + + // bitmap index does not support range push down + String s2 = "SELECT * FROM T2 where a = 1 AND b > 0 limit 10"; + assertPlanAndResult( + s2, + "+- Limit(offset=[0], fetch=[10], global=[false])\n" + + "+- Calc(select=[CAST(1 AS INTEGER) AS a, b, c], where=[>(b, 0)])\n" + + "+- TableSourceScan(table=[[PAIMON, default, T2, filter=[and(=(a, 1), >(b, 0))], project=[b, c]]], fields=[b, c])", + Row.ofKind(RowKind.INSERT, 1, 1, "1"), + Row.ofKind(RowKind.INSERT, 1, 2, "2")); + + // bloom-filter index does not support filter push down + String s3 = "SELECT * FROM T2 where a = 1 AND c = '1' limit 1"; + assertPlanAndResult( + s3, + "+- Limit(offset=[0], fetch=[1], global=[false])\n" + + "+- Calc(select=[CAST(1 AS INTEGER) AS a, b, CAST('1' AS VARCHAR(2147483647)) AS c], where=[=(c, '1')])\n" + + "+- TableSourceScan(table=[[PAIMON, default, T2, filter=[and(=(a, 1), =(c, _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))], project=[b, c]]], fields=[b, c])", + Row.ofKind(RowKind.INSERT, 1, 1, "1")); + + // test bitmap and bloom-filter combine + String s4 = "SELECT * FROM T2 where a = 1 AND b = 1 AND c = '1' limit 1"; + assertPlanAndResult( + s4, + "+- Limit(offset=[0], fetch=[1], global=[false])\n" + + "+- Calc(select=[CAST(1 AS INTEGER) AS a, CAST(1 AS INTEGER) AS b, CAST('1' AS VARCHAR(2147483647)) AS c], where=[(c = '1')])\n" + + "+- TableSourceScan(table=[[PAIMON, default, T2, filter=[and(and(=(a, 1), =(b, 1)), =(c, _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))], project=[c]]], fields=[c])", + Row.ofKind(RowKind.INSERT, 1, 1, "1")); + + // test bitmap and bloom-filter combine without partition + String s5 = "SELECT * FROM T2 where b = 1 AND c = '1' limit 1"; + assertPlanAndResult( + s5, + "+- Limit(offset=[0], fetch=[1], global=[false])\n" + + "+- Calc(select=[a, CAST(1 AS INTEGER) AS b, CAST('1' AS VARCHAR(2147483647)) AS c], where=[(c = '1')])\n" + + "+- TableSourceScan(table=[[PAIMON, default, T2, filter=[and(=(b, 1), =(c, _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))], project=[a, c]]], fields=[a, c])", + Row.ofKind(RowKind.INSERT, 1, 1, "1")); + } + @Test public void testPartitionConditionConsuming_PartitionConditionAndOther() { String sql = "SELECT * FROM T where (a = 1 or a = 2) and c = '1' limit 1"; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java index 826bf28d1248..30331228ca21 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java @@ -153,6 +153,7 @@ public void testTableFilterPartitionStatistics() throws Exception { null, null, null, + null, false); Assertions.assertThat(partitionFilterSource.reportStatistics().getRowCount()).isEqualTo(5L); Map> colStatsMap = new HashMap<>(); @@ -232,6 +233,7 @@ public void testTableFilterKeyStatistics() throws Exception { null, null, null, + null, false); Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(2L); Map> colStatsMap = new HashMap<>(); @@ -311,6 +313,7 @@ public void testTableFilterValueStatistics() throws Exception { null, null, null, + null, false); Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(4L); Map> colStatsMap = new HashMap<>(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java index f5d4121672b0..a6e3d3f26aa9 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java @@ -52,6 +52,7 @@ public void testTableFilterValueStatistics() throws Exception { null, null, null, + null, false); Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(9L); // TODO validate column statistics