Skip to content

Commit

Permalink
[core] Use TimeTravelUtil to stats read and manifests table
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Oct 15, 2024
1 parent 11636ac commit 55f2102
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@
import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SimpleFileReader;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.SnapshotNotExistException;
import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.utils.TagManager;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -169,24 +169,9 @@ public Identifier identifier() {

@Override
public Optional<Statistics> statistics() {
Snapshot latestSnapshot = null;
Long snapshotId = coreOptions().scanSnapshotId();
String tagName = coreOptions().scanTagName();
Long timestampMills = coreOptions().scanTimestampMills();

if (snapshotId != null && snapshotManager().snapshotExists(snapshotId)) {
latestSnapshot = snapshotManager().snapshot(snapshotId);
} else if (!StringUtils.isEmpty(tagName) && tagManager().tagExists(tagName)) {
return store().newStatsFileHandler()
.readStats(tagManager().tag(tagName).trimToSnapshot());
} else if (timestampMills != null) {
latestSnapshot = snapshotManager().earlierOrEqualTimeMills(timestampMills);
} else if (snapshotId == null && StringUtils.isEmpty(tagName)) {
latestSnapshot = snapshotManager().latestSnapshot();
}

if (latestSnapshot != null) {
return store().newStatsFileHandler().readStats(latestSnapshot);
Snapshot snapshot = TimeTravelUtil.resolveSnapshot(this);
if (snapshot != null) {
return store().newStatsFileHandler().readStats(snapshot);
}
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,42 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.SnapshotNotExistException;
import org.apache.paimon.utils.TagManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;

/** Utility class for resolving a snapshot from scan parameters for time travel. */
public class TimeTravelUtil {

private static String[] scanKeys = {
private static final String[] SCAN_KEYS = {
CoreOptions.SCAN_SNAPSHOT_ID.key(),
CoreOptions.SCAN_TAG_NAME.key(),
CoreOptions.SCAN_WATERMARK.key(),
CoreOptions.SCAN_TIMESTAMP_MILLIS.key()
};

private static final Logger LOG = LoggerFactory.getLogger(TimeTravelUtil.class);
/**
 * Resolves the snapshot for the given table.
 *
 * <p>Delegates to {@link #resolveSnapshotFromOptions(CoreOptions, SnapshotManager)} using the
 * table's own core options and snapshot manager.
 *
 * @param table the table whose options and snapshot manager are used for the lookup
 * @return whatever {@code resolveSnapshotFromOptions} returns for this table
 */
public static Snapshot resolveSnapshot(FileStoreTable table) {
    CoreOptions tableOptions = table.coreOptions();
    SnapshotManager tableSnapshots = table.snapshotManager();
    return resolveSnapshotFromOptions(tableOptions, tableSnapshots);
}

public static Snapshot resolveSnapshotFromOptions(
CoreOptions options, SnapshotManager snapshotManager) {
List<String> scanHandleKey = new ArrayList<>(1);
for (String key : scanKeys) {
for (String key : SCAN_KEYS) {
if (options.toConfiguration().containsKey(key)) {
scanHandleKey.add(key);
}
}

if (scanHandleKey.size() == 0) {
LOG.warn("Not set any time travel parameter.");
return null;
return snapshotManager.latestSnapshot();
}

Preconditions.checkArgument(
Expand All @@ -76,13 +78,28 @@ public static Snapshot resolveSnapshotFromOptions(
} else if (key.equals(CoreOptions.SCAN_TAG_NAME.key())) {
snapshot = resolveSnapshotByTagName(snapshotManager, options);
}

if (snapshot == null) {
snapshot = snapshotManager.latestSnapshot();
}
return snapshot;
}

private static Snapshot resolveSnapshotBySnapshotId(
SnapshotManager snapshotManager, CoreOptions options) {
Long snapshotId = options.scanSnapshotId();
if (snapshotId != null && snapshotManager.snapshotExists(snapshotId)) {
if (snapshotId != null) {
if (!snapshotManager.snapshotExists(snapshotId)) {
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
Long latestSnapshotId = snapshotManager.latestSnapshotId();
throw new SnapshotNotExistException(
String.format(
"Specified parameter %s = %s is not exist, you can set it in range from %s to %s.",
SCAN_SNAPSHOT_ID.key(),
snapshotId,
earliestSnapshotId,
latestSnapshotId));
}
return snapshotManager.snapshot(snapshotId);
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,14 @@
import org.apache.paimon.table.source.SingletonSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.SnapshotNotExistException;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

Expand All @@ -59,8 +57,6 @@
import java.util.List;
import java.util.Map;

import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER;

/** A {@link Table} for showing the manifest files of a snapshot of the table. */
Expand Down Expand Up @@ -204,41 +200,8 @@ private InternalRow toRow(ManifestFileMeta manifestFileMeta) {
}

private static List<ManifestFileMeta> allManifests(FileStoreTable dataTable) {
CoreOptions coreOptions = CoreOptions.fromMap(dataTable.options());
SnapshotManager snapshotManager = dataTable.snapshotManager();
Long snapshotId = coreOptions.scanSnapshotId();
String tagName = coreOptions.scanTagName();
Long timestampMills = coreOptions.scanTimestampMills();

Snapshot snapshot;
if (snapshotId != null) {
// reminder user with snapshot id range
if (!snapshotManager.snapshotExists(snapshotId)) {
Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
Long latestSnapshotId = snapshotManager.latestSnapshotId();
throw new SnapshotNotExistException(
String.format(
"Specified parameter %s = %s is not exist, you can set it in range from %s to %s.",
SCAN_SNAPSHOT_ID.key(),
snapshotId,
earliestSnapshotId,
latestSnapshotId));
}
snapshot = snapshotManager.snapshot(snapshotId);
} else if (!StringUtils.isEmpty(tagName)) {
if (!dataTable.tagManager().tagExists(tagName)) {
throw new RuntimeException(
String.format(
"Specified parameter %s = %s is not exist.",
SCAN_TAG_NAME.key(), tagName));
}
snapshot = dataTable.tagManager().tag(tagName).trimToSnapshot();
} else if (timestampMills != null) {
snapshot = snapshotManager.earlierOrEqualTimeMills(timestampMills);
} else {
snapshot = snapshotManager.latestSnapshot();
}

CoreOptions options = dataTable.coreOptions();
Snapshot snapshot = TimeTravelUtil.resolveSnapshot(dataTable);
if (snapshot == null) {
LOG.warn("Check if your snapshot is empty.");
return Collections.emptyList();
Expand All @@ -247,8 +210,8 @@ private static List<ManifestFileMeta> allManifests(FileStoreTable dataTable) {
ManifestList manifestList =
new ManifestList.Factory(
dataTable.fileIO(),
coreOptions.manifestFormat(),
coreOptions.manifestCompression(),
options.manifestFormat(),
options.manifestCompression(),
fileStorePathFactory,
null)
.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.EmptyRecordReader;
Expand All @@ -47,7 +46,6 @@

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
Expand All @@ -74,18 +72,9 @@ public class StatisticTable implements ReadonlyTable {
new DataField(3, "mergedRecordSize", new BigIntType(true)),
new DataField(4, "colstat", SerializationUtils.newStringType(true))));

private final FileIO fileIO;
private final Path location;

private final FileStoreTable dataTable;

public StatisticTable(FileStoreTable dataTable) {
this(dataTable.fileIO(), dataTable.location(), dataTable);
}

public StatisticTable(FileIO fileIO, Path location, FileStoreTable dataTable) {
this.fileIO = fileIO;
this.location = location;
this.dataTable = dataTable;
}

Expand All @@ -111,12 +100,12 @@ public InnerTableScan newScan() {

@Override
public InnerTableRead newRead() {
return new StatisticRead(fileIO, dataTable);
return new StatisticRead(dataTable);
}

@Override
public Table copy(Map<String, String> dynamicOptions) {
return new StatisticTable(fileIO, location, dataTable.copy(dynamicOptions));
return new StatisticTable(dataTable.copy(dynamicOptions));
}

private class StatisticScan extends ReadOnceTableScan {
Expand All @@ -129,7 +118,9 @@ public InnerTableScan withFilter(Predicate predicate) {

@Override
public Plan innerPlan() {
return () -> Collections.singletonList(new StatisticTable.StatisticSplit(location));
return () ->
Collections.singletonList(
new StatisticTable.StatisticSplit(dataTable.location()));
}
}

Expand Down Expand Up @@ -163,13 +154,11 @@ public int hashCode() {

private static class StatisticRead implements InnerTableRead {

private final FileIO fileIO;
private RowType readType;

private final FileStoreTable dataTable;

public StatisticRead(FileIO fileIO, FileStoreTable dataTable) {
this.fileIO = fileIO;
public StatisticRead(FileStoreTable dataTable) {
this.dataTable = dataTable;
}

Expand All @@ -191,7 +180,7 @@ public TableRead withIOManager(IOManager ioManager) {
}

@Override
public RecordReader<InternalRow> createReader(Split split) throws IOException {
public RecordReader<InternalRow> createReader(Split split) {
if (!(split instanceof StatisticTable.StatisticSplit)) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void testReadManifestsFromSpecifiedTimestampMillis() throws Exception {
}

@Test
public void testReadManifestsFromNotExistSnapshot() throws Exception {
public void testReadManifestsFromNotExistSnapshot() {
manifestsTable =
(ManifestsTable)
manifestsTable.copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,6 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"),
Row(5, 0, 4, "{ }"))
}

withSQLConf("spark.paimon.scan.snapshot-id" -> "100") {
Assertions.assertEquals(0, sql("select * from `T$statistics`").count())
}

}

test("Paimon analyze: analyze table without snapshot") {
Expand Down

0 comments on commit 55f2102

Please sign in to comment.