From e401844047ac742e8cc2ebd64326735ae3454989 Mon Sep 17 00:00:00 2001 From: ranxianglei Date: Fri, 15 Nov 2024 07:20:44 +0000 Subject: [PATCH] [core] resolve withBuckets commit --- .../operation/AbstractFileStoreScan.java | 5 ++--- .../table/FallbackReadFileStoreTable.java | 9 +++++++++ .../paimon/table/source/InnerTableScan.java | 3 ++- .../paimon/table/system/AuditLogTable.java | 18 ++++++++++++++++++ 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index deb6fadd7c19..306fd6f5a7c8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -440,9 +440,8 @@ private Filter createCacheRowFilter() { } /** - * Read the corresponding entries based on the current required partition and bucket. - * - *

Implemented to {@link InternalRow} is for performance (No deserialization). + * Read the corresponding entries based on the current required bucket, but push down into file + * format . */ private static List createPushDownFilter(Collection buckets) { if (buckets == null || buckets.isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index f1a60b9713f9..663933f337b5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -34,6 +34,7 @@ import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.DataTableScan; import org.apache.paimon.table.source.InnerTableRead; +import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.table.source.TableScan; @@ -44,6 +45,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -266,6 +268,13 @@ public Scan withBucketFilter(Filter bucketFilter) { return this; } + @Override + public InnerTableScan withBuckets(Collection buckets) { + mainScan.withBuckets(buckets); + fallbackScan.withBuckets(buckets); + return this; + } + @Override public Scan withLevelFilter(Filter levelFilter) { mainScan.withLevelFilter(levelFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java index 6c3b28f0f4ce..1e17e001694e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java @@ -54,7 +54,8 @@ default InnerTableScan withBucket(Integer bucket) { } default InnerTableScan withBuckets(Collection buckets) { - throw new RuntimeException("not impl withBuckets for " + this.getClass().getName()); + // return this is not safe for too many class not impl this method and withBucketFilter + return this; } default InnerTableScan withLevelFilter(Filter levelFilter) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index 2e1d913648e7..5105f6fc8fd2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -441,6 +441,12 @@ public InnerTableScan withBucketFilter(Filter bucketFilter) { return this; } + @Override + public InnerTableScan withBuckets(Collection buckets) { + batchScan.withBuckets(buckets); + return this; + } + @Override public InnerTableScan withLevelFilter(Filter levelFilter) { batchScan.withLevelFilter(levelFilter); @@ -478,6 +484,18 @@ public StreamDataTableScan withFilter(Predicate predicate) { return this; } + @Override + public InnerTableScan withBucketFilter(Filter bucketFilter) { + streamScan.withBucketFilter(bucketFilter); + return this; + } + + @Override + public InnerTableScan withBuckets(Collection buckets) { + streamScan.withBuckets(buckets); + return this; + } + @Override public StartingContext startingContext() { return streamScan.startingContext();