Skip to content

Commit

Permalink
[core] Dv table supports value filter pushdown (apache#3024)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Mar 18, 2024
1 parent f2c605e commit 17268f0
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ public void pushdown(Predicate keyFilter) {
options.bucket(),
forWrite,
options.scanManifestParallelism(),
branchName);
branchName,
options.deletionVectorsEnabled());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {

private Predicate keyFilter;
private Predicate valueFilter;
private final boolean deletionVectorsEnabled;

public KeyValueFileStoreScan(
RowType partitionType,
Expand All @@ -56,7 +57,8 @@ public KeyValueFileStoreScan(
int numOfBuckets,
boolean checkNumOfBuckets,
Integer scanManifestParallelism,
String branchName) {
String branchName,
boolean deletionVectorsEnabled) {
super(
partitionType,
bucketFilter,
Expand All @@ -74,6 +76,7 @@ public KeyValueFileStoreScan(
this.fieldValueStatsConverters =
new FieldStatsConverters(
sid -> keyValueFieldsExtractor.valueFields(scanTableSchema(sid)), schemaId);
this.deletionVectorsEnabled = deletionVectorsEnabled;
}

public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
Expand All @@ -90,14 +93,26 @@ public KeyValueFileStoreScan withValueFilter(Predicate predicate) {
/** Note: Keep this thread-safe. */
@Override
protected boolean filterByStats(ManifestEntry entry) {
if (keyFilter == null) {
Predicate filter = null;
FieldStatsArraySerializer serializer = null;
BinaryTableStats stats = null;
if (deletionVectorsEnabled && entry.level() > 0 && valueFilter != null) {
filter = valueFilter;
serializer = fieldValueStatsConverters.getOrCreate(entry.file().schemaId());
stats = entry.file().valueStats();
}

if (filter == null && keyFilter != null) {
filter = keyFilter;
serializer = fieldKeyStatsConverters.getOrCreate(entry.file().schemaId());
stats = entry.file().keyStats();
}

if (filter == null) {
return true;
}

FieldStatsArraySerializer serializer =
fieldKeyStatsConverters.getOrCreate(entry.file().schemaId());
BinaryTableStats stats = entry.file().keyStats();
return keyFilter.test(
return filter.test(
entry.file().rowCount(),
serializer.evolution(stats.minValues()),
serializer.evolution(stats.maxValues()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
import static org.apache.paimon.Snapshot.CommitKind.COMPACT;
import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
Expand Down Expand Up @@ -1257,6 +1258,69 @@ public void testReadOptimizedTable() throws Exception {
commit.close();
}

@Test
public void testReadDeletionVectorTable() throws Exception {
    FileStoreTable table =
            createFileStoreTable(
                    options -> {
                        // Use a tiny target file size so each level ends up holding
                        // many small files; this makes the per-file counts asserted
                        // below a meaningful check of filter pushdown.
                        options.set(TARGET_FILE_SIZE, new MemorySize(1));
                        options.set(DELETION_VECTORS_ENABLED, true);
                    });
    StreamTableWrite write = table.newWrite(commitUser);
    IOManager ioManager = IOManager.create(tablePath.toString());
    write.withIOManager(ioManager);
    StreamTableCommit commit = table.newCommit(commitUser);

    // Three commits so that compaction produces a multi-level layout
    // (documented below) with a deletion vector on the updated key (1, 10).
    write.write(rowDataWithKind(RowKind.INSERT, 1, 10, 100L));
    write.write(rowDataWithKind(RowKind.INSERT, 2, 20, 100L));
    commit.commit(0, write.prepareCommit(true, 0));
    write.write(rowDataWithKind(RowKind.INSERT, 1, 20, 200L));
    commit.commit(1, write.prepareCommit(true, 1));
    write.write(rowDataWithKind(RowKind.INSERT, 1, 10, 110L));
    commit.commit(2, write.prepareCommit(true, 2));

    // Verify the merged read result: the update of key (1, 10) wins.
    Function<InternalRow, String> rowDataToString =
            row ->
                    internalRowToString(
                            row,
                            DataTypes.ROW(
                                    DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()));
    List<String> result =
            getResult(table.newRead(), table.newScan().plan().splits(), rowDataToString);
    assertThat(result)
            .containsExactlyInAnyOrder("+I[1, 10, 110]", "+I[1, 20, 200]", "+I[2, 20, 100]");

    // Expected file layout after the commits above:
    // pt 1
    // level 4 (1, 10, 110L)
    // level 5 (1, 10, 100L), (1, 20, 200L)
    // pt 2
    // level 5 (2, 20, 100L)

    // Test filter pushdown on the deletion-vector table.
    // Key filter pt = 1 alone keeps all three files of partition 1.
    PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
    Predicate filter = builder.equal(0, 1);
    int files =
            table.newScan().withFilter(filter).plan().splits().stream()
                    .mapToInt(split -> ((DataSplit) split).dataFiles().size())
                    .sum();
    assertThat(files).isEqualTo(3);

    // Adding the value filter idx2 = 110L prunes by value stats (pushdown is
    // safe on DV tables because deleted rows never resurface), leaving one file.
    filter = and(filter, builder.equal(2, 110L));
    files =
            table.newScan().withFilter(filter).plan().splits().stream()
                    .mapToInt(split -> ((DataSplit) split).dataFiles().size())
                    .sum();
    assertThat(files).isEqualTo(1);

    write.close();
    commit.close();
    // Fix: the IOManager was created but never closed, leaking its spill
    // directories/resources across tests.
    ioManager.close();
}

@Test
public void testTableQueryForLookup() throws Exception {
FileStoreTable table =
Expand Down

0 comments on commit 17268f0

Please sign in to comment.