diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index e9b4cc5fd00e..a44c91ba54c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -228,7 +228,8 @@ public void pushdown(Predicate keyFilter) {
                 options.bucket(),
                 forWrite,
                 options.scanManifestParallelism(),
-                branchName);
+                branchName,
+                options.deletionVectorsEnabled());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 90d2c1b184e4..8e37d1e4fac1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -43,6 +43,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
 
     private Predicate keyFilter;
     private Predicate valueFilter;
+    private final boolean deletionVectorsEnabled;
 
     public KeyValueFileStoreScan(
             RowType partitionType,
@@ -56,7 +57,8 @@ public KeyValueFileStoreScan(
             int numOfBuckets,
             boolean checkNumOfBuckets,
             Integer scanManifestParallelism,
-            String branchName) {
+            String branchName,
+            boolean deletionVectorsEnabled) {
         super(
                 partitionType,
                 bucketFilter,
@@ -74,6 +76,7 @@ public KeyValueFileStoreScan(
         this.fieldValueStatsConverters =
                 new FieldStatsConverters(
                         sid -> keyValueFieldsExtractor.valueFields(scanTableSchema(sid)), schemaId);
+        this.deletionVectorsEnabled = deletionVectorsEnabled;
     }
 
     public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
@@ -90,14 +93,31 @@ public KeyValueFileStoreScan withValueFilter(Predicate predicate) {
     /** Note: Keep this thread-safe. */
     @Override
     protected boolean filterByStats(ManifestEntry entry) {
-        if (keyFilter == null) {
+        // Choose a single (filter, stats) pair for this file. Value stats are used only when
+        // deletion vectors are enabled, the file is above level 0 and a value filter is set;
+        // otherwise key stats are used if a key filter is set.
+        // NOTE(review): presumably deletion vectors are what make value-stats pruning safe on
+        // level > 0 files (stale rows are masked) -- confirm against the merge semantics.
+        Predicate filter = null;
+        FieldStatsArraySerializer serializer = null;
+        BinaryTableStats stats = null;
+        if (deletionVectorsEnabled && entry.level() > 0 && valueFilter != null) {
+            filter = valueFilter;
+            serializer = fieldValueStatsConverters.getOrCreate(entry.file().schemaId());
+            stats = entry.file().valueStats();
+        }
+
+        if (filter == null && keyFilter != null) {
+            filter = keyFilter;
+            serializer = fieldKeyStatsConverters.getOrCreate(entry.file().schemaId());
+            stats = entry.file().keyStats();
+        }
+
+        if (filter == null) {
             return true;
         }
 
-        FieldStatsArraySerializer serializer =
-                fieldKeyStatsConverters.getOrCreate(entry.file().schemaId());
-        BinaryTableStats stats = entry.file().keyStats();
-        return keyFilter.test(
+        return filter.test(
                 entry.file().rowCount(),
                 serializer.evolution(stats.minValues()),
                 serializer.evolution(stats.maxValues()),
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 68adf63fb04d..c245a2ce7220 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -80,6 +80,7 @@
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
+import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
 import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
 import static org.apache.paimon.Snapshot.CommitKind.COMPACT;
 import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
@@ -1257,6 +1258,69 @@ public void testReadOptimizedTable() throws Exception {
         commit.close();
     }
 
+    @Test
+    public void testReadDeletionVectorTable() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        options -> {
+                            // tiny target file size so that a level ends up with many files
+                            options.set(TARGET_FILE_SIZE, new MemorySize(1));
+                            options.set(DELETION_VECTORS_ENABLED, true);
+                        });
+        StreamTableWrite write = table.newWrite(commitUser);
+        IOManager ioManager = IOManager.create(tablePath.toString());
+        write.withIOManager(ioManager);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowDataWithKind(RowKind.INSERT, 1, 10, 100L));
+        write.write(rowDataWithKind(RowKind.INSERT, 2, 20, 100L));
+        commit.commit(0, write.prepareCommit(true, 0));
+        write.write(rowDataWithKind(RowKind.INSERT, 1, 20, 200L));
+        commit.commit(1, write.prepareCommit(true, 1));
+        write.write(rowDataWithKind(RowKind.INSERT, 1, 10, 110L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        // test result
+        Function<InternalRow, String> rowDataToString =
+                row ->
+                        internalRowToString(
+                                row,
+                                DataTypes.ROW(
+                                        DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()));
+        List<String> result =
+                getResult(table.newRead(), table.newScan().plan().splits(), rowDataToString);
+        assertThat(result)
+                .containsExactlyInAnyOrder("+I[1, 10, 110]", "+I[1, 20, 200]", "+I[2, 20, 100]");
+
+        // file layout
+        // pt 1
+        //   level 4 (1, 10, 110L)
+        //   level 5 (1, 10, 100L), (1, 20, 200L)
+        // pt 2
+        //   level 5 (2, 20, 100L)
+
+        // test filter on dv table
+        // with key filter pt = 1
+        PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
+        Predicate filter = builder.equal(0, 1);
+        int files =
+                table.newScan().withFilter(filter).plan().splits().stream()
+                        .mapToInt(split -> ((DataSplit) split).dataFiles().size())
+                        .sum();
+        assertThat(files).isEqualTo(3);
+
+        // with key filter pt = 1 and value filter idx2 = 110L
+        filter = and(filter, builder.equal(2, 110L));
+        files =
+                table.newScan().withFilter(filter).plan().splits().stream()
+                        .mapToInt(split -> ((DataSplit) split).dataFiles().size())
+                        .sum();
+        assertThat(files).isEqualTo(1);
+
+        write.close();
+        commit.close();
+    }
+
     @Test
     public void testTableQueryForLookup() throws Exception {
         FileStoreTable table =