Skip to content

Commit

Permalink
[core] Optimize snapshots table when specific snapshot id (apache#3339)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored May 16, 2024
1 parent 3b006fe commit 382582e
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.predicate;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Extract leaf predicate for field names. */
public class LeafPredicateExtractor implements PredicateVisitor<Map<String, LeafPredicate>> {

public static final LeafPredicateExtractor INSTANCE = new LeafPredicateExtractor();

@Override
public Map<String, LeafPredicate> visit(LeafPredicate predicate) {
return Collections.singletonMap(predicate.fieldName(), predicate);
}

@Override
public Map<String, LeafPredicate> visit(CompoundPredicate predicate) {
if (predicate.function() instanceof And) {
Map<String, LeafPredicate> leafPredicates = new HashMap<>();
predicate.children().stream().map(p -> p.visit(this)).forEach(leafPredicates::putAll);
return leafPredicates;
}
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
Expand Down Expand Up @@ -162,25 +162,15 @@ private FilesScan(FileStoreTable storeTable) {

@Override
public InnerTableScan withFilter(Predicate pushdown) {
List<Predicate> predicates = PredicateBuilder.splitAnd(pushdown);
for (Predicate predicate : predicates) {
if (predicate instanceof LeafPredicate) {
LeafPredicate leaf = (LeafPredicate) predicate;
switch (leaf.fieldName()) {
case "partition":
this.partitionPredicate = leaf;
break;
case "bucket":
this.bucketPredicate = leaf;
break;
case "level":
this.levelPredicate = leaf;
break;
default:
break;
}
}
if (pushdown == null) {
return this;
}

Map<String, LeafPredicate> leafPredicates =
pushdown.visit(LeafPredicateExtractor.INSTANCE);
this.partitionPredicate = leafPredicates.get("partition");
this.bucketPredicate = leafPredicates.get("bucket");
this.levelPredicate = leafPredicates.get("level");
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
Expand All @@ -47,6 +50,8 @@

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

import javax.annotation.Nullable;

import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -140,37 +145,30 @@ private class SnapshotsScan extends ReadOnceTableScan {

@Override
public InnerTableScan withFilter(Predicate predicate) {
// TODO
// do filter in read
return this;
}

@Override
public Plan innerPlan() {
long rowCount;
try {
rowCount = new SnapshotManager(fileIO, location).snapshotCount();
} catch (IOException e) {
throw new RuntimeException(e);
}
return () -> Collections.singletonList(new SnapshotsSplit(rowCount, location));
return () -> Collections.singletonList(new SnapshotsSplit(location));
}
}

private static class SnapshotsSplit implements Split {

private static final long serialVersionUID = 1L;

private final long rowCount;
private final Path location;

private SnapshotsSplit(long rowCount, Path location) {
private SnapshotsSplit(Path location) {
this.location = location;
this.rowCount = rowCount;
}

@Override
public long rowCount() {
return rowCount;
// dummy 1, just 1 parallelism
return 1;
}

@Override
Expand All @@ -195,14 +193,23 @@ private static class SnapshotsRead implements InnerTableRead {

private final FileIO fileIO;
private int[][] projection;
@Nullable private Long specificSnapshot;

public SnapshotsRead(FileIO fileIO) {
this.fileIO = fileIO;
}

@Override
public InnerTableRead withFilter(Predicate predicate) {
// TODO
if (predicate == null) {
return this;
}

LeafPredicate snapshotPred =
predicate.visit(LeafPredicateExtractor.INSTANCE).get("snapshot_id");
if (snapshotPred != null && snapshotPred.function() instanceof Equal) {
specificSnapshot = (Long) snapshotPred.literals().get(0);
}
return this;
}

Expand All @@ -222,8 +229,13 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
if (!(split instanceof SnapshotsSplit)) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
Path location = ((SnapshotsSplit) split).location;
Iterator<Snapshot> snapshots = new SnapshotManager(fileIO, location).snapshots();
SnapshotManager snapshotManager =
new SnapshotManager(fileIO, ((SnapshotsSplit) split).location);
Iterator<Snapshot> snapshots =
specificSnapshot != null
? Collections.singletonList(snapshotManager.snapshot(specificSnapshot))
.iterator()
: snapshotManager.snapshots();
Iterator<InternalRow> rows = Iterators.transform(snapshots, this::toRow);
if (projection != null) {
rows =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,17 @@ public void testSnapshotsTable() throws Exception {
sql("INSERT INTO T VALUES (3, 4)");

List<Row> result = sql("SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots");
assertThat(result).containsExactly(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, "APPEND"));

// check correctness and sequence snapshots.
result =
sql(
"SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE schema_id = 0");
assertThat(result).containsExactly(Row.of(1L, 0L, "APPEND"), Row.of(2L, 0L, "APPEND"));

result =
sql(
"SELECT snapshot_id, schema_id, commit_kind FROM T$snapshots WHERE snapshot_id = 2");
assertThat(result).containsExactly(Row.of(2L, 0L, "APPEND"));
}

@Test
Expand Down

0 comments on commit 382582e

Please sign in to comment.