diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index cc45798984014..86186a2b2498d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -238,7 +238,8 @@ private KeyValueFileStoreScan newScan(boolean forWrite) { options.scanManifestParallelism(), options.deletionVectorsEnabled(), options.mergeEngine(), - options.changelogProducer()); + options.changelogProducer(), + options.fileIndexReadEnabled() && options.deletionVectorsEnabled()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index bfda80db984c3..b6cac5ae51304 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -152,6 +152,43 @@ public static DataFileMeta forAppend( valueStatsCols); } + public DataFileMeta( + String fileName, + long fileSize, + long rowCount, + BinaryRow minKey, + BinaryRow maxKey, + SimpleStats keyStats, + SimpleStats valueStats, + long minSequenceNumber, + long maxSequenceNumber, + long schemaId, + int level, + List extraFiles, + @Nullable Long deleteRowCount, + @Nullable byte[] embeddedIndex, + @Nullable FileSource fileSource, + @Nullable List valueStatsCols) { + this( + fileName, + fileSize, + rowCount, + minKey, + maxKey, + keyStats, + valueStats, + minSequenceNumber, + maxSequenceNumber, + schemaId, + level, + extraFiles, + Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(), + deleteRowCount, + embeddedIndex, + fileSource, + valueStatsCols); + } + public DataFileMeta( String fileName, long fileSize, diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index ba42f87209e7a..ce0b3b02840ba 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.format.FormatWriterFactory; import org.apache.paimon.format.SimpleColStats; import org.apache.paimon.format.SimpleStatsExtractor; @@ -42,9 +43,12 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.function.Function; +import static org.apache.paimon.io.DataFilePathFactory.dataFileToFileIndexPath; + /** * A {@link StatsCollectingSingleFileWriter} to write data files containing {@link KeyValue}s. Also * produces {@link DataFileMeta} after writing a file. @@ -66,6 +70,7 @@ public class KeyValueDataFileWriter private final SimpleStatsConverter valueStatsConverter; private final InternalRowSerializer keySerializer; private final FileSource fileSource; + @Nullable private final DataFileIndexWriter dataFileIndexWriter; private BinaryRow minKey = null; private InternalRow maxKey = null; @@ -85,7 +90,8 @@ public KeyValueDataFileWriter( int level, String compression, CoreOptions options, - FileSource fileSource) { + FileSource fileSource, + FileIndexOptions fileIndexOptions) { super( fileIO, factory, @@ -107,12 +113,19 @@ public KeyValueDataFileWriter( this.valueStatsConverter = new SimpleStatsConverter(valueType, options.statsDenseStore()); this.keySerializer = new InternalRowSerializer(keyType); this.fileSource = fileSource; + this.dataFileIndexWriter = + DataFileIndexWriter.create( + fileIO, dataFileToFileIndexPath(path), valueType, fileIndexOptions); } @Override public void write(KeyValue kv) throws IOException { super.write(kv); + if (dataFileIndexWriter != null) { + dataFileIndexWriter.write(kv.value()); + } + updateMinKey(kv); updateMaxKey(kv); @@ -165,6 +178,11 @@ public DataFileMeta result() throws IOException { Pair, SimpleStats> valueStatsPair = valueStatsConverter.toBinary(valFieldStats); + DataFileIndexWriter.FileIndexResult indexResult = + dataFileIndexWriter == null + ? DataFileIndexWriter.EMPTY_RESULT + : dataFileIndexWriter.result(); + return new DataFileMeta( path.getName(), fileIO.getFileSize(path), @@ -177,10 +195,20 @@ public DataFileMeta result() throws IOException { maxSeqNumber, schemaId, level, + indexResult.independentIndexFile() == null + ? Collections.emptyList() + : Collections.singletonList(indexResult.independentIndexFile()), deleteRecordCount, - // TODO: enable file filter for primary key table (e.g. deletion table). - null, + indexResult.embeddedIndexBytes(), fileSource, valueStatsPair.getKey()); } + + @Override + public void close() throws IOException { + if (dataFileIndexWriter != null) { + dataFileIndexWriter.close(); + } + super.close(); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index 922b06ee82299..a6fddb43283a1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -23,6 +23,7 @@ import org.apache.paimon.KeyValueSerializer; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.FormatWriterFactory; import org.apache.paimon.format.SimpleStatsExtractor; @@ -52,6 +53,7 @@ public class KeyValueFileWriterFactory { private final WriteFormatContext formatContext; private final long suggestedFileSize; private final CoreOptions options; + private final FileIndexOptions fileIndexOptions; private KeyValueFileWriterFactory( FileIO fileIO, @@ -68,6 +70,7 @@ private KeyValueFileWriterFactory( this.formatContext = formatContext; this.suggestedFileSize = suggestedFileSize; this.options = options; + this.fileIndexOptions = options.indexColumnsOptions(); } public RowType keyType() { @@ -117,7 +120,8 @@ private KeyValueDataFileWriter createDataFileWriter( level, formatContext.compression(level), options, - fileSource); + fileSource, + fileIndexOptions); } public void deleteFile(String filename, int level) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index 8300bdcfaff98..b5683fbe0090b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.MergeEngine; import org.apache.paimon.KeyValueFileStore; +import org.apache.paimon.fileindex.FileIndexPredicate; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; @@ -31,11 +32,17 @@ import org.apache.paimon.stats.SimpleStatsEvolution; import org.apache.paimon.stats.SimpleStatsEvolutions; import org.apache.paimon.table.source.ScanMode; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.SnapshotManager; +import javax.annotation.Nullable; + +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.apache.paimon.CoreOptions.MergeEngine.AGGREGATE; import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW; @@ -54,6 +61,10 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan { private final MergeEngine mergeEngine; private final ChangelogProducer changelogProducer; + private final boolean fileIndexReadEnabled; + // just cache. + private final Map dataFilterMapping = new HashMap<>(); + public KeyValueFileStoreScan( ManifestsReader manifestsReader, BucketSelectConverter bucketSelectConverter, @@ -65,7 +76,8 @@ public KeyValueFileStoreScan( Integer scanManifestParallelism, boolean deletionVectorsEnabled, MergeEngine mergeEngine, - ChangelogProducer changelogProducer) { + ChangelogProducer changelogProducer, + boolean fileIndexReadEnabled) { super( manifestsReader, snapshotManager, @@ -85,6 +97,7 @@ public KeyValueFileStoreScan( this.deletionVectorsEnabled = deletionVectorsEnabled; this.mergeEngine = mergeEngine; this.changelogProducer = changelogProducer; + this.fileIndexReadEnabled = fileIndexReadEnabled; } public KeyValueFileStoreScan withKeyFilter(Predicate predicate) { @@ -118,6 +131,28 @@ protected boolean filterByStats(ManifestEntry entry) { return true; } + private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry entry) { + if (embeddedIndexBytes == null) { + return true; + } + + RowType dataRowType = scanTableSchema(entry.file().schemaId()).logicalRowType(); + + Predicate dataPredicate = + dataFilterMapping.computeIfAbsent( + entry.file().schemaId(), + id -> + fieldValueStatsConverters.convertFilter( + entry.file().schemaId(), valueFilter)); + + try (FileIndexPredicate predicate = + new FileIndexPredicate(embeddedIndexBytes, dataRowType)) { + return predicate.testPredicate(dataPredicate); + } catch (IOException e) { + throw new RuntimeException("Exception happens while checking predicate.", e); + } + } + private boolean isValueFilterEnabled(ManifestEntry entry) { if (valueFilter == null) { return false; @@ -181,7 +216,12 @@ private boolean filterByValueFilter(ManifestEntry entry) { .getOrCreate(file.schemaId()) .evolution(file.valueStats(), file.rowCount(), file.valueStatsCols()); return valueFilter.test( - file.rowCount(), result.minValues(), result.maxValues(), result.nullCounts()); + file.rowCount(), + result.minValues(), + result.maxValues(), + result.nullCounts()) + && (!fileIndexReadEnabled + || filterByFileIndex(entry.file().embeddedIndex(), entry)); } private static boolean noOverlapping(List entries) { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index 1ecfd6f910bc8..dca86aa61ec28 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -672,10 +672,6 @@ private void writeBranchData(FileStoreTable table) throws Exception { @Test public void testReadFilter() throws Exception { FileStoreTable table = createFileStoreTable(); - if (table.coreOptions().fileFormat().getFormatIdentifier().equals("parquet")) { - // TODO support parquet reader filter push down - return; - } StreamTableWrite write = table.newWrite(commitUser); StreamTableCommit commit = table.newCommit(commitUser); @@ -791,6 +787,81 @@ public void testWithShardDeletionVectors() throws Exception { innerTestWithShard(table); } + @Test + public void testDeletionVectorsWithFileIndexInFile() throws Exception { + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(BUCKET, 1); + conf.set(DELETION_VECTORS_ENABLED, true); + conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1)); + conf.set("file-index.bloom-filter.columns", "b"); + }); + + StreamTableWrite write = + table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString())); + StreamTableCommit commit = table.newCommit(commitUser); + + write.write(rowData(1, 1, 300L)); + write.write(rowData(1, 2, 400L)); + write.write(rowData(1, 3, 200L)); + write.write(rowData(1, 4, 500L)); + commit.commit(0, write.prepareCommit(true, 0)); + + write.write(rowData(1, 5, 100L)); + write.write(rowData(1, 6, 600L)); + write.write(rowData(1, 7, 400L)); + commit.commit(1, write.prepareCommit(true, 1)); + + PredicateBuilder builder = new PredicateBuilder(ROW_TYPE); + List splits = toSplits(table.newSnapshotReader().read().dataSplits()); + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2); + TableRead read = table.newRead().withFilter(builder.equal(2, 300L)); + assertThat(getResult(read, splits, BATCH_ROW_TO_STRING)) + .hasSameElementsAs( + Arrays.asList( + "1|1|300|binary|varbinary|mapKey:mapVal|multiset", + "1|2|400|binary|varbinary|mapKey:mapVal|multiset", + "1|3|200|binary|varbinary|mapKey:mapVal|multiset", + "1|4|500|binary|varbinary|mapKey:mapVal|multiset")); + } + + @Test + public void testDeletionVectorsWithFileIndexInMeta() throws Exception { + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(BUCKET, 1); + conf.set(DELETION_VECTORS_ENABLED, true); + conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1)); + conf.set("file-index.bloom-filter.columns", "b"); + conf.set("file-index.bloom-filter.b.items", "20"); + }); + + StreamTableWrite write = + table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString())); + StreamTableCommit commit = table.newCommit(commitUser); + + write.write(rowData(1, 1, 300L)); + write.write(rowData(1, 2, 400L)); + write.write(rowData(1, 3, 200L)); + write.write(rowData(1, 4, 500L)); + commit.commit(0, write.prepareCommit(true, 0)); + + write.write(rowData(1, 5, 100L)); + write.write(rowData(1, 6, 600L)); + write.write(rowData(1, 7, 400L)); + commit.commit(1, write.prepareCommit(true, 1)); + + PredicateBuilder builder = new PredicateBuilder(ROW_TYPE); + Predicate predicate = builder.equal(2, 300L); + + List splits = + toSplits(table.newSnapshotReader().withFilter(predicate).read().dataSplits()); + + assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1); + } + @Test public void testWithShardFirstRow() throws Exception { FileStoreTable table =