-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] Optimize IN filter pushdown to snapshot/tag/schema system tables #4436
Changes from 8 commits
da9010a
f8105e1
5cb9b69
116c04f
e105876
de1c84d
d1bfef2
42ec226
2bc72cb
1dcf28f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,7 @@ | |
import org.apache.paimon.predicate.LeafPredicateExtractor; | ||
import org.apache.paimon.predicate.LessOrEqual; | ||
import org.apache.paimon.predicate.LessThan; | ||
import org.apache.paimon.predicate.Or; | ||
import org.apache.paimon.predicate.Predicate; | ||
import org.apache.paimon.reader.RecordReader; | ||
import org.apache.paimon.schema.SchemaManager; | ||
|
@@ -64,6 +65,7 @@ | |
import java.time.Instant; | ||
import java.time.LocalDateTime; | ||
import java.time.ZoneId; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
|
@@ -203,6 +205,7 @@ private class SchemasRead implements InnerTableRead { | |
|
||
private Optional<Long> optionalFilterSchemaIdMax = Optional.empty(); | ||
private Optional<Long> optionalFilterSchemaIdMin = Optional.empty(); | ||
private List<Long> schemaIds = new ArrayList<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This field should be declared final. |
||
|
||
public SchemasRead(FileIO fileIO) { | ||
this.fileIO = fileIO; | ||
|
@@ -223,6 +226,22 @@ public InnerTableRead withFilter(Predicate predicate) { | |
handleLeafPredicate(leaf, leafName); | ||
} | ||
} | ||
|
||
// optimize for IN filter | ||
if ((compoundPredicate.function()) instanceof Or) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should extract a class to reduce duplicated code; this can be done in a new PR. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OK, I will refactor it as suggested. Thanks, @JingsongLi |
||
List<Predicate> children = compoundPredicate.children(); | ||
for (Predicate leaf : children) { | ||
if (leaf instanceof LeafPredicate | ||
&& (((LeafPredicate) leaf).function() instanceof Equal) | ||
&& leaf.visit(LeafPredicateExtractor.INSTANCE).get(leafName) | ||
!= null) { | ||
schemaIds.add((Long) ((LeafPredicate) leaf).literals().get(0)); | ||
} else { | ||
schemaIds.clear(); | ||
break; | ||
} | ||
} | ||
} | ||
} else { | ||
handleLeafPredicate(predicate, leafName); | ||
} | ||
|
@@ -279,8 +298,14 @@ public RecordReader<InternalRow> createReader(Split split) { | |
Path location = schemasSplit.location; | ||
SchemaManager manager = new SchemaManager(fileIO, location, branch); | ||
|
||
Collection<TableSchema> tableSchemas = | ||
manager.listWithRange(optionalFilterSchemaIdMax, optionalFilterSchemaIdMin); | ||
Collection<TableSchema> tableSchemas; | ||
if (!schemaIds.isEmpty()) { | ||
tableSchemas = manager.schemasWithIds(schemaIds); | ||
} else { | ||
tableSchemas = | ||
manager.listWithRange(optionalFilterSchemaIdMax, optionalFilterSchemaIdMin); | ||
} | ||
|
||
Iterator<InternalRow> rows = Iterators.transform(tableSchemas.iterator(), this::toRow); | ||
if (readType != null) { | ||
rows = | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,6 +36,7 @@ | |
import org.apache.paimon.predicate.LeafPredicateExtractor; | ||
import org.apache.paimon.predicate.LessOrEqual; | ||
import org.apache.paimon.predicate.LessThan; | ||
import org.apache.paimon.predicate.Or; | ||
import org.apache.paimon.predicate.Predicate; | ||
import org.apache.paimon.reader.RecordReader; | ||
import org.apache.paimon.table.FileStoreTable; | ||
|
@@ -62,6 +63,7 @@ | |
import java.time.Instant; | ||
import java.time.LocalDateTime; | ||
import java.time.ZoneId; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.Iterator; | ||
|
@@ -206,6 +208,7 @@ private class SnapshotsRead implements InnerTableRead { | |
private RowType readType; | ||
private Optional<Long> optionalFilterSnapshotIdMax = Optional.empty(); | ||
private Optional<Long> optionalFilterSnapshotIdMin = Optional.empty(); | ||
private List<Long> snapshotIds = new ArrayList<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This field should be declared final. |
||
|
||
public SnapshotsRead(FileIO fileIO) { | ||
this.fileIO = fileIO; | ||
|
@@ -226,6 +229,22 @@ public InnerTableRead withFilter(Predicate predicate) { | |
handleLeafPredicate(leaf, leafName); | ||
} | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. `List<Predicate> children = compoundPredicate.children();` |
||
// optimize for IN filter | ||
if ((compoundPredicate.function()) instanceof Or) { | ||
List<Predicate> children = compoundPredicate.children(); | ||
for (Predicate leaf : children) { | ||
if (leaf instanceof LeafPredicate | ||
&& (((LeafPredicate) leaf).function() instanceof Equal) | ||
&& leaf.visit(LeafPredicateExtractor.INSTANCE).get(leafName) | ||
!= null) { | ||
snapshotIds.add((Long) ((LeafPredicate) leaf).literals().get(0)); | ||
} else { | ||
snapshotIds.clear(); | ||
break; | ||
} | ||
} | ||
} | ||
} else { | ||
handleLeafPredicate(predicate, leafName); | ||
} | ||
|
@@ -284,9 +303,15 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException { | |
} | ||
SnapshotManager snapshotManager = | ||
new SnapshotManager(fileIO, ((SnapshotsSplit) split).location, branch); | ||
Iterator<Snapshot> snapshots = | ||
snapshotManager.snapshotsWithinRange( | ||
optionalFilterSnapshotIdMax, optionalFilterSnapshotIdMin); | ||
|
||
Iterator<Snapshot> snapshots; | ||
if (!snapshotIds.isEmpty()) { | ||
snapshots = snapshotManager.snapshotsWithIds(snapshotIds); | ||
} else { | ||
snapshots = | ||
snapshotManager.snapshotsWithinRange( | ||
optionalFilterSnapshotIdMax, optionalFilterSnapshotIdMin); | ||
} | ||
|
||
Iterator<InternalRow> rows = Iterators.transform(snapshots, this::toRow); | ||
if (readType != null) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,9 +26,11 @@ | |
import org.apache.paimon.disk.IOManager; | ||
import org.apache.paimon.fs.FileIO; | ||
import org.apache.paimon.fs.Path; | ||
import org.apache.paimon.predicate.CompoundPredicate; | ||
import org.apache.paimon.predicate.Equal; | ||
import org.apache.paimon.predicate.LeafPredicate; | ||
import org.apache.paimon.predicate.LeafPredicateExtractor; | ||
import org.apache.paimon.predicate.Or; | ||
import org.apache.paimon.predicate.Predicate; | ||
import org.apache.paimon.reader.RecordReader; | ||
import org.apache.paimon.table.FileStoreTable; | ||
|
@@ -134,23 +136,20 @@ public Table copy(Map<String, String> dynamicOptions) { | |
} | ||
|
||
private class TagsScan extends ReadOnceTableScan { | ||
private @Nullable LeafPredicate tagName; | ||
private @Nullable Predicate tagPredicate; | ||
|
||
@Override | ||
public InnerTableScan withFilter(Predicate predicate) { | ||
if (predicate == null) { | ||
return this; | ||
} | ||
// TODO | ||
Map<String, LeafPredicate> leafPredicates = | ||
predicate.visit(LeafPredicateExtractor.INSTANCE); | ||
tagName = leafPredicates.get("tag_name"); | ||
tagPredicate = predicate; | ||
return this; | ||
} | ||
|
||
@Override | ||
public Plan innerPlan() { | ||
return () -> Collections.singletonList(new TagsSplit(location, tagName)); | ||
return () -> Collections.singletonList(new TagsSplit(location, tagPredicate)); | ||
} | ||
} | ||
|
||
|
@@ -160,11 +159,11 @@ private static class TagsSplit extends SingletonSplit { | |
|
||
private final Path location; | ||
|
||
private final @Nullable LeafPredicate tagName; | ||
private final @Nullable Predicate tagPredicate; | ||
|
||
private TagsSplit(Path location, @Nullable LeafPredicate tagName) { | ||
private TagsSplit(Path location, @Nullable Predicate tagPredicate) { | ||
this.location = location; | ||
this.tagName = tagName; | ||
this.tagPredicate = tagPredicate; | ||
} | ||
|
||
@Override | ||
|
@@ -176,7 +175,8 @@ public boolean equals(Object o) { | |
return false; | ||
} | ||
TagsSplit that = (TagsSplit) o; | ||
return Objects.equals(location, that.location) && Objects.equals(tagName, that.tagName); | ||
return Objects.equals(location, that.location) | ||
&& Objects.equals(tagPredicate, that.tagPredicate); | ||
} | ||
|
||
@Override | ||
|
@@ -217,18 +217,49 @@ public RecordReader<InternalRow> createReader(Split split) { | |
throw new IllegalArgumentException("Unsupported split: " + split.getClass()); | ||
} | ||
Path location = ((TagsSplit) split).location; | ||
LeafPredicate predicate = ((TagsSplit) split).tagName; | ||
Predicate predicate = ((TagsSplit) split).tagPredicate; | ||
TagManager tagManager = new TagManager(fileIO, location, branch); | ||
String leafName = "tag_name"; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. First, extract a constant: `private static final String TAG_NAME = "tag_name";` There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. First, extract a constant: `private static final String TAG_NAME = "tag_name";` |
||
|
||
Map<String, Tag> nameToSnapshot = new TreeMap<>(); | ||
Map<String, Tag> predicateMap = new TreeMap<>(); | ||
if (predicate != null) { | ||
if (predicate instanceof LeafPredicate | ||
&& ((LeafPredicate) predicate).function() instanceof Equal | ||
&& ((LeafPredicate) predicate).literals().get(0) instanceof BinaryString | ||
&& predicate.visit(LeafPredicateExtractor.INSTANCE).get(leafName) != null) { | ||
String equalValue = ((LeafPredicate) predicate).literals().get(0).toString(); | ||
if (tagManager.tagExists(equalValue)) { | ||
predicateMap.put(equalValue, tagManager.tag(equalValue)); | ||
} | ||
} | ||
|
||
if (predicate != null | ||
&& predicate.function() instanceof Equal | ||
&& predicate.literals().get(0) instanceof BinaryString) { | ||
String equalValue = predicate.literals().get(0).toString(); | ||
if (tagManager.tagExists(equalValue)) { | ||
nameToSnapshot.put(equalValue, tagManager.tag(equalValue)); | ||
if (predicate instanceof CompoundPredicate) { | ||
CompoundPredicate compoundPredicate = (CompoundPredicate) predicate; | ||
// optimize for IN filter | ||
if ((compoundPredicate.function()) instanceof Or) { | ||
List<Predicate> children = compoundPredicate.children(); | ||
for (Predicate leaf : children) { | ||
if (leaf instanceof LeafPredicate | ||
&& (((LeafPredicate) leaf).function() instanceof Equal) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why is this condition not checked here: |
||
&& predicate | ||
.visit(LeafPredicateExtractor.INSTANCE) | ||
.get(leafName) | ||
!= null) { | ||
String equalValue = | ||
((LeafPredicate) leaf).literals().get(0).toString(); | ||
predicateMap.put(equalValue, tagManager.tag(equalValue)); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Add the existence check here: `if (tagManager.tagExists(equalValue)) {` There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Add the existence check here: `if (tagManager.tagExists(equalValue)) {` |
||
} else { | ||
predicateMap.clear(); | ||
break; | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
if (!predicateMap.isEmpty()) { | ||
nameToSnapshot.putAll(predicateMap); | ||
} else { | ||
for (Pair<Tag, String> tag : tagManager.tagObjects()) { | ||
nameToSnapshot.put(tag.getValue(), tag.getKey()); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -417,6 +417,13 @@ public List<Path> snapshotPaths(Predicate<Long> predicate) throws IOException { | |
.collect(Collectors.toList()); | ||
} | ||
|
||
public Iterator<Snapshot> snapshotsWithIds(List<Long> snapshotIds) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Rename this method to `snapshotsWithId`. |
||
return snapshotIds.stream() | ||
.map(this::snapshot) | ||
.sorted(Comparator.comparingLong(Snapshot::id)) | ||
.iterator(); | ||
} | ||
|
||
public Iterator<Snapshot> snapshotsWithinRange( | ||
Optional<Long> optionalMaxSnapshotId, Optional<Long> optionalMinSnapshotId) { | ||
Long lowerBoundSnapshotId = earliestSnapshotId(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename this method to `schemasWithId`.