diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index d1efbcfe1e6c..7b987b049228 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -128,6 +128,10 @@ public List listAll() { return listAllIds().stream().map(this::schema).collect(Collectors.toList()); } + public List schemasWithId(List schemaIds) { + return schemaIds.stream().map(this::schema).collect(Collectors.toList()); + } + public List listWithRange( Optional optionalMaxSchemaId, Optional optionalMinSchemaId) { Long lowerBoundSchemaId = 0L; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java index aab6c1d876af..b6150ef7524c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java @@ -35,6 +35,7 @@ import org.apache.paimon.predicate.LeafPredicateExtractor; import org.apache.paimon.predicate.LessOrEqual; import org.apache.paimon.predicate.LessThan; +import org.apache.paimon.predicate.Or; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.SchemaManager; @@ -64,6 +65,7 @@ import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -203,6 +205,7 @@ private class SchemasRead implements InnerTableRead { private Optional optionalFilterSchemaIdMax = Optional.empty(); private Optional optionalFilterSchemaIdMin = Optional.empty(); + private final List schemaIds = new ArrayList<>(); public SchemasRead(FileIO fileIO) { this.fileIO = fileIO; @@ -223,6 +226,22 @@ public InnerTableRead withFilter(Predicate predicate) { handleLeafPredicate(leaf, leafName); } } + + // optimize for IN filter + if ((compoundPredicate.function()) instanceof Or) { + List children = compoundPredicate.children(); + for (Predicate leaf : children) { + if (leaf instanceof LeafPredicate + && (((LeafPredicate) leaf).function() instanceof Equal) + && leaf.visit(LeafPredicateExtractor.INSTANCE).get(leafName) + != null) { + schemaIds.add((Long) ((LeafPredicate) leaf).literals().get(0)); + } else { + schemaIds.clear(); + break; + } + } + } } else { handleLeafPredicate(predicate, leafName); } @@ -279,8 +298,14 @@ public RecordReader createReader(Split split) { Path location = schemasSplit.location; SchemaManager manager = new SchemaManager(fileIO, location, branch); - Collection tableSchemas = - manager.listWithRange(optionalFilterSchemaIdMax, optionalFilterSchemaIdMin); + Collection tableSchemas; + if (!schemaIds.isEmpty()) { + tableSchemas = manager.schemasWithId(schemaIds); + } else { + tableSchemas = + manager.listWithRange(optionalFilterSchemaIdMax, optionalFilterSchemaIdMin); + } + Iterator rows = Iterators.transform(tableSchemas.iterator(), this::toRow); if (readType != null) { rows = diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java index 5bec2b109324..8bf4766d580e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SnapshotsTable.java @@ -36,6 +36,7 @@ import org.apache.paimon.predicate.LeafPredicateExtractor; import org.apache.paimon.predicate.LessOrEqual; import org.apache.paimon.predicate.LessThan; +import org.apache.paimon.predicate.Or; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.FileStoreTable; @@ -62,6 +63,7 @@ import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -206,6 +208,7 @@ private class SnapshotsRead implements InnerTableRead { private RowType readType; private Optional optionalFilterSnapshotIdMax = Optional.empty(); private Optional optionalFilterSnapshotIdMin = Optional.empty(); + private final List snapshotIds = new ArrayList<>(); public SnapshotsRead(FileIO fileIO) { this.fileIO = fileIO; @@ -220,12 +223,27 @@ public InnerTableRead withFilter(Predicate predicate) { String leafName = "snapshot_id"; if (predicate instanceof CompoundPredicate) { CompoundPredicate compoundPredicate = (CompoundPredicate) predicate; + List children = compoundPredicate.children(); if ((compoundPredicate.function()) instanceof And) { - List children = compoundPredicate.children(); for (Predicate leaf : children) { handleLeafPredicate(leaf, leafName); } } + + // optimize for IN filter + if ((compoundPredicate.function()) instanceof Or) { + for (Predicate leaf : children) { + if (leaf instanceof LeafPredicate + && (((LeafPredicate) leaf).function() instanceof Equal) + && leaf.visit(LeafPredicateExtractor.INSTANCE).get(leafName) + != null) { + snapshotIds.add((Long) ((LeafPredicate) leaf).literals().get(0)); + } else { + snapshotIds.clear(); + break; + } + } + } } else { handleLeafPredicate(predicate, leafName); } @@ -284,9 +302,15 @@ public RecordReader createReader(Split split) throws IOException { } SnapshotManager snapshotManager = new SnapshotManager(fileIO, ((SnapshotsSplit) split).location, branch); - Iterator snapshots = - snapshotManager.snapshotsWithinRange( - optionalFilterSnapshotIdMax, optionalFilterSnapshotIdMin); + + Iterator snapshots; + if (!snapshotIds.isEmpty()) { + snapshots = snapshotManager.snapshotsWithId(snapshotIds); + } else { + snapshots = + snapshotManager.snapshotsWithinRange( + optionalFilterSnapshotIdMax, optionalFilterSnapshotIdMin); + } Iterator rows = Iterators.transform(snapshots, this::toRow); if (readType != null) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java index dd7335e38564..f3342e9f2cb3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java @@ -26,9 +26,11 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.predicate.CompoundPredicate; import org.apache.paimon.predicate.Equal; import org.apache.paimon.predicate.LeafPredicate; import org.apache.paimon.predicate.LeafPredicateExtractor; +import org.apache.paimon.predicate.Or; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.table.FileStoreTable; @@ -74,10 +76,12 @@ public class TagsTable implements ReadonlyTable { public static final String TAGS = "tags"; + private static final String TAG_NAME = "tag_name"; + public static final RowType TABLE_TYPE = new RowType( Arrays.asList( - new DataField(0, "tag_name", SerializationUtils.newStringType(false)), + new DataField(0, TAG_NAME, SerializationUtils.newStringType(false)), new DataField(1, "snapshot_id", new BigIntType(false)), new DataField(2, "schema_id", new BigIntType(false)), new DataField(3, "commit_time", new TimestampType(false, 3)), @@ -115,7 +119,7 @@ public RowType rowType() { @Override public List primaryKeys() { - return Collections.singletonList("tag_name"); + return Collections.singletonList(TAG_NAME); } @Override @@ -134,23 +138,20 @@ public Table copy(Map dynamicOptions) { } private class TagsScan extends ReadOnceTableScan { - private @Nullable LeafPredicate tagName; + private @Nullable Predicate tagPredicate; @Override public InnerTableScan withFilter(Predicate predicate) { if (predicate == null) { return this; } - // TODO - Map leafPredicates = - predicate.visit(LeafPredicateExtractor.INSTANCE); - tagName = leafPredicates.get("tag_name"); + tagPredicate = predicate; return this; } @Override public Plan innerPlan() { - return () -> Collections.singletonList(new TagsSplit(location, tagName)); + return () -> Collections.singletonList(new TagsSplit(location, tagPredicate)); } } @@ -160,11 +161,11 @@ private static class TagsSplit extends SingletonSplit { private final Path location; - private final @Nullable LeafPredicate tagName; + private final @Nullable Predicate tagPredicate; - private TagsSplit(Path location, @Nullable LeafPredicate tagName) { + private TagsSplit(Path location, @Nullable Predicate tagPredicate) { this.location = location; - this.tagName = tagName; + this.tagPredicate = tagPredicate; } @Override @@ -176,7 +177,8 @@ public boolean equals(Object o) { return false; } TagsSplit that = (TagsSplit) o; - return Objects.equals(location, that.location) && Objects.equals(tagName, that.tagName); + return Objects.equals(location, that.location) + && Objects.equals(tagPredicate, that.tagPredicate); } @Override @@ -217,18 +219,52 @@ public RecordReader createReader(Split split) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } Path location = ((TagsSplit) split).location; - LeafPredicate predicate = ((TagsSplit) split).tagName; + Predicate predicate = ((TagsSplit) split).tagPredicate; TagManager tagManager = new TagManager(fileIO, location, branch); Map nameToSnapshot = new TreeMap<>(); + Map predicateMap = new TreeMap<>(); + if (predicate != null) { + if (predicate instanceof LeafPredicate + && ((LeafPredicate) predicate).function() instanceof Equal + && ((LeafPredicate) predicate).literals().get(0) instanceof BinaryString + && predicate.visit(LeafPredicateExtractor.INSTANCE).get(TAG_NAME) != null) { + String equalValue = ((LeafPredicate) predicate).literals().get(0).toString(); + if (tagManager.tagExists(equalValue)) { + predicateMap.put(equalValue, tagManager.tag(equalValue)); + } + } - if (predicate != null - && predicate.function() instanceof Equal - && predicate.literals().get(0) instanceof BinaryString) { - String equalValue = predicate.literals().get(0).toString(); - if (tagManager.tagExists(equalValue)) { - nameToSnapshot.put(equalValue, tagManager.tag(equalValue)); + if (predicate instanceof CompoundPredicate) { + CompoundPredicate compoundPredicate = (CompoundPredicate) predicate; + // optimize for IN filter + if ((compoundPredicate.function()) instanceof Or) { + List children = compoundPredicate.children(); + for (Predicate leaf : children) { + if (leaf instanceof LeafPredicate + && (((LeafPredicate) leaf).function() instanceof Equal + && ((LeafPredicate) leaf).literals().get(0) + instanceof BinaryString) + && predicate + .visit(LeafPredicateExtractor.INSTANCE) + .get(TAG_NAME) + != null) { + String equalValue = + ((LeafPredicate) leaf).literals().get(0).toString(); + if (tagManager.tagExists(equalValue)) { + predicateMap.put(equalValue, tagManager.tag(equalValue)); + } + } else { + predicateMap.clear(); + break; + } + } + } } + } + + if (!predicateMap.isEmpty()) { + nameToSnapshot.putAll(predicateMap); } else { for (Pair tag : tagManager.tagObjects()) { nameToSnapshot.put(tag.getValue(), tag.getKey()); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 7e2fce0ada5e..5902d4c84cf5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -417,6 +417,13 @@ public List snapshotPaths(Predicate predicate) throws IOException { .collect(Collectors.toList()); } + public Iterator snapshotsWithId(List snapshotIds) { + return snapshotIds.stream() + .map(this::snapshot) + .sorted(Comparator.comparingLong(Snapshot::id)) + .iterator(); + } + public Iterator snapshotsWithinRange( Optional optionalMaxSnapshotId, Optional optionalMinSnapshotId) { Long lowerBoundSnapshotId = earliestSnapshotId(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index b614b5953843..9c1a2f4e3918 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -70,14 +70,23 @@ public void testSnapshotsTable() throws Exception { sql("CREATE TABLE T (a INT, b INT)"); sql("INSERT INTO T VALUES (1, 2)"); sql("INSERT INTO T VALUES (3, 4)"); + sql("INSERT INTO T VALUES (5, 6)"); List result = sql("SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots"); - assertThat(result).containsExactly(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, "APPEND")); + assertThat(result) + .containsExactly( + Row.of(1L, 0L, "APPEND"), + Row.of(2L, 0L, "APPEND"), + Row.of(3L, 0L, "APPEND")); result = sql( "SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE schema_id = 0"); - assertThat(result).containsExactly(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, "APPEND")); + assertThat(result) + .containsExactly( + Row.of(1L, 0L, "APPEND"), + Row.of(2L, 0L, "APPEND"), + Row.of(3L, 0L, "APPEND")); result = sql( @@ -87,7 +96,7 @@ public void testSnapshotsTable() throws Exception { result = sql( "SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE snapshot_id > 1"); - assertThat(result).containsExactly(Row.of(2L, 0L, "APPEND")); + assertThat(result).containsExactly(Row.of(2L, 0L, "APPEND"), Row.of(3L, 0L, "APPEND")); result = sql( @@ -97,12 +106,30 @@ public void testSnapshotsTable() throws Exception { result = sql( "SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE snapshot_id >= 1"); - assertThat(result).contains(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, "APPEND")); + assertThat(result) + .contains( + Row.of(1L, 0L, "APPEND"), + Row.of(2L, 0L, "APPEND"), + Row.of(3L, 0L, "APPEND")); result = sql( "SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE snapshot_id <= 2"); assertThat(result).contains(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, "APPEND")); + + result = + sql( + "SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE snapshot_id in (1, 2)"); + assertThat(result).contains(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, "APPEND")); + + result = + sql( + "SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE snapshot_id in (1, 2) or schema_id=0"); + assertThat(result) + .contains( + Row.of(1L, 0L, "APPEND"), + Row.of(2L, 0L, "APPEND"), + Row.of(3L, 0L, "APPEND")); } @Test @@ -281,6 +308,42 @@ public void testSchemasTable() { + "{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], {\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\"," + "\"b.bb.bbb\":\"val2\",\"snapshot.num-retained.max\":\"20\"}, ]]"); + // test for IN filter + result = + sql( + "SELECT schema_id, fields, partition_keys, " + + "primary_keys, options, `comment` FROM T$schemas where schema_id in (1, 3)"); + assertThat(result.toString()) + .isEqualTo( + "[+I[1, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"}," + + "{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], " + + "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\"}, ], " + + "+I[3, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"}," + + "{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], " + + "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\",\"snapshot.num-retained.max\":\"20\",\"snapshot.num-retained.min\":\"18\"}, ]]"); + + result = + sql( + "SELECT schema_id, fields, partition_keys, " + + "primary_keys, options, `comment` FROM T$schemas where schema_id in (1, 3) or fields='[{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}]' order by schema_id"); + assertThat(result.toString()) + .isEqualTo( + "[+I[0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"}," + + "{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], " + + "{\"a.aa.aaa\":\"val1\",\"b.bb.bbb\":\"val2\"}, ], " + + "+I[1, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"}," + + "{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], " + + "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\"}, ], " + + "+I[2, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"}," + + "{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], " + + "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\",\"snapshot.num-retained.max\":\"20\"}, ], " + + "+I[3, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"}," + + "{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], " + + "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\",\"snapshot.num-retained.max\":\"20\",\"snapshot.num-retained.min\":\"18\"}, ], " + + "+I[4, [{\"id\":0,\"name\":\"a\",\"type\":\"INT NOT NULL\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"},{\"id\":2,\"name\":\"c\",\"type\":\"STRING\"}], [], [\"a\"], " + + "{\"a.aa.aaa\":\"val1\",\"snapshot.time-retained\":\"5 h\",\"b.bb.bbb\":\"val2\",\"snapshot.num-retained.max\":\"20\",\"manifest.format\":\"avro\"," + + "\"snapshot.num-retained.min\":\"18\"}, ]]"); + // check with not exist schema id assertThatThrownBy( () -> @@ -844,20 +907,35 @@ public void testTagsTable() throws Exception { sql("CREATE TABLE T (a INT, b INT)"); sql("INSERT INTO T VALUES (1, 2)"); sql("INSERT INTO T VALUES (3, 4)"); + sql("INSERT INTO T VALUES (5, 6)"); paimonTable("T").createTag("tag1", 1); paimonTable("T").createTag("tag2", 2); + paimonTable("T").createTag("tag3", 3); List result = sql( "SELECT tag_name, snapshot_id, schema_id, record_count FROM T$tags ORDER BY tag_name"); - - assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L), Row.of("tag2", 2L, 0L, 2L)); + assertThat(result) + .containsExactly( + Row.of("tag1", 1L, 0L, 1L), + Row.of("tag2", 2L, 0L, 2L), + Row.of("tag3", 3L, 0L, 3L)); result = sql( "SELECT tag_name, snapshot_id, schema_id, record_count FROM T$tags where tag_name = 'tag1' "); assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L)); + + result = + sql( + "SELECT tag_name, snapshot_id, schema_id, record_count FROM T$tags where tag_name in ('tag1', 'tag3')"); + assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L), Row.of("tag3", 3L, 0L, 3L)); + + result = + sql( + "SELECT tag_name, snapshot_id, schema_id, record_count FROM T$tags where tag_name in ('tag1') or snapshot_id=2"); + assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L), Row.of("tag2", 2L, 0L, 2L)); } @Test