From d22e830e8229b873332da57f7031ff299ff551ee Mon Sep 17 00:00:00 2001 From: sxnan Date: Thu, 10 Oct 2024 15:46:59 +0800 Subject: [PATCH] PrimaryKeyPartialLookupTable support cache row filter --- .../paimon/io/KeyValueFileReaderFactory.java | 30 +++++++- .../paimon/table/query/LocalTableQuery.java | 11 ++- .../paimon/io/DataFileTestDataGenerator.java | 2 +- .../paimon/io/KeyValueFileReadWriteTest.java | 77 ++++++++++++++++++- .../lookup/PrimaryKeyPartialLookupTable.java | 36 ++++++--- .../paimon/flink/lookup/LookupTableTest.java | 33 +++++--- 6 files changed, 161 insertions(+), 28 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index fdbb727e56747..8e2e97a93162b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -42,6 +42,7 @@ import org.apache.paimon.utils.BulkFormatMapping; import org.apache.paimon.utils.BulkFormatMapping.BulkFormatMappingBuilder; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.Filter; import javax.annotation.Nullable; @@ -102,11 +103,23 @@ public RecordReader createRecordReader(DataFileMeta file) throws IOExc public RecordReader createRecordReader( long schemaId, String fileName, long fileSize, int level) throws IOException { + return createRecordReader(schemaId, fileName, fileSize, level, null); + } + + public RecordReader createRecordReader( + long schemaId, + String fileName, + long fileSize, + int level, + @Nullable Filter valueFilter) + throws IOException { if (fileSize >= asyncThreshold && fileName.endsWith(".orc")) { return new AsyncRecordReader<>( - () -> createRecordReader(schemaId, fileName, level, false, 2, fileSize)); + () -> + createRecordReader( + schemaId, fileName, level, false, 2, fileSize, valueFilter)); } - return createRecordReader(schemaId, fileName, level, true, null, fileSize); + return createRecordReader(schemaId, fileName, level, true, null, fileSize, valueFilter); } private RecordReader createRecordReader( @@ -115,7 +128,8 @@ private RecordReader createRecordReader( int level, boolean reuseFormat, @Nullable Integer orcPoolSize, - long fileSize) + long fileSize, + @Nullable Filter valueFilter) throws IOException { String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName); @@ -151,7 +165,15 @@ private RecordReader createRecordReader( new ApplyDeletionVectorReader(fileRecordReader, deletionVector.get()); } - return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, level); + RecordReader keyValueDataFileRecordReader = + new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, level); + if (valueFilter != null) { + keyValueDataFileRecordReader = + keyValueDataFileRecordReader.filter( + keyValue -> valueFilter.test(keyValue.value())); + } + + return keyValueDataFileRecordReader; } public static Builder builder( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index 6b19f9c051f3f..1c55e7972e66c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -39,6 +39,7 @@ import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.KeyComparatorSupplier; import org.apache.paimon.utils.Preconditions; @@ -79,6 +80,8 @@ public class LocalTableQuery implements TableQuery { private final RowType rowType; private final RowType partitionType; + @Nullable private Filter cacheRowFilter; + public LocalTableQuery(FileStoreTable table) { this.options = table.coreOptions(); this.tableView = new HashMap<>(); @@ -159,7 +162,8 @@ private void newLookupLevels(BinaryRow partition, int bucket, List file.schemaId(), file.fileName(), file.fileSize(), - file.level()), + file.level(), + cacheRowFilter), file -> Preconditions.checkNotNull(ioManager, "IOManager is required.") .createChannel( @@ -206,6 +210,11 @@ public LocalTableQuery withIOManager(IOManager ioManager) { return this; } + public LocalTableQuery withCacheRowFilter(Filter cacheRowFilter) { + this.cacheRowFilter = cacheRowFilter; + return this; + } + @Override public InternalRowSerializer createValueSerializer() { return InternalSerializers.create(readerFactoryBuilder.readValueType()); diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java index 810cef8607847..dd7e691b97cf4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java @@ -175,7 +175,7 @@ public static class Data { public final DataFileMeta meta; public final List content; - private Data(BinaryRow partition, int bucket, DataFileMeta meta, List content) { + Data(BinaryRow partition, int bucket, DataFileMeta meta, List content) { this.partition = partition; this.bucket = bucket; this.meta = meta; diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index 3c89552310f62..0f8fbdfb44112 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -24,6 +24,7 @@ import org.apache.paimon.TestKeyValueGenerator; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.format.FlushingFileFormat; @@ -41,6 +42,7 @@ import org.apache.paimon.utils.CloseableIterator; import org.apache.paimon.utils.FailingFileIO; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.Filter; import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; @@ -56,6 +58,7 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.paimon.TestKeyValueGenerator.DEFAULT_ROW_TYPE; import static org.apache.paimon.TestKeyValueGenerator.KEY_TYPE; @@ -219,6 +222,56 @@ public void testReadValueType() throws Exception { kv.value().getInt(1)))); } + @Test + public void testReadValueTypeWithRowFilter() throws Exception { + DataFileTestDataGenerator.Data data = gen.next(); + KeyValueFileWriterFactory writerFactory = createWriterFactory(tempDir.toString(), "avro"); + DataFileMetaSerializer serializer = new DataFileMetaSerializer(); + + RollingFileWriter writer = + writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND); + writer.write(CloseableIterator.fromList(data.content, kv -> {})); + writer.close(); + List actualMetas = writer.result(); + + // projection: + // (dt, hr, shopId, orderId, itemId, priceAmount, comment) -> + // (shopId, itemId, dt, hr) + RowType readValueType = DEFAULT_ROW_TYPE.project("shopId", "itemId", "dt", "hr"); + KeyValueFileReaderFactory readerFactory = + createReaderFactory(tempDir.toString(), "avro", null, readValueType); + InternalRowSerializer projectedValueSerializer = new InternalRowSerializer(readValueType); + + List expectedValues = + data.content.stream() + .filter(kv -> kv.key().getInt(0) % 2 == 0) + .collect(Collectors.toList()); + DataFileTestDataGenerator.Data expectedData = + new DataFileTestDataGenerator.Data( + data.partition, data.bucket, data.meta, expectedValues); + assertData( + expectedData, + actualMetas, + TestKeyValueGenerator.KEY_SERIALIZER, + projectedValueSerializer, + serializer, + readerFactory, + kv -> + new KeyValue() + .replace( + kv.key(), + kv.sequenceNumber(), + kv.valueKind(), + GenericRow.of( + kv.value().getInt(2), + kv.value().isNullAt(4) + ? null + : kv.value().getLong(4), + kv.value().getString(0), + kv.value().getInt(1))), + internalRow -> internalRow.getInt(0) % 2 == 0); + } + protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String format) { Path path = new Path(pathStr); FileStorePathFactory pathFactory = @@ -294,6 +347,27 @@ private void assertData( KeyValueFileReaderFactory readerFactory, Function toExpectedKv) throws Exception { + assertData( + data, + actualMetas, + keySerializer, + projectedValueSerializer, + dataFileMetaSerializer, + readerFactory, + toExpectedKv, + null); + } + + private void assertData( + DataFileTestDataGenerator.Data data, + List actualMetas, + InternalRowSerializer keySerializer, + InternalRowSerializer projectedValueSerializer, + DataFileMetaSerializer dataFileMetaSerializer, + KeyValueFileReaderFactory readerFactory, + Function toExpectedKv, + Filter rowFilter) + throws Exception { Iterator expectedIterator = data.content.iterator(); for (DataFileMeta meta : actualMetas) { // check the contents of data file @@ -303,7 +377,8 @@ private void assertData( meta.schemaId(), meta.fileName(), meta.fileSize(), - meta.level())); + meta.level(), + rowFilter)); while (actualKvsIterator.hasNext()) { assertThat(expectedIterator.hasNext()).isTrue(); KeyValue actualKv = actualKvsIterator.next(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java index 6c6979eeebc26..ef5543ac9b7cf 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java @@ -31,6 +31,7 @@ import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.StreamTableScan; +import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.ProjectedRow; import javax.annotation.Nullable; @@ -41,23 +42,21 @@ import java.util.Collections; import java.util.List; import java.util.Set; -import java.util.function.Function; /** Lookup table for primary key which supports to read the LSM tree directly. */ public class PrimaryKeyPartialLookupTable implements LookupTable { - private final Function executorFactory; + private final QueryExecutorFactory executorFactory; private final FixedBucketFromPkExtractor extractor; @Nullable private final ProjectedRow keyRearrange; @Nullable private final ProjectedRow trimmedKeyRearrange; private Predicate specificPartition; + @Nullable private Filter cacheRowFilter; private QueryExecutor queryExecutor; private PrimaryKeyPartialLookupTable( - Function executorFactory, - FileStoreTable table, - List joinKey) { + QueryExecutorFactory executorFactory, FileStoreTable table, List joinKey) { this.executorFactory = executorFactory; if (table.bucketMode() != BucketMode.HASH_FIXED) { @@ -103,7 +102,7 @@ public void specificPartitionFilter(Predicate filter) { @Override public void open() throws Exception { - this.queryExecutor = executorFactory.apply(specificPartition); + this.queryExecutor = executorFactory.create(specificPartition, cacheRowFilter); refresh(); } @@ -135,6 +134,11 @@ public void refresh() { queryExecutor.refresh(); } + @Override + public void specifyCacheRowFilter(Filter filter) { + this.cacheRowFilter = filter; + } + @Override public void close() throws IOException { if (queryExecutor != null) { @@ -149,13 +153,14 @@ public static PrimaryKeyPartialLookupTable createLocalTable( List joinKey, Set requireCachedBucketIds) { return new PrimaryKeyPartialLookupTable( - filter -> + (filter, cacheRowFilter) -> new LocalQueryExecutor( new LookupFileStoreTable(table, joinKey), projection, tempPath, filter, - requireCachedBucketIds), + requireCachedBucketIds, + cacheRowFilter), table, joinKey); } @@ -163,7 +168,13 @@ public static PrimaryKeyPartialLookupTable createLocalTable( public static PrimaryKeyPartialLookupTable createRemoteTable( FileStoreTable table, int[] projection, List joinKey) { return new PrimaryKeyPartialLookupTable( - filter -> new RemoteQueryExecutor(table, projection), table, joinKey); + (filter, cacheRowFilter) -> new RemoteQueryExecutor(table, projection), + table, + joinKey); + } + + interface QueryExecutorFactory { + QueryExecutor create(Predicate filter, @Nullable Filter cacheRowFilter); } interface QueryExecutor extends Closeable { @@ -183,12 +194,17 @@ private LocalQueryExecutor( int[] projection, File tempPath, @Nullable Predicate filter, - Set requireCachedBucketIds) { + Set requireCachedBucketIds, + @Nullable Filter cacheRowFilter) { this.tableQuery = table.newLocalTableQuery() .withValueProjection(projection) .withIOManager(new IOManagerImpl(tempPath.toString())); + if (cacheRowFilter != null) { + this.tableQuery.withCacheRowFilter(cacheRowFilter); + } + this.scan = table.newReadBuilder() .withFilter(filter) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java index 8141ff4f9448a..e17c82210f7a3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java @@ -715,6 +715,28 @@ public void testPartialLookupTable() throws Exception { assertThat(result).hasSize(0); } + @Test + public void testPartialLookupTableWithRowFilter() throws Exception { + + Options options = new Options(); + options.set(CoreOptions.BUCKET.key(), "2"); + options.set(CoreOptions.BUCKET_KEY.key(), "f0"); + FileStoreTable dimTable = createTable(singletonList("f0"), options); + write(dimTable, GenericRow.of(1, 11, 111), GenericRow.of(2, 22, 222)); + + PrimaryKeyPartialLookupTable table = + PrimaryKeyPartialLookupTable.createLocalTable( + dimTable, new int[] {0, 2}, tempDir.toFile(), ImmutableList.of("f0"), null); + table.specifyCacheRowFilter(row -> row.getInt(0) < 2); + table.open(); + + List result = table.get(row(1, 11)); + assertThat(result).hasSize(1); + + result = table.get(row(2, 22)); + assertThat(result).isEmpty(); + } + @Test public void testPartialLookupTableWithProjection() throws Exception { FileStoreTable dimTable = createDimTable(); @@ -989,15 +1011,4 @@ private static void assertRow(InternalRow resultRow, int... expected) { assertThat(results).containsExactly(expected); assertThat(resultRow.getFieldCount()).isEqualTo(expected.length); } - - private void writeAndCommit(FileStoreTable table, InternalRow... rows) throws Exception { - BatchWriteBuilder builder = table.newBatchWriteBuilder(); - try (BatchTableWrite writer = builder.newWrite(); - BatchTableCommit commiter = builder.newCommit()) { - for (InternalRow row : rows) { - writer.write(row, 0); - } - commiter.commit(writer.prepareCommit()); - } - } }