From dd12fd4add98350249964c6959f1033a1d7d25d0 Mon Sep 17 00:00:00 2001 From: Wencong Liu <104502720+WencongLiu@users.noreply.github.com> Date: Fri, 31 May 2024 15:54:34 +0800 Subject: [PATCH] [flink] Support FileStoreLookupFunction to only cache required buckets (#3437) --- .../paimon/table/source/ReadBuilder.java | 20 +++++++++- .../paimon/table/source/ReadBuilderImpl.java | 21 +++++++++- .../paimon/table/FileStoreTableTestBase.java | 39 ++++++++++++++++--- .../flink/lookup/FileStoreLookupFunction.java | 23 ++++++++++- .../flink/lookup/FullCacheLookupTable.java | 13 ++++++- .../flink/lookup/LookupStreamingReader.java | 16 +++++++- .../lookup/PrimaryKeyPartialLookupTable.java | 26 +++++++++++-- .../paimon/flink/lookup/LookupTableTest.java | 39 ++++++++++++------- 8 files changed, 165 insertions(+), 32 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java index d0b03c966865..b7207927ca68 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java @@ -23,6 +23,7 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Filter; import java.io.Serializable; import java.util.Arrays; @@ -99,6 +100,16 @@ default ReadBuilder withFilter(List predicates) { /** Push partition filter. */ ReadBuilder withPartitionFilter(Map partitionSpec); + /** + * Push bucket filter. Note that this method cannot be used simultaneously with {@link + * #withShard(int, int)}. + * + *
<p>
Reason: Bucket filtering and sharding are different logical mechanisms for selecting + * subsets of table data. Applying both methods simultaneously introduces conflicting selection + * criteria. + */ + ReadBuilder withBucketFilter(Filter bucketFilter); + /** * Apply projection to the reader. * @@ -122,7 +133,14 @@ default ReadBuilder withProjection(int[] projection) { /** the row number pushed down. */ ReadBuilder withLimit(int limit); - /** Specify the shard to be read, and allocate sharded files to read records. */ + /** + * Specify the shard to be read, and allocate sharded files to read records. Note that this + * method cannot be used simultaneously with {@link #withBucketFilter(Filter)}. + * + *
<p>
Reason: Sharding and bucket filtering are different logical mechanisms for selecting + * subsets of table data. Applying both methods simultaneously introduces conflicting selection + * criteria. + */ ReadBuilder withShard(int indexOfThisSubtask, int numberOfParallelSubtasks); /** Create a {@link TableScan} to perform batch planning. */ diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java index 8813f9b3f413..e9b83340a881 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java @@ -22,6 +22,7 @@ import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.table.InnerTable; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.Projection; import org.apache.paimon.utils.TypeUtils; @@ -29,6 +30,8 @@ import java.util.Map; import java.util.Objects; +import static org.apache.paimon.utils.Preconditions.checkState; + /** Implementation for {@link ReadBuilder}. 
*/ public class ReadBuilderImpl implements ReadBuilder { @@ -46,6 +49,8 @@ public class ReadBuilderImpl implements ReadBuilder { private Map partitionSpec; + private Filter bucketFilter; + public ReadBuilderImpl(InnerTable table) { this.table = table; } @@ -98,6 +103,12 @@ public ReadBuilder withShard(int indexOfThisSubtask, int numberOfParallelSubtask return this; } + @Override + public ReadBuilder withBucketFilter(Filter bucketFilter) { + this.bucketFilter = bucketFilter; + return this; + } + @Override public TableScan newScan() { InnerTableScan tableScan = configureScan(table.newScan()); @@ -114,16 +125,22 @@ public StreamTableScan newStreamScan() { private InnerTableScan configureScan(InnerTableScan scan) { scan.withFilter(filter).withPartitionFilter(partitionSpec); - + checkState( + bucketFilter == null || shardIndexOfThisSubtask == null, + "Bucket filter and shard configuration cannot be used together. " + + "Please choose one method to specify the data subset."); if (shardIndexOfThisSubtask != null) { if (scan instanceof DataTableScan) { - ((DataTableScan) scan) + return ((DataTableScan) scan) .withShard(shardIndexOfThisSubtask, shardNumberOfParallelSubtasks); } else { throw new UnsupportedOperationException( "Unsupported table scan type for shard configuring, the scan is: " + scan); } } + if (bucketFilter != null) { + scan.withBucketFilter(bucketFilter); + } return scan; } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index d10c9e8fad63..7592dd079787 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -114,6 +114,7 @@ import static org.apache.paimon.CoreOptions.WRITE_ONLY; import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static 
org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.params.provider.Arguments.arguments; @@ -375,13 +376,12 @@ public void testBucketFilter() throws Exception { commit.commit(0, write.prepareCommit(true, 0)); write.close(); commit.close(); - List splits = - toSplits( - table.newSnapshotReader() - .withFilter(new PredicateBuilder(ROW_TYPE).equal(1, 5)) - .read() - .dataSplits()); + table.newReadBuilder() + .withBucketFilter(bucketId -> bucketId == 1) + .newScan() + .plan() + .splits(); assertThat(splits.size()).isEqualTo(1); assertThat(((DataSplit) splits.get(0)).bucket()).isEqualTo(1); } @@ -415,6 +415,33 @@ protected void innerTestWithShard(FileStoreTable table) throws Exception { "1|7|8|binary|varbinary|mapKey:mapVal|multiset")); } + @Test + public void testBucketFilterConflictWithShard() throws Exception { + String exceptionMessage = "Bucket filter and shard configuration cannot be used together"; + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(BUCKET, 5); + conf.set(BUCKET_KEY, "a"); + }); + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy( + () -> + table.newReadBuilder() + .withBucketFilter(bucketId -> bucketId == 1) + .withShard(0, 1) + .newScan()) + .withMessageContaining(exceptionMessage); + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy( + () -> + table.newReadBuilder() + .withShard(0, 1) + .withBucketFilter(bucketId -> bucketId == 1) + .newStreamScan()) + .withMessageContaining(exceptionMessage); + } + @Test public void testAbort() throws Exception { FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET, 1)); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index e2efbcbc5186..3fd73d2b96d7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -60,6 +60,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; @@ -165,7 +166,11 @@ private void open() throws Exception { try { this.lookupTable = PrimaryKeyPartialLookupTable.createLocalTable( - storeTable, projection, path, joinKeys); + storeTable, + projection, + path, + joinKeys, + getRequireCachedBucketIds()); } catch (UnsupportedOperationException ignore2) { } } @@ -179,7 +184,8 @@ private void open() throws Exception { predicate, createProjectedPredicate(projection), path, - joinKeys); + joinKeys, + getRequireCachedBucketIds()); this.lookupTable = FullCacheLookupTable.create(context, options.get(LOOKUP_CACHE_ROWS)); } @@ -337,4 +343,17 @@ private static StreamingRuntimeContext extractStreamingRuntimeContext(Object run field.setAccessible(true); return extractStreamingRuntimeContext(field.get(runtimeContext)); } + + /** + * Get the set of bucket IDs that need to be cached by the current lookup join subtask. + * + *
<p>
The Flink Planner will distribute data to lookup join nodes based on buckets. This allows + * paimon to cache only the necessary buckets for each subtask, improving efficiency. + * + * @return the set of bucket IDs to be cached + */ + private Set getRequireCachedBucketIds() { + // TODO: Implement the method when Flink support bucket shuffle for lookup join. + return null; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java index 1c8db091626b..d7af8bdf71f3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java @@ -54,6 +54,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -146,7 +147,12 @@ protected void openStateFactory() throws Exception { protected void bootstrap() throws Exception { Predicate scanPredicate = PredicateBuilder.andNullable(context.tablePredicate, specificPartition); - this.reader = new LookupStreamingReader(context.table, context.projection, scanPredicate); + this.reader = + new LookupStreamingReader( + context.table, + context.projection, + scanPredicate, + context.requiredCachedBucketIds); BinaryExternalSortBuffer bulkLoadSorter = RocksDBState.createBulkLoadSorter( IOManager.create(context.tempPath.toString()), context.table.coreOptions()); @@ -328,6 +334,7 @@ public static class Context { @Nullable public final Predicate projectedPredicate; public final File tempPath; public final List joinKey; + public final Set requiredCachedBucketIds; public Context( FileStoreTable table, @@ -335,13 +342,15 @@ public Context( @Nullable 
Predicate tablePredicate, @Nullable Predicate projectedPredicate, File tempPath, - List joinKey) { + List joinKey, + @Nullable Set requiredCachedBucketIds) { this.table = table; this.projection = projection; this.tablePredicate = tablePredicate; this.projectedPredicate = projectedPredicate; this.tempPath = tempPath; this.joinKey = joinKey; + this.requiredCachedBucketIds = requiredCachedBucketIds; } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java index ce64e27e3ae6..ea9256830284 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupStreamingReader.java @@ -45,6 +45,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.IntUnaryOperator; import java.util.stream.IntStream; @@ -68,11 +69,22 @@ public class LookupStreamingReader { CoreOptions.SCAN_TAG_NAME, CoreOptions.SCAN_VERSION); - public LookupStreamingReader(Table table, int[] projection, @Nullable Predicate predicate) { + public LookupStreamingReader( + Table table, + int[] projection, + @Nullable Predicate predicate, + Set requireCachedBucketIds) { this.table = unsetTimeTravelOptions(table); this.projection = projection; this.readBuilder = - this.table.newReadBuilder().withProjection(projection).withFilter(predicate); + this.table + .newReadBuilder() + .withProjection(projection) + .withFilter(predicate) + .withBucketFilter( + requireCachedBucketIds == null + ? 
null + : requireCachedBucketIds::contains); scan = readBuilder.newStreamScan(); if (predicate != null) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java index 1327ed3b66fd..bdf0a1b4af77 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java @@ -43,6 +43,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import static org.apache.paimon.CoreOptions.SCAN_BOUNDED_WATERMARK; @@ -149,9 +150,15 @@ public void close() throws IOException { } public static PrimaryKeyPartialLookupTable createLocalTable( - FileStoreTable table, int[] projection, File tempPath, List joinKey) { + FileStoreTable table, + int[] projection, + File tempPath, + List joinKey, + Set requireCachedBucketIds) { return new PrimaryKeyPartialLookupTable( - filter -> new LocalQueryExecutor(table, projection, tempPath, filter), + filter -> + new LocalQueryExecutor( + table, projection, tempPath, filter, requireCachedBucketIds), table, joinKey); } @@ -175,7 +182,11 @@ static class LocalQueryExecutor implements QueryExecutor { private final StreamTableScan scan; private LocalQueryExecutor( - FileStoreTable table, int[] projection, File tempPath, @Nullable Predicate filter) { + FileStoreTable table, + int[] projection, + File tempPath, + @Nullable Predicate filter, + Set requireCachedBucketIds) { this.tableQuery = table.newLocalTableQuery() .withValueProjection(Projection.of(projection).toNestedIndexes()) @@ -185,7 +196,14 @@ private LocalQueryExecutor( dynamicOptions.put(STREAM_SCAN_MODE.key(), FILE_MONITOR.getValue()); 
dynamicOptions.put(SCAN_BOUNDED_WATERMARK.key(), null); this.scan = - table.copy(dynamicOptions).newReadBuilder().withFilter(filter).newStreamScan(); + table.copy(dynamicOptions) + .newReadBuilder() + .withFilter(filter) + .withBucketFilter( + requireCachedBucketIds == null + ? null + : requireCachedBucketIds::contains) + .newStreamScan(); } @Override diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java index 2b45b38bbfcf..7cca8e25d0b5 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java @@ -121,7 +121,8 @@ public void testPkTable() throws Exception { null, null, tempDir.toFile(), - singletonList("f0")); + singletonList("f0"), + null); table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); @@ -182,7 +183,8 @@ public void testPkTableWithSequenceField() throws Exception { null, null, tempDir.toFile(), - singletonList("f0")); + singletonList("f0"), + null); table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); @@ -230,7 +232,8 @@ public void testPkTablePkFilter() throws Exception { null, new PredicateBuilder(RowType.of(INT())).lessThan(0, 3), tempDir.toFile(), - singletonList("f0")); + singletonList("f0"), + null); table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); @@ -265,7 +268,8 @@ public void testPkTableNonPkFilter() throws Exception { null, new PredicateBuilder(RowType.of(INT(), INT())).lessThan(1, 22), tempDir.toFile(), - singletonList("f0")); + singletonList("f0"), + null); table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); @@ -293,7 
+297,8 @@ public void testSecKeyTable() throws Exception { null, null, tempDir.toFile(), - singletonList("f1")); + singletonList("f1"), + null); table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); @@ -342,7 +347,8 @@ public void testSecKeyTableWithSequenceField() throws Exception { null, null, tempDir.toFile(), - singletonList("f1")); + singletonList("f1"), + null); table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); @@ -394,7 +400,8 @@ public void testSecKeyTablePkFilter() throws Exception { null, new PredicateBuilder(RowType.of(INT())).lessThan(0, 3), tempDir.toFile(), - singletonList("f1")); + singletonList("f1"), + null); table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); @@ -438,7 +445,8 @@ public void testNoPrimaryKeyTable() throws Exception { null, null, tempDir.toFile(), - singletonList("f1")); + singletonList("f1"), + null); table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); @@ -485,7 +493,8 @@ public void testNoPrimaryKeyTableFilter() throws Exception { null, new PredicateBuilder(RowType.of(INT(), INT(), INT())).lessThan(2, 222), tempDir.toFile(), - singletonList("f1")); + singletonList("f1"), + null); table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open(); @@ -517,7 +526,8 @@ public void testPartialLookupTable() throws Exception { dimTable, new int[] {0, 1, 2}, tempDir.toFile(), - ImmutableList.of("pk1", "pk2")); + ImmutableList.of("pk1", "pk2"), + null); table.open(); List result = table.get(row(1, -1)); @@ -549,7 +559,8 @@ public void testPartialLookupTableWithProjection() throws Exception { dimTable, new int[] {2, 1}, tempDir.toFile(), - ImmutableList.of("pk1", "pk2")); + ImmutableList.of("pk1", "pk2"), + null); table.open(); // test re-open @@ -580,7 +591,8 @@ public void 
testPartialLookupTableJoinKeyOrder() throws Exception { dimTable, new int[] {2, 1}, tempDir.toFile(), - ImmutableList.of("pk2", "pk1")); + ImmutableList.of("pk2", "pk1"), + null); table.open(); // test re-open @@ -616,7 +628,8 @@ public void testPKLookupTableRefreshAsync(boolean refreshAsync) throws Exception null, null, tempDir.toFile(), - singletonList("f0")); + singletonList("f0"), + null); table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10); table.open();