Commit

[flink] Support FileStoreLookupFunction to only cache required buckets (

WencongLiu authored May 31, 2024
1 parent 885c2e0 commit dd12fd4
Showing 8 changed files with 165 additions and 32 deletions.
ReadBuilder.java

@@ -23,6 +23,7 @@
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;

import java.io.Serializable;
import java.util.Arrays;
@@ -99,6 +100,16 @@ default ReadBuilder withFilter(List<Predicate> predicates) {
/** Push partition filter. */
ReadBuilder withPartitionFilter(Map<String, String> partitionSpec);

/**
* Push bucket filter. Note that this method cannot be used simultaneously with {@link
* #withShard(int, int)}.
*
* <p>Reason: Bucket filtering and sharding are different logical mechanisms for selecting
* subsets of table data. Applying both methods simultaneously introduces conflicting selection
* criteria.
*/
ReadBuilder withBucketFilter(Filter<Integer> bucketFilter);

/**
* Apply projection to the reader.
*
@@ -122,7 +133,14 @@ default ReadBuilder withProjection(int[] projection) {
/** The maximum number of rows to read (limit pushdown). */
ReadBuilder withLimit(int limit);

-    /** Specify the shard to be read, and allocate sharded files to read records. */
+    /**
+     * Specify the shard to be read, and allocate sharded files to read records. Note that this
+     * method cannot be used simultaneously with {@link #withBucketFilter(Filter)}.
+     *
+     * <p>Reason: Sharding and bucket filtering are different logical mechanisms for selecting
+     * subsets of table data. Applying both methods simultaneously introduces conflicting selection
+     * criteria.
+     */
ReadBuilder withShard(int indexOfThisSubtask, int numberOfParallelSubtasks);

/** Create a {@link TableScan} to perform batch planning. */
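For reference, a minimal sketch of how the new `withBucketFilter` API is meant to be used, and how it conflicts with `withShard`. The `BucketFilterExample` class and the chosen bucket IDs are illustrative, not part of this commit:

import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;

import java.util.List;

class BucketFilterExample {

    // Plan a scan that only produces splits for buckets 0 and 2; all other
    // buckets are pruned before any splits are generated.
    static List<Split> planTwoBuckets(Table table) {
        return table.newReadBuilder()
                .withBucketFilter(bucketId -> bucketId == 0 || bucketId == 2)
                .newScan()
                .plan()
                .splits();
    }

    // Invalid: combining bucket filtering with sharding trips the checkState
    // in ReadBuilderImpl, so newScan() throws IllegalStateException.
    static void conflictingSelection(Table table) {
        table.newReadBuilder()
                .withBucketFilter(bucketId -> bucketId == 0)
                .withShard(0, 4)
                .newScan();
    }
}

The check is order-independent: whichever of the two methods is called first, the conflict is detected when the scan is created, as the new test below exercises in both orders.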
ReadBuilderImpl.java

@@ -22,13 +22,16 @@
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.InnerTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.TypeUtils;

import java.util.Arrays;
import java.util.Map;
import java.util.Objects;

import static org.apache.paimon.utils.Preconditions.checkState;

/** Implementation for {@link ReadBuilder}. */
public class ReadBuilderImpl implements ReadBuilder {

@@ -46,6 +49,8 @@ public class ReadBuilderImpl implements ReadBuilder {

private Map<String, String> partitionSpec;

private Filter<Integer> bucketFilter;

public ReadBuilderImpl(InnerTable table) {
this.table = table;
}
@@ -98,6 +103,12 @@ public ReadBuilder withShard(int indexOfThisSubtask, int numberOfParallelSubtask
return this;
}

@Override
public ReadBuilder withBucketFilter(Filter<Integer> bucketFilter) {
this.bucketFilter = bucketFilter;
return this;
}

@Override
public TableScan newScan() {
InnerTableScan tableScan = configureScan(table.newScan());
@@ -114,16 +125,22 @@ public StreamTableScan newStreamScan() {

private InnerTableScan configureScan(InnerTableScan scan) {
scan.withFilter(filter).withPartitionFilter(partitionSpec);

checkState(
bucketFilter == null || shardIndexOfThisSubtask == null,
"Bucket filter and shard configuration cannot be used together. "
+ "Please choose one method to specify the data subset.");
if (shardIndexOfThisSubtask != null) {
if (scan instanceof DataTableScan) {
-                ((DataTableScan) scan)
+                return ((DataTableScan) scan)
.withShard(shardIndexOfThisSubtask, shardNumberOfParallelSubtasks);
} else {
throw new UnsupportedOperationException(
"Unsupported table scan type for shard configuring, the scan is: " + scan);
}
}
if (bucketFilter != null) {
scan.withBucketFilter(bucketFilter);
}
return scan;
}

@@ -114,6 +114,7 @@
import static org.apache.paimon.CoreOptions.WRITE_ONLY;
import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.params.provider.Arguments.arguments;

@@ -375,13 +376,12 @@ public void testBucketFilter() throws Exception {
commit.commit(0, write.prepareCommit(true, 0));
write.close();
commit.close();

List<Split> splits =
-        toSplits(
-                table.newSnapshotReader()
-                        .withFilter(new PredicateBuilder(ROW_TYPE).equal(1, 5))
-                        .read()
-                        .dataSplits());
+        table.newReadBuilder()
+                .withBucketFilter(bucketId -> bucketId == 1)
+                .newScan()
+                .plan()
+                .splits();
assertThat(splits.size()).isEqualTo(1);
assertThat(((DataSplit) splits.get(0)).bucket()).isEqualTo(1);
}
@@ -415,6 +415,33 @@ protected void innerTestWithShard(FileStoreTable table) throws Exception {
"1|7|8|binary|varbinary|mapKey:mapVal|multiset"));
}

@Test
public void testBucketFilterConflictWithShard() throws Exception {
String exceptionMessage = "Bucket filter and shard configuration cannot be used together";
FileStoreTable table =
createFileStoreTable(
conf -> {
conf.set(BUCKET, 5);
conf.set(BUCKET_KEY, "a");
});
assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(
() ->
table.newReadBuilder()
.withBucketFilter(bucketId -> bucketId == 1)
.withShard(0, 1)
.newScan())
.withMessageContaining(exceptionMessage);
assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(
() ->
table.newReadBuilder()
.withShard(0, 1)
.withBucketFilter(bucketId -> bucketId == 1)
.newStreamScan())
.withMessageContaining(exceptionMessage);
}

@Test
public void testAbort() throws Exception {
FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET, 1));
FileStoreLookupFunction.java

@@ -60,6 +60,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@@ -165,7 +166,11 @@ private void open() throws Exception {
try {
this.lookupTable =
PrimaryKeyPartialLookupTable.createLocalTable(
-        storeTable, projection, path, joinKeys);
+        storeTable,
+        projection,
+        path,
+        joinKeys,
+        getRequireCachedBucketIds());
} catch (UnsupportedOperationException ignore2) {
}
}
@@ -179,7 +184,8 @@ private void open() throws Exception {
predicate,
createProjectedPredicate(projection),
path,
-        joinKeys);
+        joinKeys,
+        getRequireCachedBucketIds());
this.lookupTable = FullCacheLookupTable.create(context, options.get(LOOKUP_CACHE_ROWS));
}

@@ -337,4 +343,17 @@ private static StreamingRuntimeContext extractStreamingRuntimeContext(Object run
field.setAccessible(true);
return extractStreamingRuntimeContext(field.get(runtimeContext));
}

/**
* Get the set of bucket IDs that need to be cached by the current lookup join subtask.
*
* <p>The Flink Planner will distribute data to lookup join nodes based on buckets. This allows
* Paimon to cache only the necessary buckets for each subtask, improving efficiency.
*
* @return the set of bucket IDs to be cached, or null if every bucket should be cached
*/
private Set<Integer> getRequireCachedBucketIds() {
// TODO: Implement the method when Flink supports bucket shuffle for lookup join.
return null;
}
}
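Until Flink can shuffle lookup-join input by bucket, the stub returns null, which preserves the previous behavior of caching every bucket. A purely hypothetical sketch of what the computation could look like once bucket shuffle exists, assuming buckets were assigned to subtasks by simple modulo (the class, method, and parameter names and the assignment scheme are all illustrative, not the planned Flink API):

import java.util.HashSet;
import java.util.Set;

class RequiredBucketsSketch {

    // Hypothetical modulo assignment: subtask i of n would cache exactly the
    // buckets b with b % n == i, rather than all numBuckets buckets.
    static Set<Integer> requiredBuckets(int subtaskIndex, int parallelism, int numBuckets) {
        Set<Integer> buckets = new HashSet<>();
        for (int bucket = 0; bucket < numBuckets; bucket++) {
            if (bucket % parallelism == subtaskIndex) {
                buckets.add(bucket);
            }
        }
        return buckets;
    }
}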
FullCacheLookupTable.java

@@ -54,6 +54,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -146,7 +147,12 @@ protected void openStateFactory() throws Exception {
protected void bootstrap() throws Exception {
Predicate scanPredicate =
PredicateBuilder.andNullable(context.tablePredicate, specificPartition);
-        this.reader = new LookupStreamingReader(context.table, context.projection, scanPredicate);
+        this.reader =
+                new LookupStreamingReader(
+                        context.table,
+                        context.projection,
+                        scanPredicate,
+                        context.requiredCachedBucketIds);
BinaryExternalSortBuffer bulkLoadSorter =
RocksDBState.createBulkLoadSorter(
IOManager.create(context.tempPath.toString()), context.table.coreOptions());
@@ -328,20 +334,23 @@ public static class Context {
@Nullable public final Predicate projectedPredicate;
public final File tempPath;
public final List<String> joinKey;
public final Set<Integer> requiredCachedBucketIds;

public Context(
FileStoreTable table,
int[] projection,
@Nullable Predicate tablePredicate,
@Nullable Predicate projectedPredicate,
File tempPath,
-                List<String> joinKey) {
+                List<String> joinKey,
+                @Nullable Set<Integer> requiredCachedBucketIds) {
this.table = table;
this.projection = projection;
this.tablePredicate = tablePredicate;
this.projectedPredicate = projectedPredicate;
this.tempPath = tempPath;
this.joinKey = joinKey;
this.requiredCachedBucketIds = requiredCachedBucketIds;
}
}
}
LookupStreamingReader.java

@@ -45,6 +45,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.IntUnaryOperator;
import java.util.stream.IntStream;

@@ -68,11 +69,22 @@ public class LookupStreamingReader {
CoreOptions.SCAN_TAG_NAME,
CoreOptions.SCAN_VERSION);

-    public LookupStreamingReader(Table table, int[] projection, @Nullable Predicate predicate) {
+    public LookupStreamingReader(
+            Table table,
+            int[] projection,
+            @Nullable Predicate predicate,
+            Set<Integer> requireCachedBucketIds) {
this.table = unsetTimeTravelOptions(table);
this.projection = projection;
this.readBuilder =
-        this.table.newReadBuilder().withProjection(projection).withFilter(predicate);
+        this.table
+                .newReadBuilder()
+                .withProjection(projection)
+                .withFilter(predicate)
+                .withBucketFilter(
+                        requireCachedBucketIds == null
+                                ? null
+                                : requireCachedBucketIds::contains);
scan = readBuilder.newStreamScan();

if (predicate != null) {
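The ternary above adapts the subtask's bucket set to the `Filter<Integer>` expected by `withBucketFilter`: `Filter` is a functional interface, so the bound method reference `requireCachedBucketIds::contains` serves as the predicate, and a null set deliberately yields a null filter, which `ReadBuilderImpl` treats as "no bucket restriction". A standalone sketch of the same adaptation (the helper class and method names are illustrative):

import org.apache.paimon.utils.Filter;

import java.util.Set;

class BucketSetAdapter {

    // Null means "cache every bucket": ReadBuilderImpl skips the bucket
    // filter entirely when it is null.
    static Filter<Integer> toBucketFilter(Set<Integer> requiredBuckets) {
        return requiredBuckets == null ? null : requiredBuckets::contains;
    }
}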
PrimaryKeyPartialLookupTable.java

@@ -43,6 +43,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

import static org.apache.paimon.CoreOptions.SCAN_BOUNDED_WATERMARK;
@@ -149,9 +150,15 @@ public void close() throws IOException {
}

public static PrimaryKeyPartialLookupTable createLocalTable(
-            FileStoreTable table, int[] projection, File tempPath, List<String> joinKey) {
+            FileStoreTable table,
+            int[] projection,
+            File tempPath,
+            List<String> joinKey,
+            Set<Integer> requireCachedBucketIds) {
return new PrimaryKeyPartialLookupTable(
-                filter -> new LocalQueryExecutor(table, projection, tempPath, filter),
+                filter ->
+                        new LocalQueryExecutor(
+                                table, projection, tempPath, filter, requireCachedBucketIds),
table,
joinKey);
}
@@ -175,7 +182,11 @@ static class LocalQueryExecutor implements QueryExecutor {
private final StreamTableScan scan;

private LocalQueryExecutor(
-            FileStoreTable table, int[] projection, File tempPath, @Nullable Predicate filter) {
+            FileStoreTable table,
+            int[] projection,
+            File tempPath,
+            @Nullable Predicate filter,
+            Set<Integer> requireCachedBucketIds) {
this.tableQuery =
table.newLocalTableQuery()
.withValueProjection(Projection.of(projection).toNestedIndexes())
@@ -185,7 +196,14 @@ private LocalQueryExecutor(
dynamicOptions.put(STREAM_SCAN_MODE.key(), FILE_MONITOR.getValue());
dynamicOptions.put(SCAN_BOUNDED_WATERMARK.key(), null);
this.scan =
-                table.copy(dynamicOptions).newReadBuilder().withFilter(filter).newStreamScan();
+                table.copy(dynamicOptions)
+                        .newReadBuilder()
+                        .withFilter(filter)
+                        .withBucketFilter(
+                                requireCachedBucketIds == null
+                                        ? null
+                                        : requireCachedBucketIds::contains)
+                        .newStreamScan();
}

@Override