diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 24e97e1c9571..b33cf6922490 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -58,6 +58,7 @@ import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl; import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner; import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner; +import org.apache.paimon.table.source.snapshot.TimeTravelUtil; import org.apache.paimon.tag.TagPreview; import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.Preconditions; @@ -65,7 +66,6 @@ import org.apache.paimon.utils.SimpleFileReader; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.SnapshotNotExistException; -import org.apache.paimon.utils.StringUtils; import org.apache.paimon.utils.TagManager; import javax.annotation.Nullable; @@ -169,24 +169,9 @@ public Identifier identifier() { @Override public Optional statistics() { - Snapshot latestSnapshot = null; - Long snapshotId = coreOptions().scanSnapshotId(); - String tagName = coreOptions().scanTagName(); - Long timestampMills = coreOptions().scanTimestampMills(); - - if (snapshotId != null && snapshotManager().snapshotExists(snapshotId)) { - latestSnapshot = snapshotManager().snapshot(snapshotId); - } else if (!StringUtils.isEmpty(tagName) && tagManager().tagExists(tagName)) { - return store().newStatsFileHandler() - .readStats(tagManager().tag(tagName).trimToSnapshot()); - } else if (timestampMills != null) { - latestSnapshot = snapshotManager().earlierOrEqualTimeMills(timestampMills); - } else if (snapshotId == null && StringUtils.isEmpty(tagName)) { - latestSnapshot = snapshotManager().latestSnapshot(); - } - - if (latestSnapshot != null) { - return store().newStatsFileHandler().readStats(latestSnapshot); + Snapshot snapshot = TimeTravelUtil.resolveSnapshot(this); + if (snapshot != null) { + return store().newStatsFileHandler().readStats(snapshot); } return Optional.empty(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java index 2ef0d5153f84..4c8b41aa4215 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java @@ -20,40 +20,42 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; +import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.SnapshotNotExistException; import org.apache.paimon.utils.TagManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.ArrayList; import java.util.List; +import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID; + /** The util class of resolve snapshot from scan params for time travel. */ public class TimeTravelUtil { - private static String[] scanKeys = { + private static final String[] SCAN_KEYS = { CoreOptions.SCAN_SNAPSHOT_ID.key(), CoreOptions.SCAN_TAG_NAME.key(), CoreOptions.SCAN_WATERMARK.key(), CoreOptions.SCAN_TIMESTAMP_MILLIS.key() }; - private static final Logger LOG = LoggerFactory.getLogger(TimeTravelUtil.class); + public static Snapshot resolveSnapshot(FileStoreTable table) { + return resolveSnapshotFromOptions(table.coreOptions(), table.snapshotManager()); + } public static Snapshot resolveSnapshotFromOptions( CoreOptions options, SnapshotManager snapshotManager) { List scanHandleKey = new ArrayList<>(1); - for (String key : scanKeys) { + for (String key : SCAN_KEYS) { if (options.toConfiguration().containsKey(key)) { scanHandleKey.add(key); } } if (scanHandleKey.size() == 0) { - LOG.warn("Not set any time travel parameter."); - return null; + return snapshotManager.latestSnapshot(); } Preconditions.checkArgument( @@ -76,13 +78,28 @@ public static Snapshot resolveSnapshotFromOptions( } else if (key.equals(CoreOptions.SCAN_TAG_NAME.key())) { snapshot = resolveSnapshotByTagName(snapshotManager, options); } + + if (snapshot == null) { + snapshot = snapshotManager.latestSnapshot(); + } return snapshot; } private static Snapshot resolveSnapshotBySnapshotId( SnapshotManager snapshotManager, CoreOptions options) { Long snapshotId = options.scanSnapshotId(); - if (snapshotId != null && snapshotManager.snapshotExists(snapshotId)) { + if (snapshotId != null) { + if (!snapshotManager.snapshotExists(snapshotId)) { + Long earliestSnapshotId = snapshotManager.earliestSnapshotId(); + Long latestSnapshotId = snapshotManager.latestSnapshotId(); + throw new SnapshotNotExistException( + String.format( + "Specified parameter %s = %s is not exist, you can set it in range from %s to %s.", + SCAN_SNAPSHOT_ID.key(), + snapshotId, + earliestSnapshotId, + latestSnapshotId)); + } return snapshotManager.snapshot(snapshotId); } return null; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java index 02e0e8618db9..d88636d02aab 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java @@ -37,6 +37,7 @@ import org.apache.paimon.table.source.SingletonSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.table.source.snapshot.TimeTravelUtil; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; @@ -44,9 +45,6 @@ import org.apache.paimon.utils.IteratorRecordReader; import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.SerializationUtils; -import org.apache.paimon.utils.SnapshotManager; -import org.apache.paimon.utils.SnapshotNotExistException; -import org.apache.paimon.utils.StringUtils; import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; @@ -59,8 +57,6 @@ import java.util.List; import java.util.Map; -import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID; -import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME; import static org.apache.paimon.catalog.Catalog.SYSTEM_TABLE_SPLITTER; /** A {@link Table} for showing committing snapshots of table. */ @@ -204,41 +200,8 @@ private InternalRow toRow(ManifestFileMeta manifestFileMeta) { } private static List allManifests(FileStoreTable dataTable) { - CoreOptions coreOptions = CoreOptions.fromMap(dataTable.options()); - SnapshotManager snapshotManager = dataTable.snapshotManager(); - Long snapshotId = coreOptions.scanSnapshotId(); - String tagName = coreOptions.scanTagName(); - Long timestampMills = coreOptions.scanTimestampMills(); - - Snapshot snapshot; - if (snapshotId != null) { - // reminder user with snapshot id range - if (!snapshotManager.snapshotExists(snapshotId)) { - Long earliestSnapshotId = snapshotManager.earliestSnapshotId(); - Long latestSnapshotId = snapshotManager.latestSnapshotId(); - throw new SnapshotNotExistException( - String.format( - "Specified parameter %s = %s is not exist, you can set it in range from %s to %s.", - SCAN_SNAPSHOT_ID.key(), - snapshotId, - earliestSnapshotId, - latestSnapshotId)); - } - snapshot = snapshotManager.snapshot(snapshotId); - } else if (!StringUtils.isEmpty(tagName)) { - if (!dataTable.tagManager().tagExists(tagName)) { - throw new RuntimeException( - String.format( - "Specified parameter %s = %s is not exist.", - SCAN_TAG_NAME.key(), tagName)); - } - snapshot = dataTable.tagManager().tag(tagName).trimToSnapshot(); - } else if (timestampMills != null) { - snapshot = snapshotManager.earlierOrEqualTimeMills(timestampMills); - } else { - snapshot = snapshotManager.latestSnapshot(); - } - + CoreOptions options = dataTable.coreOptions(); + Snapshot snapshot = TimeTravelUtil.resolveSnapshot(dataTable); if (snapshot == null) { LOG.warn("Check if your snapshot is empty."); return Collections.emptyList(); @@ -247,8 +210,8 @@ private static List allManifests(FileStoreTable dataTable) { ManifestList manifestList = new ManifestList.Factory( dataTable.fileIO(), - coreOptions.manifestFormat(), - coreOptions.manifestCompression(), + options.manifestFormat(), + options.manifestCompression(), fileStorePathFactory, null) .create(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java index faaa654ebe48..f0180444988c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/StatisticTable.java @@ -22,7 +22,6 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; -import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.EmptyRecordReader; @@ -47,7 +46,6 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; -import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -74,18 +72,9 @@ public class StatisticTable implements ReadonlyTable { new DataField(3, "mergedRecordSize", new BigIntType(true)), new DataField(4, "colstat", SerializationUtils.newStringType(true)))); - private final FileIO fileIO; - private final Path location; - private final FileStoreTable dataTable; public StatisticTable(FileStoreTable dataTable) { - this(dataTable.fileIO(), dataTable.location(), dataTable); - } - - public StatisticTable(FileIO fileIO, Path location, FileStoreTable dataTable) { - this.fileIO = fileIO; - this.location = location; this.dataTable = dataTable; } @@ -111,12 +100,12 @@ public InnerTableScan newScan() { @Override public InnerTableRead newRead() { - return new StatisticRead(fileIO, dataTable); + return new StatisticRead(dataTable); } @Override public Table copy(Map dynamicOptions) { - return new StatisticTable(fileIO, location, dataTable.copy(dynamicOptions)); + return new StatisticTable(dataTable.copy(dynamicOptions)); } private class StatisticScan extends ReadOnceTableScan { @@ -129,7 +118,9 @@ public InnerTableScan withFilter(Predicate predicate) { @Override public Plan innerPlan() { - return () -> Collections.singletonList(new StatisticTable.StatisticSplit(location)); + return () -> + Collections.singletonList( + new StatisticTable.StatisticSplit(dataTable.location())); } } @@ -163,13 +154,11 @@ public int hashCode() { private static class StatisticRead implements InnerTableRead { - private final FileIO fileIO; private RowType readType; private final FileStoreTable dataTable; - public StatisticRead(FileIO fileIO, FileStoreTable dataTable) { - this.fileIO = fileIO; + public StatisticRead(FileStoreTable dataTable) { this.dataTable = dataTable; } @@ -191,7 +180,7 @@ public TableRead withIOManager(IOManager ioManager) { } @Override - public RecordReader createReader(Split split) throws IOException { + public RecordReader createReader(Split split) { if (!(split instanceof StatisticTable.StatisticSplit)) { throw new IllegalArgumentException("Unsupported split: " + split.getClass()); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java index eacd40847c21..a39e6f6fa807 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java @@ -150,7 +150,7 @@ public void testReadManifestsFromSpecifiedTimestampMillis() throws Exception { } @Test - public void testReadManifestsFromNotExistSnapshot() throws Exception { + public void testReadManifestsFromNotExistSnapshot() { manifestsTable = (ManifestsTable) manifestsTable.copy( diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala index b397be24b728..238dd039969a 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala @@ -140,11 +140,6 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase { sql("SELECT snapshot_id, schema_id, mergedRecordCount, colstat FROM `T$statistics`"), Row(5, 0, 4, "{ }")) } - - withSQLConf("spark.paimon.scan.snapshot-id" -> "100") { - Assertions.assertEquals(0, sql("select * from `T$statistics`").count()) - } - } test("Paimon analyze: analyze table without snapshot") {