-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] Support statistic with time travel #4251
Changes from 9 commits
b6b0cc9
67bc406
7804136
cf676e2
48fe472
d9d2e10
d452c74
548858b
e62988e
bbdbb2f
516f559
77fddbe
fdb26ba
9100a8f
cab9d1c
c6e74f7
82fba79
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -166,14 +166,43 @@ public Identifier identifier() { | |
|
||
@Override | ||
public Optional<Statistics> statistics() { | ||
// todo: support time travel | ||
Snapshot latestSnapshot = snapshotManager().latestSnapshot(); | ||
if (latestSnapshot != null) { | ||
return store().newStatsFileHandler().readStats(latestSnapshot); | ||
} | ||
return Optional.empty(); | ||
} | ||
|
||
@Override | ||
public Optional<Statistics> statistics(Long snapshotId) { | ||
if (!snapshotManager().snapshotExists(snapshotId)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it would be better to return store().newStatsFileHandler().readStats(latestSnapshot). Thanks~ @JingsongLi |
||
throw new SnapshotNotExistException( | ||
String.format("snapshot id: %s is not existed", snapshotId)); | ||
} | ||
|
||
Long latestSnapshotId = snapshotManager().latestSnapshotId(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. latestSnapshotId may be null, you should add a check. |
||
if (latestSnapshotId == null) { | ||
return Optional.empty(); | ||
} | ||
|
||
while (latestSnapshotId > 0) { | ||
Snapshot latestSnapshot = snapshotManager().snapshot(latestSnapshotId); | ||
// reduce unnecessary loop | ||
if (latestSnapshot.id() < snapshotId) { | ||
break; | ||
} | ||
if (latestSnapshot.commitKind() == Snapshot.CommitKind.ANALYZE) { | ||
Optional<Statistics> statistics = | ||
store().newStatsFileHandler().readStats(latestSnapshot); | ||
if (statistics.isPresent() && statistics.get().snapshotId() == snapshotId) { | ||
return statistics; | ||
} | ||
} | ||
latestSnapshotId--; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't need to find the snapshot with the ANALYZE commit. A snapshot inherits the stats of its parent snapshot. That is why I said to just return the stats of this snapshot. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. According to the logic, we may still need the traversal, because StatsFileHandler#readStats for a given snapshot id can only return the stats of the latest analyzed snapshot, while the snapshot_id recorded in the statistics file may be less than the id of the ANALYZE snapshot that produced it. I added a case at the end of the UT that illustrates this. |
||
} | ||
return Optional.empty(); | ||
} | ||
|
||
@Override | ||
public Optional<WriteSelector> newWriteSelector() { | ||
switch (bucketMode()) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -74,6 +74,9 @@ default String fullName() { | |
@Experimental | ||
Optional<Statistics> statistics(); | ||
|
||
@Experimental | ||
Optional<Statistics> statistics(Long snapshotId); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should be a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
// ================= Table Operations ==================== | ||
|
||
/** Copy this table with adding dynamic options. */ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,8 @@ | |
import org.apache.paimon.disk.IOManager; | ||
import org.apache.paimon.fs.FileIO; | ||
import org.apache.paimon.fs.Path; | ||
import org.apache.paimon.predicate.LeafPredicate; | ||
import org.apache.paimon.predicate.LeafPredicateExtractor; | ||
import org.apache.paimon.predicate.Predicate; | ||
import org.apache.paimon.reader.EmptyRecordReader; | ||
import org.apache.paimon.reader.RecordReader; | ||
|
@@ -47,6 +49,8 @@ | |
|
||
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; | ||
|
||
import javax.annotation.Nullable; | ||
|
||
import java.io.IOException; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
|
@@ -65,7 +69,9 @@ public class StatisticTable implements ReadonlyTable { | |
|
||
public static final String STATISTICS = "statistics"; | ||
|
||
public static final RowType TABLE_TYPE = | ||
private static final String SNAPSHOT_ID = "snapshot_id"; | ||
|
||
private static final RowType TABLE_TYPE = | ||
new RowType( | ||
Arrays.asList( | ||
new DataField(0, "snapshot_id", new BigIntType(false)), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SNAPSHOT_ID |
||
|
@@ -101,7 +107,7 @@ public RowType rowType() { | |
|
||
@Override | ||
public List<String> primaryKeys() { | ||
return Collections.singletonList("snapshot_id"); | ||
return Collections.singletonList(SNAPSHOT_ID); | ||
} | ||
|
||
@Override | ||
|
@@ -121,15 +127,26 @@ public Table copy(Map<String, String> dynamicOptions) { | |
|
||
private class StatisticScan extends ReadOnceTableScan { | ||
|
||
private @Nullable LeafPredicate snapshotIdPredicate; | ||
|
||
@Override | ||
public InnerTableScan withFilter(Predicate predicate) { | ||
// TODO | ||
if (predicate == null) { | ||
return this; | ||
} | ||
|
||
Map<String, LeafPredicate> leafPredicates = | ||
predicate.visit(LeafPredicateExtractor.INSTANCE); | ||
snapshotIdPredicate = leafPredicates.get("snapshot_id"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SNAPSHOT_ID There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also in line 110 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TABLE_TYPE may not need to use the constant; keeping its style consistent with the other tables seems better. The other positions will be changed to SNAPSHOT_ID. |
||
|
||
return this; | ||
} | ||
|
||
@Override | ||
public Plan innerPlan() { | ||
return () -> Collections.singletonList(new StatisticTable.StatisticSplit(location)); | ||
return () -> | ||
Collections.singletonList( | ||
new StatisticTable.StatisticSplit(location, snapshotIdPredicate)); | ||
} | ||
} | ||
|
||
|
@@ -139,8 +156,11 @@ private static class StatisticSplit extends SingletonSplit { | |
|
||
private final Path location; | ||
|
||
private StatisticSplit(Path location) { | ||
private final @Nullable LeafPredicate snapshotIdPredicate; | ||
|
||
private StatisticSplit(Path location, @Nullable LeafPredicate snapshotIdPredicate) { | ||
this.location = location; | ||
this.snapshotIdPredicate = snapshotIdPredicate; | ||
} | ||
|
||
@Override | ||
|
@@ -152,7 +172,8 @@ public boolean equals(Object o) { | |
return false; | ||
} | ||
StatisticTable.StatisticSplit that = (StatisticTable.StatisticSplit) o; | ||
return Objects.equals(location, that.location); | ||
return Objects.equals(location, that.location) | ||
&& Objects.equals(snapshotIdPredicate, that.snapshotIdPredicate); | ||
} | ||
|
||
@Override | ||
|
@@ -195,8 +216,22 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException { | |
if (!(split instanceof StatisticTable.StatisticSplit)) { | ||
throw new IllegalArgumentException("Unsupported split: " + split.getClass()); | ||
} | ||
StatisticSplit statisticSplit = (StatisticSplit) split; | ||
LeafPredicate snapshotIdPredicate = statisticSplit.snapshotIdPredicate; | ||
Optional<Statistics> statisticsOptional; | ||
if (snapshotIdPredicate != null) { | ||
Long snapshotId = | ||
(Long) | ||
snapshotIdPredicate | ||
.visit(LeafPredicateExtractor.INSTANCE) | ||
.get(SNAPSHOT_ID) | ||
.literals() | ||
.get(0); | ||
statisticsOptional = dataTable.statistics(snapshotId); | ||
} else { | ||
statisticsOptional = dataTable.statistics(); | ||
} | ||
|
||
Optional<Statistics> statisticsOptional = dataTable.statistics(); | ||
if (statisticsOptional.isPresent()) { | ||
Statistics statistics = statisticsOptional.get(); | ||
Iterator<Statistics> statisticsIterator = | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe just here to respect time travel options.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done