Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Optimize IN filter pushdown to snapshot/tag/schema system tables #4436

Merged
merged 10 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ public List<TableSchema> listAll() {
return listAllIds().stream().map(this::schema).collect(Collectors.toList());
}

/**
 * Resolves each of the given schema IDs to its {@link TableSchema}.
 *
 * <p>The returned list follows the iteration order of {@code schemaIds}; the IDs are neither
 * sorted nor de-duplicated here.
 *
 * <p>NOTE(review): what happens for an ID that has no schema depends on {@code schema(...)},
 * which is not defined in this hunk - confirm before relying on it for user-supplied IDs.
 *
 * @param schemaIds the schema IDs to look up
 * @return one {@link TableSchema} per requested ID
 */
public List<TableSchema> schemasWithId(List<Long> schemaIds) {
    return schemaIds.stream()
            .map(id -> schema(id))
            .collect(Collectors.toList());
}

public List<TableSchema> listWithRange(
Optional<Long> optionalMaxSchemaId, Optional<Long> optionalMinSchemaId) {
Long lowerBoundSchemaId = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.LessOrEqual;
import org.apache.paimon.predicate.LessThan;
import org.apache.paimon.predicate.Or;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
Expand Down Expand Up @@ -64,6 +65,7 @@
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -203,6 +205,7 @@ private class SchemasRead implements InnerTableRead {

private Optional<Long> optionalFilterSchemaIdMax = Optional.empty();
private Optional<Long> optionalFilterSchemaIdMin = Optional.empty();
private final List<Long> schemaIds = new ArrayList<>();

public SchemasRead(FileIO fileIO) {
this.fileIO = fileIO;
Expand All @@ -223,6 +226,22 @@ public InnerTableRead withFilter(Predicate predicate) {
handleLeafPredicate(leaf, leafName);
}
}

// optimize for IN filter
if ((compoundPredicate.function()) instanceof Or) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should extract a class to reduce the duplicated code; this can be done in a new PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I will refactor it as suggested. Thanks, @JingsongLi

List<Predicate> children = compoundPredicate.children();
for (Predicate leaf : children) {
if (leaf instanceof LeafPredicate
&& (((LeafPredicate) leaf).function() instanceof Equal)
&& leaf.visit(LeafPredicateExtractor.INSTANCE).get(leafName)
!= null) {
schemaIds.add((Long) ((LeafPredicate) leaf).literals().get(0));
} else {
schemaIds.clear();
break;
}
}
}
} else {
handleLeafPredicate(predicate, leafName);
}
Expand Down Expand Up @@ -279,8 +298,14 @@ public RecordReader<InternalRow> createReader(Split split) {
Path location = schemasSplit.location;
SchemaManager manager = new SchemaManager(fileIO, location, branch);

Collection<TableSchema> tableSchemas =
manager.listWithRange(optionalFilterSchemaIdMax, optionalFilterSchemaIdMin);
Collection<TableSchema> tableSchemas;
if (!schemaIds.isEmpty()) {
tableSchemas = manager.schemasWithId(schemaIds);
} else {
tableSchemas =
manager.listWithRange(optionalFilterSchemaIdMax, optionalFilterSchemaIdMin);
}

Iterator<InternalRow> rows = Iterators.transform(tableSchemas.iterator(), this::toRow);
if (readType != null) {
rows =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.LessOrEqual;
import org.apache.paimon.predicate.LessThan;
import org.apache.paimon.predicate.Or;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
Expand All @@ -62,6 +63,7 @@
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -206,6 +208,7 @@ private class SnapshotsRead implements InnerTableRead {
private RowType readType;
private Optional<Long> optionalFilterSnapshotIdMax = Optional.empty();
private Optional<Long> optionalFilterSnapshotIdMin = Optional.empty();
private final List<Long> snapshotIds = new ArrayList<>();

public SnapshotsRead(FileIO fileIO) {
this.fileIO = fileIO;
Expand All @@ -220,12 +223,27 @@ public InnerTableRead withFilter(Predicate predicate) {
String leafName = "snapshot_id";
if (predicate instanceof CompoundPredicate) {
CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
List<Predicate> children = compoundPredicate.children();
if ((compoundPredicate.function()) instanceof And) {
List<Predicate> children = compoundPredicate.children();
for (Predicate leaf : children) {
handleLeafPredicate(leaf, leafName);
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List<Predicate> children = compoundPredicate.children();
if ((compoundPredicate.function()) instanceof And) {
xxx;
} else if ((compoundPredicate.function()) instanceof Or) {
xxx;
} else {
throw xxx;
}

// optimize for IN filter
if ((compoundPredicate.function()) instanceof Or) {
for (Predicate leaf : children) {
if (leaf instanceof LeafPredicate
&& (((LeafPredicate) leaf).function() instanceof Equal)
&& leaf.visit(LeafPredicateExtractor.INSTANCE).get(leafName)
!= null) {
snapshotIds.add((Long) ((LeafPredicate) leaf).literals().get(0));
} else {
snapshotIds.clear();
break;
}
}
}
} else {
handleLeafPredicate(predicate, leafName);
}
Expand Down Expand Up @@ -284,9 +302,15 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
}
SnapshotManager snapshotManager =
new SnapshotManager(fileIO, ((SnapshotsSplit) split).location, branch);
Iterator<Snapshot> snapshots =
snapshotManager.snapshotsWithinRange(
optionalFilterSnapshotIdMax, optionalFilterSnapshotIdMin);

Iterator<Snapshot> snapshots;
if (!snapshotIds.isEmpty()) {
snapshots = snapshotManager.snapshotsWithId(snapshotIds);
} else {
snapshots =
snapshotManager.snapshotsWithinRange(
optionalFilterSnapshotIdMax, optionalFilterSnapshotIdMin);
}

Iterator<InternalRow> rows = Iterators.transform(snapshots, this::toRow);
if (readType != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.CompoundPredicate;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.Or;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
Expand Down Expand Up @@ -74,10 +76,12 @@ public class TagsTable implements ReadonlyTable {

public static final String TAGS = "tags";

private static final String TAG_NAME = "tag_name";

public static final RowType TABLE_TYPE =
new RowType(
Arrays.asList(
new DataField(0, "tag_name", SerializationUtils.newStringType(false)),
new DataField(0, TAG_NAME, SerializationUtils.newStringType(false)),
new DataField(1, "snapshot_id", new BigIntType(false)),
new DataField(2, "schema_id", new BigIntType(false)),
new DataField(3, "commit_time", new TimestampType(false, 3)),
Expand Down Expand Up @@ -115,7 +119,7 @@ public RowType rowType() {

@Override
public List<String> primaryKeys() {
return Collections.singletonList("tag_name");
return Collections.singletonList(TAG_NAME);
}

@Override
Expand All @@ -134,23 +138,20 @@ public Table copy(Map<String, String> dynamicOptions) {
}

private class TagsScan extends ReadOnceTableScan {
private @Nullable LeafPredicate tagName;
private @Nullable Predicate tagPredicate;

@Override
public InnerTableScan withFilter(Predicate predicate) {
if (predicate == null) {
return this;
}
// TODO
Map<String, LeafPredicate> leafPredicates =
predicate.visit(LeafPredicateExtractor.INSTANCE);
tagName = leafPredicates.get("tag_name");
tagPredicate = predicate;
return this;
}

@Override
public Plan innerPlan() {
return () -> Collections.singletonList(new TagsSplit(location, tagName));
return () -> Collections.singletonList(new TagsSplit(location, tagPredicate));
}
}

Expand All @@ -160,11 +161,11 @@ private static class TagsSplit extends SingletonSplit {

private final Path location;

private final @Nullable LeafPredicate tagName;
private final @Nullable Predicate tagPredicate;

private TagsSplit(Path location, @Nullable LeafPredicate tagName) {
private TagsSplit(Path location, @Nullable Predicate tagPredicate) {
this.location = location;
this.tagName = tagName;
this.tagPredicate = tagPredicate;
}

@Override
Expand All @@ -176,7 +177,8 @@ public boolean equals(Object o) {
return false;
}
TagsSplit that = (TagsSplit) o;
return Objects.equals(location, that.location) && Objects.equals(tagName, that.tagName);
return Objects.equals(location, that.location)
&& Objects.equals(tagPredicate, that.tagPredicate);
}

@Override
Expand Down Expand Up @@ -217,18 +219,52 @@ public RecordReader<InternalRow> createReader(Split split) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
Path location = ((TagsSplit) split).location;
LeafPredicate predicate = ((TagsSplit) split).tagName;
Predicate predicate = ((TagsSplit) split).tagPredicate;
TagManager tagManager = new TagManager(fileIO, location, branch);

Map<String, Tag> nameToSnapshot = new TreeMap<>();
Map<String, Tag> predicateMap = new TreeMap<>();
if (predicate != null) {
if (predicate instanceof LeafPredicate
&& ((LeafPredicate) predicate).function() instanceof Equal
&& ((LeafPredicate) predicate).literals().get(0) instanceof BinaryString
&& predicate.visit(LeafPredicateExtractor.INSTANCE).get(TAG_NAME) != null) {
String equalValue = ((LeafPredicate) predicate).literals().get(0).toString();
if (tagManager.tagExists(equalValue)) {
predicateMap.put(equalValue, tagManager.tag(equalValue));
}
}

if (predicate != null
&& predicate.function() instanceof Equal
&& predicate.literals().get(0) instanceof BinaryString) {
String equalValue = predicate.literals().get(0).toString();
if (tagManager.tagExists(equalValue)) {
nameToSnapshot.put(equalValue, tagManager.tag(equalValue));
if (predicate instanceof CompoundPredicate) {
CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
// optimize for IN filter
if ((compoundPredicate.function()) instanceof Or) {
List<Predicate> children = compoundPredicate.children();
for (Predicate leaf : children) {
if (leaf instanceof LeafPredicate
&& (((LeafPredicate) leaf).function() instanceof Equal
&& ((LeafPredicate) leaf).literals().get(0)
instanceof BinaryString)
&& predicate
.visit(LeafPredicateExtractor.INSTANCE)
.get(TAG_NAME)
!= null) {
String equalValue =
((LeafPredicate) leaf).literals().get(0).toString();
if (tagManager.tagExists(equalValue)) {
predicateMap.put(equalValue, tagManager.tag(equalValue));
}
} else {
predicateMap.clear();
break;
}
}
}
}
}

if (!predicateMap.isEmpty()) {
nameToSnapshot.putAll(predicateMap);
} else {
for (Pair<Tag, String> tag : tagManager.tagObjects()) {
nameToSnapshot.put(tag.getValue(), tag.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,13 @@ public List<Path> snapshotPaths(Predicate<Long> predicate) throws IOException {
.collect(Collectors.toList());
}

/**
 * Returns an iterator over the snapshots with the given IDs, ordered by ascending snapshot ID.
 *
 * <p>Each ID is resolved through {@code snapshot(...)}. The pipeline is a lazy stream, so the
 * lookups are not performed by this call itself but once the returned iterator is consumed.
 *
 * <p>NOTE(review): what happens for an ID that has no snapshot depends on {@code snapshot(...)},
 * which is not defined in this hunk - confirm before relying on it for user-supplied IDs.
 *
 * @param snapshotIds the snapshot IDs to look up
 * @return the matching snapshots, sorted by {@code Snapshot::id}
 */
public Iterator<Snapshot> snapshotsWithId(List<Long> snapshotIds) {
    Comparator<Snapshot> ascendingById = Comparator.comparingLong(Snapshot::id);
    return snapshotIds.stream()
            .map(id -> snapshot(id))
            .sorted(ascendingById)
            .iterator();
}

public Iterator<Snapshot> snapshotsWithinRange(
Optional<Long> optionalMaxSnapshotId, Optional<Long> optionalMinSnapshotId) {
Long lowerBoundSnapshotId = earliestSnapshotId();
Expand Down
Loading
Loading