diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicateExtractor.java b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicateExtractor.java new file mode 100644 index 000000000000..a5cf772c0c2e --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicateExtractor.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.predicate; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** Extract leaf predicate for field names. */ +public class LeafPredicateExtractor implements PredicateVisitor> { + + public static final LeafPredicateExtractor INSTANCE = new LeafPredicateExtractor(); + + @Override + public Map visit(LeafPredicate predicate) { + return Collections.singletonMap(predicate.fieldName(), predicate); + } + + @Override + public Map visit(CompoundPredicate predicate) { + if (predicate.function() instanceof And) { + Map leafPredicates = new HashMap<>(); + predicate.children().stream().map(p -> p.visit(this)).forEach(leafPredicates::putAll); + return leafPredicates; + } + return Collections.emptyMap(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index 5880c1162e05..5f47e574b019 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -28,8 +28,8 @@ import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.predicate.Equal; import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.LeafPredicateExtractor; import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -162,25 +162,15 @@ private FilesScan(FileStoreTable storeTable) { @Override public InnerTableScan withFilter(Predicate pushdown) { - List predicates = PredicateBuilder.splitAnd(pushdown); - for (Predicate predicate : predicates) { - if (predicate instanceof LeafPredicate) { - LeafPredicate leaf = (LeafPredicate) predicate; - switch (leaf.fieldName()) { - case "partition": - this.partitionPredicate = leaf; - break; - case "bucket": - this.bucketPredicate = leaf; - break; - case "level": - this.levelPredicate = leaf; - break; - default: - break; - } - } + if (pushdown == null) { + return this; } + + Map leafPredicates = + pushdown.visit(LeafPredicateExtractor.INSTANCE); + this.partitionPredicate = leafPredicates.get("partition"); + this.bucketPredicate = leafPredicates.get("bucket"); + this.levelPredicate = leafPredicates.get("level"); return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java index 8d6b545d908e..b8ff9cd8bda8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java @@ -26,6 +26,9 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.predicate.Equal; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.LeafPredicateExtractor; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.FileStoreTable; @@ -47,6 +50,8 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; +import javax.annotation.Nullable; + import java.io.IOException; import java.time.Instant; import java.time.LocalDateTime; @@ -140,19 +145,13 @@ private class SnapshotsScan extends ReadOnceTableScan { @Override public InnerTableScan withFilter(Predicate predicate) { - // TODO + // do filter in read return this; } @Override public Plan innerPlan() { - long rowCount; - try { - rowCount = new SnapshotManager(fileIO, location).snapshotCount(); - } catch (IOException e) { - throw new RuntimeException(e); - } - return () -> Collections.singletonList(new SnapshotsSplit(rowCount, location)); + return () -> Collections.singletonList(new SnapshotsSplit(location)); } } @@ -160,17 +159,16 @@ private static class SnapshotsSplit implements Split { private static final long serialVersionUID = 1L; - private final long rowCount; private final Path location; - private SnapshotsSplit(long rowCount, Path location) { + private SnapshotsSplit(Path location) { this.location = location; - this.rowCount = rowCount; } @Override public long rowCount() { - return rowCount; + // dummy 1, just 1 parallelism + return 1; } @Override @@ -195,6 +193,7 @@ private static class SnapshotsRead implements InnerTableRead { private final FileIO fileIO; private int[][] projection; + @Nullable private Long specificSnapshot; public SnapshotsRead(FileIO fileIO) { this.fileIO = fileIO; @@ -202,7 +201,15 @@ public SnapshotsRead(FileIO fileIO) { @Override public InnerTableRead withFilter(Predicate predicate) { - // TODO + if (predicate == null) { + return this; + } + + LeafPredicate snapshotPred = + predicate.visit(LeafPredicateExtractor.INSTANCE).get("snapshot_id"); + if (snapshotPred != null && snapshotPred.function() instanceof Equal) { + specificSnapshot = (Long) snapshotPred.literals().get(0); + } return this; } @@ -222,8 +229,13 @@ public RecordReader createReader(Split split) throws IOException { if (!(split instanceof SnapshotsSplit)) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } - Path location = ((SnapshotsSplit) split).location; - Iterator snapshots = new SnapshotManager(fileIO, location).snapshots(); + SnapshotManager snapshotManager = + new SnapshotManager(fileIO, ((SnapshotsSplit) split).location); + Iterator snapshots = + specificSnapshot != null + ? Collections.singletonList(snapshotManager.snapshot(specificSnapshot)) + .iterator() + : snapshotManager.snapshots(); Iterator rows = Iterators.transform(snapshots, this::toRow); if (projection != null) { rows = diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index 6b167c4e74c7..a94d91af6559 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -64,9 +64,17 @@ public void testSnapshotsTable() throws Exception { sql("INSERT INTO T VALUES (3, 4)"); List result = sql("SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots"); + assertThat(result).containsExactly(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, "APPEND")); - // check correctness and sequence snapshots. + result = + sql( + "SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE schema_id = 0"); assertThat(result).containsExactly(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, "APPEND")); + + result = + sql( + "SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE snapshot_id = 2"); + assertThat(result).containsExactly(Row.of(2L, 0L, "APPEND")); } @Test