Skip to content

Commit

Permalink
[core] resolve withBuckets commit
Browse files Browse the repository at this point in the history
  • Loading branch information
ranxianglei authored and ranxianglei.rxl committed Nov 15, 2024
1 parent a1cc9f4 commit e401844
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,8 @@ private Filter<InternalRow> createCacheRowFilter() {
}

/**
* Read the corresponding entries based on the current required partition and bucket.
*
* <p>Implemented to {@link InternalRow} is for performance (No deserialization).
* Read the corresponding entries based on the current required bucket, but push down into file
* format .
*/
private static List<Predicate> createPushDownFilter(Collection<Integer> buckets) {
if (buckets == null || buckets.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
Expand All @@ -44,6 +45,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -266,6 +268,13 @@ public Scan withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

@Override
public InnerTableScan withBuckets(Collection<Integer> buckets) {
mainScan.withBuckets(buckets);
fallbackScan.withBuckets(buckets);
return this;
}

@Override
public Scan withLevelFilter(Filter<Integer> levelFilter) {
mainScan.withLevelFilter(levelFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ default InnerTableScan withBucket(Integer bucket) {
}

default InnerTableScan withBuckets(Collection<Integer> buckets) {
throw new RuntimeException("not impl withBuckets for " + this.getClass().getName());
// return this is not safe for too many class not impl this method and withBucketFilter
return this;
}

default InnerTableScan withLevelFilter(Filter<Integer> levelFilter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,12 @@ public InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}

@Override
public InnerTableScan withBuckets(Collection<Integer> buckets) {
batchScan.withBuckets(buckets);
return this;
}

@Override
public InnerTableScan withLevelFilter(Filter<Integer> levelFilter) {
batchScan.withLevelFilter(levelFilter);
Expand Down Expand Up @@ -478,6 +484,18 @@ public StreamDataTableScan withFilter(Predicate predicate) {
return this;
}

@Override
public InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
streamScan.withBucketFilter(bucketFilter);
return this;
}

@Override
public InnerTableScan withBuckets(Collection<Integer> buckets) {
streamScan.withBuckets(buckets);
return this;
}

@Override
public StartingContext startingContext() {
return streamScan.startingContext();
Expand Down

0 comments on commit e401844

Please sign in to comment.