Skip to content

Commit

Permalink
[core] Optizime IN filter pushdown to snapshot/tag/schema system tabl…
Browse files Browse the repository at this point in the history
…es (apache#4436)
  • Loading branch information
xuzifu666 authored Nov 6, 2024
1 parent b4601f3 commit 321c8ef
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ public List<TableSchema> listAll() {
return listAllIds().stream().map(this::schema).collect(Collectors.toList());
}

public List<TableSchema> schemasWithId(List<Long> schemaIds) {
return schemaIds.stream().map(this::schema).collect(Collectors.toList());
}

public List<TableSchema> listWithRange(
Optional<Long> optionalMaxSchemaId, Optional<Long> optionalMinSchemaId) {
Long lowerBoundSchemaId = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.LessOrEqual;
import org.apache.paimon.predicate.LessThan;
import org.apache.paimon.predicate.Or;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
Expand Down Expand Up @@ -64,6 +65,7 @@
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -203,6 +205,7 @@ private class SchemasRead implements InnerTableRead {

private Optional<Long> optionalFilterSchemaIdMax = Optional.empty();
private Optional<Long> optionalFilterSchemaIdMin = Optional.empty();
private final List<Long> schemaIds = new ArrayList<>();

public SchemasRead(FileIO fileIO) {
this.fileIO = fileIO;
Expand All @@ -223,6 +226,22 @@ public InnerTableRead withFilter(Predicate predicate) {
handleLeafPredicate(leaf, leafName);
}
}

// optimize for IN filter
if ((compoundPredicate.function()) instanceof Or) {
List<Predicate> children = compoundPredicate.children();
for (Predicate leaf : children) {
if (leaf instanceof LeafPredicate
&& (((LeafPredicate) leaf).function() instanceof Equal)
&& leaf.visit(LeafPredicateExtractor.INSTANCE).get(leafName)
!= null) {
schemaIds.add((Long) ((LeafPredicate) leaf).literals().get(0));
} else {
schemaIds.clear();
break;
}
}
}
} else {
handleLeafPredicate(predicate, leafName);
}
Expand Down Expand Up @@ -279,8 +298,14 @@ public RecordReader<InternalRow> createReader(Split split) {
Path location = schemasSplit.location;
SchemaManager manager = new SchemaManager(fileIO, location, branch);

Collection<TableSchema> tableSchemas =
manager.listWithRange(optionalFilterSchemaIdMax, optionalFilterSchemaIdMin);
Collection<TableSchema> tableSchemas;
if (!schemaIds.isEmpty()) {
tableSchemas = manager.schemasWithId(schemaIds);
} else {
tableSchemas =
manager.listWithRange(optionalFilterSchemaIdMax, optionalFilterSchemaIdMin);
}

Iterator<InternalRow> rows = Iterators.transform(tableSchemas.iterator(), this::toRow);
if (readType != null) {
rows =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.LessOrEqual;
import org.apache.paimon.predicate.LessThan;
import org.apache.paimon.predicate.Or;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
Expand All @@ -62,6 +63,7 @@
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -206,6 +208,7 @@ private class SnapshotsRead implements InnerTableRead {
private RowType readType;
private Optional<Long> optionalFilterSnapshotIdMax = Optional.empty();
private Optional<Long> optionalFilterSnapshotIdMin = Optional.empty();
private final List<Long> snapshotIds = new ArrayList<>();

public SnapshotsRead(FileIO fileIO) {
this.fileIO = fileIO;
Expand All @@ -220,12 +223,27 @@ public InnerTableRead withFilter(Predicate predicate) {
String leafName = "snapshot_id";
if (predicate instanceof CompoundPredicate) {
CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
List<Predicate> children = compoundPredicate.children();
if ((compoundPredicate.function()) instanceof And) {
List<Predicate> children = compoundPredicate.children();
for (Predicate leaf : children) {
handleLeafPredicate(leaf, leafName);
}
}

// optimize for IN filter
if ((compoundPredicate.function()) instanceof Or) {
for (Predicate leaf : children) {
if (leaf instanceof LeafPredicate
&& (((LeafPredicate) leaf).function() instanceof Equal)
&& leaf.visit(LeafPredicateExtractor.INSTANCE).get(leafName)
!= null) {
snapshotIds.add((Long) ((LeafPredicate) leaf).literals().get(0));
} else {
snapshotIds.clear();
break;
}
}
}
} else {
handleLeafPredicate(predicate, leafName);
}
Expand Down Expand Up @@ -284,9 +302,15 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
}
SnapshotManager snapshotManager =
new SnapshotManager(fileIO, ((SnapshotsSplit) split).location, branch);
Iterator<Snapshot> snapshots =
snapshotManager.snapshotsWithinRange(
optionalFilterSnapshotIdMax, optionalFilterSnapshotIdMin);

Iterator<Snapshot> snapshots;
if (!snapshotIds.isEmpty()) {
snapshots = snapshotManager.snapshotsWithId(snapshotIds);
} else {
snapshots =
snapshotManager.snapshotsWithinRange(
optionalFilterSnapshotIdMax, optionalFilterSnapshotIdMin);
}

Iterator<InternalRow> rows = Iterators.transform(snapshots, this::toRow);
if (readType != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.CompoundPredicate;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.Or;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
Expand Down Expand Up @@ -74,10 +76,12 @@ public class TagsTable implements ReadonlyTable {

public static final String TAGS = "tags";

private static final String TAG_NAME = "tag_name";

public static final RowType TABLE_TYPE =
new RowType(
Arrays.asList(
new DataField(0, "tag_name", SerializationUtils.newStringType(false)),
new DataField(0, TAG_NAME, SerializationUtils.newStringType(false)),
new DataField(1, "snapshot_id", new BigIntType(false)),
new DataField(2, "schema_id", new BigIntType(false)),
new DataField(3, "commit_time", new TimestampType(false, 3)),
Expand Down Expand Up @@ -115,7 +119,7 @@ public RowType rowType() {

@Override
public List<String> primaryKeys() {
return Collections.singletonList("tag_name");
return Collections.singletonList(TAG_NAME);
}

@Override
Expand All @@ -134,23 +138,20 @@ public Table copy(Map<String, String> dynamicOptions) {
}

private class TagsScan extends ReadOnceTableScan {
private @Nullable LeafPredicate tagName;
private @Nullable Predicate tagPredicate;

@Override
public InnerTableScan withFilter(Predicate predicate) {
if (predicate == null) {
return this;
}
// TODO
Map<String, LeafPredicate> leafPredicates =
predicate.visit(LeafPredicateExtractor.INSTANCE);
tagName = leafPredicates.get("tag_name");
tagPredicate = predicate;
return this;
}

@Override
public Plan innerPlan() {
return () -> Collections.singletonList(new TagsSplit(location, tagName));
return () -> Collections.singletonList(new TagsSplit(location, tagPredicate));
}
}

Expand All @@ -160,11 +161,11 @@ private static class TagsSplit extends SingletonSplit {

private final Path location;

private final @Nullable LeafPredicate tagName;
private final @Nullable Predicate tagPredicate;

private TagsSplit(Path location, @Nullable LeafPredicate tagName) {
private TagsSplit(Path location, @Nullable Predicate tagPredicate) {
this.location = location;
this.tagName = tagName;
this.tagPredicate = tagPredicate;
}

@Override
Expand All @@ -176,7 +177,8 @@ public boolean equals(Object o) {
return false;
}
TagsSplit that = (TagsSplit) o;
return Objects.equals(location, that.location) && Objects.equals(tagName, that.tagName);
return Objects.equals(location, that.location)
&& Objects.equals(tagPredicate, that.tagPredicate);
}

@Override
Expand Down Expand Up @@ -217,18 +219,52 @@ public RecordReader<InternalRow> createReader(Split split) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
Path location = ((TagsSplit) split).location;
LeafPredicate predicate = ((TagsSplit) split).tagName;
Predicate predicate = ((TagsSplit) split).tagPredicate;
TagManager tagManager = new TagManager(fileIO, location, branch);

Map<String, Tag> nameToSnapshot = new TreeMap<>();
Map<String, Tag> predicateMap = new TreeMap<>();
if (predicate != null) {
if (predicate instanceof LeafPredicate
&& ((LeafPredicate) predicate).function() instanceof Equal
&& ((LeafPredicate) predicate).literals().get(0) instanceof BinaryString
&& predicate.visit(LeafPredicateExtractor.INSTANCE).get(TAG_NAME) != null) {
String equalValue = ((LeafPredicate) predicate).literals().get(0).toString();
if (tagManager.tagExists(equalValue)) {
predicateMap.put(equalValue, tagManager.tag(equalValue));
}
}

if (predicate != null
&& predicate.function() instanceof Equal
&& predicate.literals().get(0) instanceof BinaryString) {
String equalValue = predicate.literals().get(0).toString();
if (tagManager.tagExists(equalValue)) {
nameToSnapshot.put(equalValue, tagManager.tag(equalValue));
if (predicate instanceof CompoundPredicate) {
CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
// optimize for IN filter
if ((compoundPredicate.function()) instanceof Or) {
List<Predicate> children = compoundPredicate.children();
for (Predicate leaf : children) {
if (leaf instanceof LeafPredicate
&& (((LeafPredicate) leaf).function() instanceof Equal
&& ((LeafPredicate) leaf).literals().get(0)
instanceof BinaryString)
&& predicate
.visit(LeafPredicateExtractor.INSTANCE)
.get(TAG_NAME)
!= null) {
String equalValue =
((LeafPredicate) leaf).literals().get(0).toString();
if (tagManager.tagExists(equalValue)) {
predicateMap.put(equalValue, tagManager.tag(equalValue));
}
} else {
predicateMap.clear();
break;
}
}
}
}
}

if (!predicateMap.isEmpty()) {
nameToSnapshot.putAll(predicateMap);
} else {
for (Pair<Tag, String> tag : tagManager.tagObjects()) {
nameToSnapshot.put(tag.getValue(), tag.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,13 @@ public List<Path> snapshotPaths(Predicate<Long> predicate) throws IOException {
.collect(Collectors.toList());
}

public Iterator<Snapshot> snapshotsWithId(List<Long> snapshotIds) {
return snapshotIds.stream()
.map(this::snapshot)
.sorted(Comparator.comparingLong(Snapshot::id))
.iterator();
}

public Iterator<Snapshot> snapshotsWithinRange(
Optional<Long> optionalMaxSnapshotId, Optional<Long> optionalMinSnapshotId) {
Long lowerBoundSnapshotId = earliestSnapshotId();
Expand Down
Loading

0 comments on commit 321c8ef

Please sign in to comment.