Skip to content

Commit

Permalink
[core] added greater and less than query optimization to snapshots ta…
Browse files Browse the repository at this point in the history
…ble queries (#3396)
  • Loading branch information
ctrlaltdilj authored Jun 4, 2024
1 parent 0e226c6 commit fed70b0
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.GreaterOrEqual;
import org.apache.paimon.predicate.GreaterThan;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.LessOrEqual;
import org.apache.paimon.predicate.LessThan;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
Expand All @@ -51,8 +55,6 @@

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

import javax.annotation.Nullable;

import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
Expand All @@ -63,6 +65,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;

Expand Down Expand Up @@ -188,7 +191,8 @@ private static class SnapshotsRead implements InnerTableRead {

private final FileIO fileIO;
private int[][] projection;
@Nullable private Long specificSnapshot;
private Optional<Long> optionalFilterSnapshotIdMax = Optional.empty();
private Optional<Long> optionalFilterSnapshotIdMin = Optional.empty();

public SnapshotsRead(FileIO fileIO) {
this.fileIO = fileIO;
Expand All @@ -202,9 +206,35 @@ public InnerTableRead withFilter(Predicate predicate) {

LeafPredicate snapshotPred =
predicate.visit(LeafPredicateExtractor.INSTANCE).get("snapshot_id");
if (snapshotPred != null && snapshotPred.function() instanceof Equal) {
specificSnapshot = (Long) snapshotPred.literals().get(0);
if (snapshotPred != null) {
if (snapshotPred.function() instanceof Equal) {
optionalFilterSnapshotIdMin =
Optional.of((Long) snapshotPred.literals().get(0));
optionalFilterSnapshotIdMax =
Optional.of((Long) snapshotPred.literals().get(0));
}

if (snapshotPred.function() instanceof GreaterThan) {
optionalFilterSnapshotIdMin =
Optional.of((Long) snapshotPred.literals().get(0) + 1);
}

if (snapshotPred.function() instanceof GreaterOrEqual) {
optionalFilterSnapshotIdMin =
Optional.of((Long) snapshotPred.literals().get(0));
}

if (snapshotPred.function() instanceof LessThan) {
optionalFilterSnapshotIdMax =
Optional.of((Long) snapshotPred.literals().get(0) - 1);
}

if (snapshotPred.function() instanceof LessOrEqual) {
optionalFilterSnapshotIdMax =
Optional.of((Long) snapshotPred.literals().get(0));
}
}

return this;
}

Expand All @@ -227,10 +257,9 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
SnapshotManager snapshotManager =
new SnapshotManager(fileIO, ((SnapshotsSplit) split).location);
Iterator<Snapshot> snapshots =
specificSnapshot != null
? Collections.singletonList(snapshotManager.snapshot(specificSnapshot))
.iterator()
: snapshotManager.snapshots();
snapshotManager.snapshotsWithinRange(
optionalFilterSnapshotIdMax, optionalFilterSnapshotIdMin);

Iterator<InternalRow> rows = Iterators.transform(snapshots, this::toRow);
if (projection != null) {
rows =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.getBranchPath;
Expand Down Expand Up @@ -355,6 +356,32 @@ public Iterator<Snapshot> snapshots() throws IOException {
.iterator();
}

public Iterator<Snapshot> snapshotsWithinRange(
Optional<Long> optionalMaxSnapshotId, Optional<Long> optionalMinSnapshotId)
throws IOException {
Long lowerBoundSnapshotId = earliestSnapshotId();
Long upperBoundSnapshotId = latestSnapshotId();

// null check on lowerBoundSnapshotId & upperBoundSnapshotId
if (lowerBoundSnapshotId == null || upperBoundSnapshotId == null) {
return Collections.emptyIterator();
}

if (optionalMaxSnapshotId.isPresent()) {
upperBoundSnapshotId = optionalMaxSnapshotId.get();
}

if (optionalMinSnapshotId.isPresent()) {
lowerBoundSnapshotId = optionalMinSnapshotId.get();
}

// +1 here to include the upperBoundSnapshotId
return LongStream.range(lowerBoundSnapshotId, upperBoundSnapshotId + 1)
.mapToObj(this::snapshot)
.sorted(Comparator.comparingLong(Snapshot::id))
.iterator();
}

public Iterator<Changelog> changelogs() throws IOException {
return listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX)
.map(snapshotId -> changelog(snapshotId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,26 @@ public void testSnapshotsTable() throws Exception {
sql(
"SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE snapshot_id = 2");
assertThat(result).containsExactly(Row.of(2L, 0L, "APPEND"));

result =
sql(
"SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE snapshot_id > 1");
assertThat(result).containsExactly(Row.of(2L, 0L, "APPEND"));

result =
sql(
"SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE snapshot_id < 2");
assertThat(result).containsExactly(Row.of(1L, 0L, "APPEND"));

result =
sql(
"SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE snapshot_id >= 1");
assertThat(result).contains(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, "APPEND"));

result =
sql(
"SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE snapshot_id <= 2");
assertThat(result).contains(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, "APPEND"));
}

@Test
Expand Down

0 comments on commit fed70b0

Please sign in to comment.