Skip to content

Commit

Permalink
query change
Browse files Browse the repository at this point in the history
  • Loading branch information
xuyu committed Sep 27, 2024
1 parent 9100a8f commit cab9d1c
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,45 +168,22 @@ public Identifier identifier() {
public Optional<Statistics> statistics() {
Snapshot latestSnapshot;
Long snapshotId = coreOptions().scanSnapshotId();
if (snapshotId != null) {
return readStatisticsBySnapshotId(snapshotId);
if (snapshotId == null) {
snapshotId = snapshotManager().latestSnapshotId();
}

if (snapshotManager().snapshotExists(snapshotId)) {
latestSnapshot = snapshotManager().snapshot(snapshotId);
} else {
latestSnapshot = snapshotManager().latestSnapshot();
}

if (latestSnapshot != null) {
return store().newStatsFileHandler().readStats(latestSnapshot);
}
return Optional.empty();
}

/**
 * Reads the table statistics that were computed for the given snapshot id.
 *
 * <p>Starting at {@code snapshotId}, walks forward through newer snapshots until it meets the
 * first snapshot whose commit kind is {@code ANALYZE}, and returns that snapshot's statistics
 * only if they were computed for the requested snapshot id; otherwise returns empty.
 *
 * <p>NOTE(review): the scan stops at the first ANALYZE snapshot it encounters (see the
 * {@code break} below), so a later ANALYZE commit that does match the requested id is never
 * considered — confirm this early-exit is intentional.
 *
 * @param snapshotId the snapshot id whose statistics are requested; assumed non-null by the
 *     caller — TODO confirm callers never pass null
 * @return the matching statistics, or {@code Optional.empty()} if none are found
 */
public Optional<Statistics> readStatisticsBySnapshotId(Long snapshotId) {
Long latestSnapshotId = snapshotManager().latestSnapshotId();
if (latestSnapshotId == null) {
// No snapshots exist at all, so no statistics can exist either.
return Optional.empty();
}

if (!snapshotManager().snapshotExists(snapshotId)) {
// Requested snapshot is gone (e.g. expired) or never existed; fall back to whatever
// statistics are attached to the latest snapshot.
return store().newStatsFileHandler().readStats(snapshotManager().latestSnapshot());
}

Long startSnapshotId = snapshotId;

// Scan forward from the requested snapshot toward the latest one, looking for the first
// ANALYZE commit.
while (startSnapshotId <= latestSnapshotId) {
Snapshot startSnapshot = snapshotManager().snapshot(startSnapshotId);
if (startSnapshot.commitKind() == Snapshot.CommitKind.ANALYZE) {
Optional<Statistics> statistics =
store().newStatsFileHandler().readStats(startSnapshot);
// NOTE(review): if Statistics.snapshotId() returns a boxed Long rather than a
// primitive long, '==' here compares references, not values — verify it unboxes.
if (statistics.isPresent() && statistics.get().snapshotId() == snapshotId) {
return statistics;
}
// First ANALYZE snapshot found did not match the requested id: give up rather than
// continuing the scan.
break;
}
startSnapshotId++;
}

return Optional.empty();
}

@Override
public Optional<WriteSelector> newWriteSelector() {
switch (bucketMode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.EmptyRecordReader;
import org.apache.paimon.reader.RecordReader;
Expand All @@ -49,19 +47,15 @@

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;

/** A {@link Table} for showing statistic of table. */
Expand Down Expand Up @@ -129,26 +123,15 @@ public Table copy(Map<String, String> dynamicOptions) {

private class StatisticScan extends ReadOnceTableScan {

private @Nullable LeafPredicate snapshotIdPredicate;

@Override
public InnerTableScan withFilter(Predicate predicate) {
if (predicate == null) {
return this;
}

Map<String, LeafPredicate> leafPredicates =
predicate.visit(LeafPredicateExtractor.INSTANCE);
snapshotIdPredicate = leafPredicates.get("snapshot_id");

// TODO
return this;
}

@Override
public Plan innerPlan() {
return () ->
Collections.singletonList(
new StatisticTable.StatisticSplit(location, snapshotIdPredicate));
return () -> Collections.singletonList(new StatisticTable.StatisticSplit(location));
}
}

Expand All @@ -158,11 +141,8 @@ private static class StatisticSplit extends SingletonSplit {

private final Path location;

private final @Nullable LeafPredicate snapshotIdPredicate;

private StatisticSplit(Path location, @Nullable LeafPredicate snapshotIdPredicate) {
private StatisticSplit(Path location) {
this.location = location;
this.snapshotIdPredicate = snapshotIdPredicate;
}

@Override
Expand All @@ -174,8 +154,7 @@ public boolean equals(Object o) {
return false;
}
StatisticTable.StatisticSplit that = (StatisticTable.StatisticSplit) o;
return Objects.equals(location, that.location)
&& Objects.equals(snapshotIdPredicate, that.snapshotIdPredicate);
return Objects.equals(location, that.location);
}

@Override
Expand Down Expand Up @@ -218,24 +197,8 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
if (!(split instanceof StatisticTable.StatisticSplit)) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
StatisticSplit statisticSplit = (StatisticSplit) split;
LeafPredicate snapshotIdPredicate = statisticSplit.snapshotIdPredicate;
Optional<Statistics> statisticsOptional;
if (snapshotIdPredicate != null) {
Long snapshotId =
(Long)
snapshotIdPredicate
.visit(LeafPredicateExtractor.INSTANCE)
.get(SNAPSHOT_ID)
.literals()
.get(0);
HashMap<String, String> snapshotIdMap = new HashMap<>();
snapshotIdMap.put(SCAN_SNAPSHOT_ID.key(), snapshotId.toString());
statisticsOptional = dataTable.copy(snapshotIdMap).statistics();
} else {
statisticsOptional = dataTable.statistics();
}

Optional<Statistics> statisticsOptional = dataTable.statistics();
if (statisticsOptional.isPresent()) {
Statistics statistics = statisticsOptional.get();
Iterator<Statistics> statisticsIterator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
Row(2, 0, 2, "{ }"))
}

test("Paimon analyze: test statistic system table with predicate") {
test("Paimon analyze: test statistic system table with snapshot") {
spark.sql(s"""
|CREATE TABLE T (id STRING, name STRING, i INT, l LONG)
|USING PAIMON
Expand All @@ -91,28 +91,30 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {

spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS")

checkAnswer(
spark.sql(
"SELECT snapshot_id, schema_id, mergedRecordCount, colstat from `T$statistics` where snapshot_id=3"),
Nil)
withSQLConf("spark.paimon.scan.snapshot-id" -> "3") {
checkAnswer(
sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"),
Row(2, 0, 2, "{ }"))
}

checkAnswer(
spark.sql(
"SELECT snapshot_id, schema_id, mergedRecordCount, colstat from `T$statistics` where snapshot_id=2"),
Row(2, 0, 2, "{ }"))
withSQLConf("spark.paimon.scan.snapshot-id" -> "4") {
checkAnswer(
sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"),
Row(2, 0, 2, "{ }"))
}

checkAnswer(
spark.sql(
"SELECT snapshot_id, schema_id, mergedRecordCount, colstat from `T$statistics` where snapshot_id=5"),
Row(5, 0, 4, "{ }"))
withSQLConf("spark.paimon.scan.snapshot-id" -> "6") {
checkAnswer(
sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"),
Row(5, 0, 4, "{ }"))
}

// this case indicates that statistics can resolve the latest analyzed snapshot by snapshot_id
spark.sql(s"INSERT INTO T VALUES ('5', 'bbb', 3, 2)")
withSQLConf("spark.paimon.scan.snapshot-id" -> "100") {
checkAnswer(
sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"),
Row(5, 0, 4, "{ }"))
}

checkAnswer(
spark.sql(
"SELECT snapshot_id, schema_id, mergedRecordCount, colstat from `T$statistics` where snapshot_id=5"),
Row(5, 0, 4, "{ }"))
}

test("Paimon analyze: analyze table without snapshot") {
Expand Down

0 comments on commit cab9d1c

Please sign in to comment.