From 594e8940f5de6f09b1d1e328bff90e0f22b281fe Mon Sep 17 00:00:00 2001 From: sxnan Date: Thu, 10 Oct 2024 15:45:55 +0800 Subject: [PATCH] FullCacheLookupTable support cache row filter --- .../apache/paimon/table/TableTestBase.java | 14 ++ .../flink/lookup/FileStoreLookupFunction.java | 10 ++ .../flink/lookup/FullCacheLookupTable.java | 10 +- .../flink/lookup/LookupStreamingReader.java | 10 +- .../paimon/flink/lookup/LookupTable.java | 3 + .../paimon/flink/lookup/LookupTableTest.java | 134 ++++++++++++++++++ 6 files changed, 179 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java index eaaf8ca70bc8..7f850a7725b4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java @@ -57,6 +57,7 @@ import java.util.Map; import java.util.Random; import java.util.UUID; +import java.util.function.Function; import java.util.function.Predicate; import static org.assertj.core.api.Assertions.assertThat; @@ -110,6 +111,19 @@ protected final void write(Table table, Pair... rows) thro } } + protected void writeWithBucketAssigner( + Table table, Function bucketAssigner, InternalRow... rows) + throws Exception { + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + try (BatchTableWrite write = writeBuilder.newWrite(); + BatchTableCommit commit = writeBuilder.newCommit()) { + for (InternalRow row : rows) { + write.write(row, bucketAssigner.apply(row)); + } + commit.commit(write.prepareCommit()); + } + } + protected void write(Table table, InternalRow... rows) throws Exception { write(table, null, rows); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 4090193de285..347370c6840e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -33,6 +33,7 @@ import org.apache.paimon.table.source.OutOfRangeException; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileIOUtils; +import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.RowDataToObjectArrayConverter; import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints; @@ -96,6 +97,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable { protected FunctionContext functionContext; + @Nullable private Filter cacheRowFilter; + public FileStoreLookupFunction( Table table, int[] projection, int[] joinKeyIndex, @Nullable Predicate predicate) { if (!TableScanUtils.supportCompactDiffStreamingReading(table)) { @@ -195,6 +198,9 @@ private void open() throws Exception { } refreshDynamicPartition(false); + if (cacheRowFilter != null) { + lookupTable.specifyCacheRowFilter(cacheRowFilter); + } lookupTable.open(); } @@ -361,4 +367,8 @@ protected Set getRequireCachedBucketIds() { // TODO: Implement the method when Flink support bucket shuffle for lookup join. return null; } + + protected void setCacheRowFilter(@Nullable Filter cacheRowFilter) { + this.cacheRowFilter = cacheRowFilter; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index e9389f1f291a..89134585b9af 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -37,6 +37,7 @@ import org.apache.paimon.utils.ExecutorUtils; import org.apache.paimon.utils.FieldsComparator; import org.apache.paimon.utils.FileIOUtils; +import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.MutableObjectIterator; import org.apache.paimon.utils.PartialRow; import org.apache.paimon.utils.TypeUtils; @@ -85,6 +86,7 @@ public abstract class FullCacheLookupTable implements LookupTable { private Future refreshFuture; private LookupStreamingReader reader; private Predicate specificPartition; + @Nullable private Filter cacheRowFilter; public FullCacheLookupTable(Context context) { this.table = context.table; @@ -138,6 +140,11 @@ public void specificPartitionFilter(Predicate filter) { this.specificPartition = filter; } + @Override + public void specifyCacheRowFilter(Filter filter) { + this.cacheRowFilter = filter; + } + protected void openStateFactory() throws Exception { this.stateFactory = new RocksDBStateFactory( @@ -154,7 +161,8 @@ protected void bootstrap() throws Exception { context.table, context.projection, scanPredicate, - context.requiredCachedBucketIds); + context.requiredCachedBucketIds, + cacheRowFilter); BinaryExternalSortBuffer bulkLoadSorter = RocksDBState.createBulkLoadSorter( IOManager.create(context.tempPath.toString()), context.table.coreOptions()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java index e6dfd41f8f0e..132b30138d0a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java @@ -30,6 +30,7 @@ import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.StreamTableScan; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.FunctionWithIOException; import org.apache.paimon.utils.TypeUtils; @@ -51,6 +52,7 @@ public class LookupStreamingReader { private final LookupFileStoreTable table; private final int[] projection; + @Nullable private final Filter cacheRowFilter; private final ReadBuilder readBuilder; @Nullable private final Predicate projectedPredicate; private final StreamTableScan scan; @@ -59,9 +61,11 @@ public LookupStreamingReader( LookupFileStoreTable table, int[] projection, @Nullable Predicate predicate, - Set requireCachedBucketIds) { + Set requireCachedBucketIds, + @Nullable Filter cacheRowFilter) { this.table = table; this.projection = projection; + this.cacheRowFilter = cacheRowFilter; this.readBuilder = this.table .newReadBuilder() @@ -125,6 +129,10 @@ public RecordReader nextBatch(boolean useParallelism) throws Except if (projectedPredicate != null) { reader = reader.filter(projectedPredicate::test); } + + if (cacheRowFilter != null) { + reader = reader.filter(cacheRowFilter); + } return reader; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java index c13947bbd819..8ea1931b96aa 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupTable.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.utils.Filter; import java.io.Closeable; import java.io.IOException; @@ -35,4 +36,6 @@ public interface LookupTable extends Closeable { List get(InternalRow key) throws IOException; void refresh() throws Exception; + + void specifyCacheRowFilter(Filter filter); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java index 619cb4c1d620..8141ff4f9448 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java @@ -559,6 +559,129 @@ public void testNoPrimaryKeyTableFilter() throws Exception { assertRow(result.get(1), 1, 11, 111); } + @Test + public void testPkTableWithCacheRowFilter() throws Exception { + FileStoreTable storeTable = createTable(singletonList("f0"), new Options()); + writeWithBucketAssigner( + storeTable, row -> 0, GenericRow.of(1, 11, 111), GenericRow.of(2, 22, 222)); + + FullCacheLookupTable.Context context = + new FullCacheLookupTable.Context( + storeTable, + new int[] {0, 1, 2}, + null, + null, + tempDir.toFile(), + singletonList("f0"), + null); + table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); + assertThat(table).isInstanceOf(PrimaryKeyLookupTable.class); + table.specifyCacheRowFilter(row -> row.getInt(0) < 2); + table.open(); + + List res = table.get(GenericRow.of(1)); + assertThat(res).hasSize(1); + assertRow(res.get(0), 1, 11, 111); + + res = table.get(GenericRow.of(2)); + assertThat(res).isEmpty(); + + writeWithBucketAssigner( + storeTable, row -> 0, GenericRow.of(0, 0, 0), GenericRow.of(3, 33, 333)); + res = table.get(GenericRow.of(0)); + assertThat(res).isEmpty(); + + table.refresh(); + res = table.get(GenericRow.of(0)); + assertThat(res).hasSize(1); + assertRow(res.get(0), 0, 0, 0); + + res = table.get(GenericRow.of(3)); + assertThat(res).isEmpty(); + } + + @Test + public void testNoPkTableWithCacheRowFilter() throws Exception { + FileStoreTable storeTable = createTable(emptyList(), new Options()); + writeWithBucketAssigner( + storeTable, row -> 0, GenericRow.of(1, 11, 111), GenericRow.of(2, 22, 222)); + + FullCacheLookupTable.Context context = + new FullCacheLookupTable.Context( + storeTable, + new int[] {0, 1, 2}, + null, + null, + tempDir.toFile(), + singletonList("f0"), + null); + table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); + assertThat(table).isInstanceOf(NoPrimaryKeyLookupTable.class); + table.specifyCacheRowFilter(row -> row.getInt(0) < 2); + table.open(); + + List res = table.get(GenericRow.of(1)); + assertThat(res).hasSize(1); + assertRow(res.get(0), 1, 11, 111); + + res = table.get(GenericRow.of(2)); + assertThat(res).isEmpty(); + + writeWithBucketAssigner( + storeTable, row -> 0, GenericRow.of(0, 0, 0), GenericRow.of(3, 33, 333)); + res = table.get(GenericRow.of(0)); + assertThat(res).isEmpty(); + + table.refresh(); + res = table.get(GenericRow.of(0)); + assertThat(res).hasSize(1); + assertRow(res.get(0), 0, 0, 0); + + res = table.get(GenericRow.of(3)); + assertThat(res).isEmpty(); + } + + @Test + public void testSecKeyTableWithCacheRowFilter() throws Exception { + FileStoreTable storeTable = createTable(singletonList("f0"), new Options()); + writeWithBucketAssigner( + storeTable, row -> 0, GenericRow.of(1, 11, 111), GenericRow.of(2, 22, 222)); + + FullCacheLookupTable.Context context = + new FullCacheLookupTable.Context( + storeTable, + new int[] {0, 1, 2}, + null, + null, + tempDir.toFile(), + singletonList("f1"), + null); + table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); + assertThat(table).isInstanceOf(SecondaryIndexLookupTable.class); + table.specifyCacheRowFilter(row -> row.getInt(1) < 22); + table.open(); + + List res = table.get(GenericRow.of(11)); + assertThat(res).hasSize(1); + assertRow(res.get(0), 1, 11, 111); + + res = table.get(GenericRow.of(22)); + assertThat(res).isEmpty(); + + writeWithBucketAssigner( + storeTable, row -> 0, GenericRow.of(0, 0, 0), GenericRow.of(3, 33, 333)); + res = table.get(GenericRow.of(0)); + assertThat(res).isEmpty(); + + table.refresh(); + res = table.get(GenericRow.of(0)); + assertThat(res).hasSize(1); + assertRow(res.get(0), 0, 0, 0); + + res = table.get(GenericRow.of(33)); + assertThat(res).isEmpty(); + } + @Test public void testPartialLookupTable() throws Exception { FileStoreTable dimTable = createDimTable(); @@ -866,4 +989,15 @@ private static void assertRow(InternalRow resultRow, int... expected) { assertThat(results).containsExactly(expected); assertThat(resultRow.getFieldCount()).isEqualTo(expected.length); } + + private void writeAndCommit(FileStoreTable table, InternalRow... rows) throws Exception { + BatchWriteBuilder builder = table.newBatchWriteBuilder(); + try (BatchTableWrite writer = builder.newWrite(); + BatchTableCommit commiter = builder.newCommit()) { + for (InternalRow row : rows) { + writer.write(row, 0); + } + commiter.commit(writer.prepareCommit()); + } + } }