From cc057880dd52e8c920b6be6f424f5460ed5bcb56 Mon Sep 17 00:00:00 2001
From: sxnan
Date: Thu, 10 Oct 2024 15:46:59 +0800
Subject: [PATCH] PrimaryKeyPartialLookupTable supports cache row filter

---
 .../paimon/table/query/LocalTableQuery.java   | 29 +++++++++++----
 .../lookup/PrimaryKeyPartialLookupTable.java  | 36 +++++++++++++------
 .../paimon/flink/lookup/LookupTableTest.java  | 32 +++++++++++------
 3 files changed, 70 insertions(+), 27 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 6b19f9c051f3..d5b392d9e099 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -37,8 +37,10 @@
 import org.apache.paimon.mergetree.LookupFile;
 import org.apache.paimon.mergetree.LookupLevels;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.KeyComparatorSupplier;
 import org.apache.paimon.utils.Preconditions;
 
@@ -79,6 +81,8 @@ public class LocalTableQuery implements TableQuery {
     private final RowType rowType;
     private final RowType partitionType;
 
+    @Nullable private Filter<InternalRow> cacheRowFilter;
+
     public LocalTableQuery(FileStoreTable table) {
         this.options = table.coreOptions();
         this.tableView = new HashMap<>();
@@ -154,12 +158,20 @@ private void newLookupLevels(BinaryRow partition, int bucket, List<DataFileMeta>
                         keyComparatorSupplier.get(),
                         readerFactoryBuilder.keyType(),
                         new LookupLevels.KeyValueProcessor(readerFactoryBuilder.readValueType()),
-                        file ->
-                                factory.createRecordReader(
-                                        file.schemaId(),
-                                        file.fileName(),
-                                        file.fileSize(),
-                                        file.level()),
+                        file -> {
+                            RecordReader<KeyValue> reader =
+                                    factory.createRecordReader(
+                                            file.schemaId(),
+                                            file.fileName(),
+                                            file.fileSize(),
+                                            file.level());
+                            if (cacheRowFilter != null) {
+                                reader =
+                                        reader.filter(
+                                                keyValue -> cacheRowFilter.test(keyValue.value()));
+                            }
+                            return reader;
+                        },
                         file ->
                                 Preconditions.checkNotNull(ioManager, "IOManager is required.")
                                         .createChannel(
@@ -206,6 +218,11 @@ public LocalTableQuery withIOManager(IOManager ioManager) {
         return this;
     }
 
+    public LocalTableQuery withCacheRowFilter(Filter<InternalRow> cacheRowFilter) {
+        this.cacheRowFilter = cacheRowFilter;
+        return this;
+    }
+
     @Override
     public InternalRowSerializer createValueSerializer() {
         return InternalSerializers.create(readerFactoryBuilder.readValueType());
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index 6c6979eeebc2..ef5543ac9b7c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -31,6 +31,7 @@
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.ProjectedRow;
 
 import javax.annotation.Nullable;
@@ -41,23 +42,21 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.function.Function;
 
 /** Lookup table for primary key which supports to read the LSM tree directly. */
 public class PrimaryKeyPartialLookupTable implements LookupTable {
 
-    private final Function<Predicate, QueryExecutor> executorFactory;
+    private final QueryExecutorFactory executorFactory;
     private final FixedBucketFromPkExtractor extractor;
     @Nullable private final ProjectedRow keyRearrange;
     @Nullable private final ProjectedRow trimmedKeyRearrange;
 
     private Predicate specificPartition;
+    @Nullable private Filter<InternalRow> cacheRowFilter;
     private QueryExecutor queryExecutor;
 
     private PrimaryKeyPartialLookupTable(
-            Function<Predicate, QueryExecutor> executorFactory,
-            FileStoreTable table,
-            List<String> joinKey) {
+            QueryExecutorFactory executorFactory, FileStoreTable table, List<String> joinKey) {
         this.executorFactory = executorFactory;
 
         if (table.bucketMode() != BucketMode.HASH_FIXED) {
@@ -103,7 +102,7 @@ public void specificPartitionFilter(Predicate filter) {
 
     @Override
     public void open() throws Exception {
-        this.queryExecutor = executorFactory.apply(specificPartition);
+        this.queryExecutor = executorFactory.create(specificPartition, cacheRowFilter);
         refresh();
     }
 
@@ -135,6 +134,11 @@ public void refresh() {
         queryExecutor.refresh();
     }
 
+    @Override
+    public void specifyCacheRowFilter(Filter<InternalRow> filter) {
+        this.cacheRowFilter = filter;
+    }
+
     @Override
     public void close() throws IOException {
         if (queryExecutor != null) {
@@ -149,13 +153,14 @@ public static PrimaryKeyPartialLookupTable createLocalTable(
             List<String> joinKey,
             Set<Integer> requireCachedBucketIds) {
         return new PrimaryKeyPartialLookupTable(
-                filter ->
+                (filter, cacheRowFilter) ->
                         new LocalQueryExecutor(
                                 new LookupFileStoreTable(table, joinKey),
                                 projection,
                                 tempPath,
                                 filter,
-                                requireCachedBucketIds),
+                                requireCachedBucketIds,
+                                cacheRowFilter),
                 table,
                 joinKey);
     }
@@ -163,7 +168,13 @@ public static PrimaryKeyPartialLookupTable createLocalTable(
     public static PrimaryKeyPartialLookupTable createRemoteTable(
             FileStoreTable table, int[] projection, List<String> joinKey) {
         return new PrimaryKeyPartialLookupTable(
-                filter -> new RemoteQueryExecutor(table, projection), table, joinKey);
+                (filter, cacheRowFilter) -> new RemoteQueryExecutor(table, projection),
+                table,
+                joinKey);
+    }
+
+    interface QueryExecutorFactory {
+        QueryExecutor create(Predicate filter, @Nullable Filter<InternalRow> cacheRowFilter);
     }
 
     interface QueryExecutor extends Closeable {
@@ -183,12 +194,17 @@ private LocalQueryExecutor(
                 int[] projection,
                 File tempPath,
                 @Nullable Predicate filter,
-                Set<Integer> requireCachedBucketIds) {
+                Set<Integer> requireCachedBucketIds,
+                @Nullable Filter<InternalRow> cacheRowFilter) {
             this.tableQuery =
                     table.newLocalTableQuery()
                             .withValueProjection(projection)
                             .withIOManager(new IOManagerImpl(tempPath.toString()));
 
+            if (cacheRowFilter != null) {
+                this.tableQuery.withCacheRowFilter(cacheRowFilter);
+            }
+
             this.scan =
                     table.newReadBuilder()
                             .withFilter(filter)
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index 8141ff4f9448..46c61a15bd8a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -715,6 +715,27 @@ public void testPartialLookupTable() throws Exception {
         assertThat(result).hasSize(0);
     }
 
+    @Test
+    public void testPartialLookupTableWithRowFilter() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.BUCKET.key(), "2");
+        options.set(CoreOptions.BUCKET_KEY.key(), "f0");
+        FileStoreTable dimTable = createTable(singletonList("f0"), options);
+        write(dimTable, GenericRow.of(1, 11, 111), GenericRow.of(2, 22, 222));
+
+        PrimaryKeyPartialLookupTable table =
+                PrimaryKeyPartialLookupTable.createLocalTable(
+                        dimTable, new int[] {0, 2}, tempDir.toFile(), ImmutableList.of("f0"), null);
+        table.specifyCacheRowFilter(row -> row.getInt(0) < 2);
+        table.open();
+
+        List<InternalRow> result = table.get(row(1, 11));
+        assertThat(result).hasSize(1);
+
+        result = table.get(row(2, 22));
+        assertThat(result).isEmpty();
+    }
+
     @Test
     public void testPartialLookupTableWithProjection() throws Exception {
         FileStoreTable dimTable = createDimTable();
@@ -989,15 +1010,4 @@ private static void assertRow(InternalRow resultRow, int... expected) {
         assertThat(results).containsExactly(expected);
         assertThat(resultRow.getFieldCount()).isEqualTo(expected.length);
     }
-
-    private void writeAndCommit(FileStoreTable table, InternalRow... rows) throws Exception {
-        BatchWriteBuilder builder = table.newBatchWriteBuilder();
-        try (BatchTableWrite writer = builder.newWrite();
-                BatchTableCommit commiter = builder.newCommit()) {
-            for (InternalRow row : rows) {
-                writer.write(row, 0);
-            }
-            commiter.commit(writer.prepareCommit());
-        }
-    }
 }