From 154126a687f9d8f0800aaa9faab3837bb326f013 Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Fri, 16 Aug 2024 18:47:55 +0800 Subject: [PATCH] [core] Introduce Table.snapshot(long snapshotId) (#3982) --- .../main/java/org/apache/paimon/Snapshot.java | 107 +----------------- .../apache/paimon/manifest/ManifestEntry.java | 18 +++ .../apache/paimon/manifest/ManifestList.java | 48 ++++++++ .../operation/AbstractFileStoreScan.java | 8 +- .../paimon/operation/FileDeletionBase.java | 4 +- .../paimon/operation/FileStoreCommitImpl.java | 12 +- .../paimon/operation/FileStoreScan.java | 13 +++ .../paimon/table/AbstractFileStoreTable.java | 5 + .../paimon/table/DelegatedFileStoreTable.java | 6 + .../apache/paimon/table/ReadonlyTable.java | 9 ++ .../java/org/apache/paimon/table/Table.java | 5 + .../paimon/table/system/AuditLogTable.java | 5 + .../paimon/table/system/BucketsTable.java | 6 + .../paimon/table/system/FileMonitorTable.java | 6 + .../paimon/table/system/ManifestsTable.java | 2 +- .../table/system/ReadOptimizedTable.java | 6 + .../apache/paimon/utils/SnapshotManager.java | 6 +- .../org/apache/paimon/utils/TagManager.java | 6 +- .../java/org/apache/paimon/TestFileStore.java | 15 ++- .../paimon/operation/FileDeletionTest.java | 14 +-- .../operation/KeyValueFileStoreScanTest.java | 2 +- .../operation/OrphanFilesCleanTest.java | 4 +- .../table/system/ManifestsTableTest.java | 2 +- 23 files changed, 173 insertions(+), 136 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index 6102c321db60..a16349ce54b8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -18,13 +18,9 @@ package org.apache.paimon; +import org.apache.paimon.annotation.Public; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; -import org.apache.paimon.manifest.FileKind; -import org.apache.paimon.manifest.ManifestEntry; -import org.apache.paimon.manifest.ManifestFileMeta; -import org.apache.paimon.manifest.ManifestList; -import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; @@ -35,11 +31,7 @@ import javax.annotation.Nullable; -import java.io.FileNotFoundException; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Objects; @@ -63,7 +55,10 @@ * commitIdentifier (which is a long value). Json can automatically perform type conversion so * there is no compatibility issue. * + * + * @since 0.9.0 */ +@Public @JsonIgnoreProperties(ignoreUnknown = true) public class Snapshot { @@ -351,85 +346,6 @@ public String statistics() { return statistics; } - /** - * Return all {@link ManifestFileMeta} instances for either data or changelog manifests in this - * snapshot. - * - * @param manifestList a {@link ManifestList} instance used for reading files at snapshot. - * @return a list of ManifestFileMeta. - */ - public List allManifests(ManifestList manifestList) { - List result = new ArrayList<>(); - result.addAll(dataManifests(manifestList)); - result.addAll(changelogManifests(manifestList)); - return result; - } - - /** - * Return a {@link ManifestFileMeta} for each data manifest in this snapshot. - * - * @param manifestList a {@link ManifestList} instance used for reading files at snapshot. - * @return a list of ManifestFileMeta. - */ - public List dataManifests(ManifestList manifestList) { - List result = new ArrayList<>(); - result.addAll(manifestList.read(baseManifestList)); - result.addAll(deltaManifests(manifestList)); - return result; - } - - /** - * Return a {@link ManifestFileMeta} for each delta manifest in this snapshot. - * - * @param manifestList a {@link ManifestList} instance used for reading files at snapshot. - * @return a list of ManifestFileMeta. - */ - public List deltaManifests(ManifestList manifestList) { - return manifestList.read(deltaManifestList); - } - - /** - * Return a {@link ManifestFileMeta} for each changelog manifest in this snapshot. - * - * @param manifestList a {@link ManifestList} instance used for reading files at snapshot. - * @return a list of ManifestFileMeta. - */ - public List changelogManifests(ManifestList manifestList) { - return changelogManifestList == null - ? Collections.emptyList() - : manifestList.read(changelogManifestList); - } - - /** - * Return record count of all changes occurred in this snapshot given the scan. - * - * @param scan a {@link FileStoreScan} instance used for count of reading files at snapshot. - * @return total record count of Snapshot. - */ - public Long totalRecordCount(FileStoreScan scan) { - return totalRecordCount == null - ? recordCount(scan.withSnapshot(id).plan().files()) - : totalRecordCount; - } - - public static long recordCount(List manifestEntries) { - return manifestEntries.stream().mapToLong(manifest -> manifest.file().rowCount()).sum(); - } - - public static long recordCountAdd(List manifestEntries) { - return manifestEntries.stream() - .filter(manifestEntry -> FileKind.ADD.equals(manifestEntry.kind())) - .mapToLong(manifest -> manifest.file().rowCount()) - .sum(); - } - - public static long recordCountDelete(List manifestEntries) { - return manifestEntries.stream() - .filter(manifestEntry -> FileKind.DELETE.equals(manifestEntry.kind())) - .mapToLong(manifest -> manifest.file().rowCount()) - .sum(); - } - public String toJson() { return JsonSerdeUtil.toJson(this); } @@ -440,25 +356,12 @@ public static Snapshot fromJson(String json) { public static Snapshot fromPath(FileIO fileIO, Path path) { try { - return fromPathThrowsException(fileIO, path); + return Snapshot.fromJson(fileIO.readFileUtf8(path)); } catch (IOException e) { throw new RuntimeException("Fails to read snapshot from path " + path, e); } } - @Nullable - public static Snapshot safelyFromPath(FileIO fileIO, Path path) throws IOException { - try { - return fromPathThrowsException(fileIO, path); - } catch (FileNotFoundException e) { - return null; - } - } - - private static Snapshot fromPathThrowsException(FileIO fileIO, Path path) throws IOException { - return Snapshot.fromJson(fileIO.readFileUtf8(path)); - } - @Override public int hashCode() { return Objects.hash( diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java index 013f7ebd13cd..65cef5517434 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java @@ -197,4 +197,22 @@ public static Filter createEntryRowFilter( return true; }; } + + public static long recordCount(List manifestEntries) { + return manifestEntries.stream().mapToLong(manifest -> manifest.file().rowCount()).sum(); + } + + public static long recordCountAdd(List manifestEntries) { + return manifestEntries.stream() + .filter(manifestEntry -> FileKind.ADD.equals(manifestEntry.kind())) + .mapToLong(manifest -> manifest.file().rowCount()) + .sum(); + } + + public static long recordCountDelete(List manifestEntries) { + return manifestEntries.stream() + .filter(manifestEntry -> FileKind.DELETE.equals(manifestEntry.kind())) + .mapToLong(manifest -> manifest.file().rowCount()) + .sum(); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java index 304730f30cff..2980e998bf3f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java @@ -18,6 +18,7 @@ package org.apache.paimon.manifest; +import org.apache.paimon.Snapshot; import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.format.FormatWriterFactory; @@ -32,6 +33,8 @@ import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** @@ -60,6 +63,51 @@ private ManifestList( cache); } + /** + * Return all {@link ManifestFileMeta} instances for either data or changelog manifests in this + * snapshot. + * + * @return a list of ManifestFileMeta. + */ + public List readAllManifests(Snapshot snapshot) { + List result = new ArrayList<>(); + result.addAll(readDataManifests(snapshot)); + result.addAll(readChangelogManifests(snapshot)); + return result; + } + + /** + * Return a {@link ManifestFileMeta} for each data manifest in this snapshot. + * + * @return a list of ManifestFileMeta. + */ + public List readDataManifests(Snapshot snapshot) { + List result = new ArrayList<>(); + result.addAll(read(snapshot.baseManifestList())); + result.addAll(readDeltaManifests(snapshot)); + return result; + } + + /** + * Return a {@link ManifestFileMeta} for each delta manifest in this snapshot. + * + * @return a list of ManifestFileMeta. + */ + public List readDeltaManifests(Snapshot snapshot) { + return read(snapshot.deltaManifestList()); + } + + /** + * Return a {@link ManifestFileMeta} for each changelog manifest in this snapshot. + * + * @return a list of ManifestFileMeta. + */ + public List readChangelogManifests(Snapshot snapshot) { + return snapshot.changelogManifestList() == null + ? Collections.emptyList() + : read(snapshot.changelogManifestList()); + } + /** * Write several {@link ManifestFileMeta}s into a manifest list. * diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 358a2722fdc1..4bf83e520266 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -403,18 +403,18 @@ private Pair> readManifests() { private List readManifests(Snapshot snapshot) { switch (scanMode) { case ALL: - return snapshot.dataManifests(manifestList); + return manifestList.readDataManifests(snapshot); case DELTA: - return snapshot.deltaManifests(manifestList); + return manifestList.readDeltaManifests(snapshot); case CHANGELOG: if (snapshot.version() > Snapshot.TABLE_STORE_02_VERSION) { - return snapshot.changelogManifests(manifestList); + return manifestList.readChangelogManifests(snapshot); } // compatible with Paimon 0.2, we'll read extraFiles in DataFileMeta // see comments on DataFileMeta#extraFiles if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) { - return snapshot.deltaManifests(manifestList); + return manifestList.readDeltaManifests(snapshot); } throw new IllegalStateException( String.format( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java index 0ce94bb9b112..303a074b0cb8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java @@ -347,7 +347,7 @@ public Predicate createDataFileSkipperForTags( * It is possible that a job was killed during expiration and some manifest files have been * deleted, so if the clean methods need to get manifests of a snapshot to be cleaned, we should * try to read manifests and return empty list if failed instead of calling {@link - * Snapshot#dataManifests} directly. + * ManifestList#readDataManifests} directly. */ protected List tryReadManifestList(String manifestListName) { try { @@ -424,7 +424,7 @@ public Set manifestSkippingSet(List skippingSnapshots) { // data manifests skippingSet.add(skippingSnapshot.baseManifestList()); skippingSet.add(skippingSnapshot.deltaManifestList()); - skippingSnapshot.dataManifests(manifestList).stream() + manifestList.readDataManifests(skippingSnapshot).stream() .map(ManifestFileMeta::fileName) .forEach(skippingSet::add); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 3a8d7195cef6..21aeee38d4e6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -78,6 +78,9 @@ import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.index.HashIndexFile.HASH_INDEX; +import static org.apache.paimon.manifest.ManifestEntry.recordCount; +import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd; +import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete; import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString; @@ -839,9 +842,9 @@ boolean tryCommitOnce( Long currentWatermark = watermark; String previousIndexManifest = null; if (latestSnapshot != null) { - previousTotalRecordCount = latestSnapshot.totalRecordCount(scan); + previousTotalRecordCount = scan.totalRecordCount(latestSnapshot); List previousManifests = - latestSnapshot.dataManifests(manifestList); + manifestList.readDataManifests(latestSnapshot); // read all previous manifest files oldMetas.addAll(previousManifests); // read the last snapshot to complete the bucket's offsets when logOffsets does not @@ -872,8 +875,7 @@ boolean tryCommitOnce( previousChangesListName = manifestList.write(newMetas); // the added records subtract the deleted records from - long deltaRecordCount = - Snapshot.recordCountAdd(tableFiles) - Snapshot.recordCountDelete(tableFiles); + long deltaRecordCount = recordCountAdd(tableFiles) - recordCountDelete(tableFiles); long totalRecordCount = previousTotalRecordCount + deltaRecordCount; // write new changes into manifest files @@ -928,7 +930,7 @@ boolean tryCommitOnce( logOffsets, totalRecordCount, deltaRecordCount, - Snapshot.recordCount(changelogFiles), + recordCount(changelogFiles), currentWatermark, statsFileName); } catch (Throwable e) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java index f5249efa9de8..744f2fe14510 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java @@ -41,6 +41,8 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.paimon.manifest.ManifestEntry.recordCount; + /** Scan operation which produces a plan. */ public interface FileStoreScan { @@ -77,6 +79,17 @@ public interface FileStoreScan { /** Produce a {@link Plan}. */ Plan plan(); + /** + * Return record count of all changes occurred in this snapshot given the scan. + * + * @return total record count of Snapshot. + */ + default Long totalRecordCount(Snapshot snapshot) { + return snapshot.totalRecordCount() == null + ? (Long) recordCount(withSnapshot(snapshot.id()).plan().files()) + : snapshot.totalRecordCount(); + } + /** * Read {@link SimpleFileEntry}s, SimpleFileEntry only retains some critical information, so it * cannot perform filtering based on statistical information. diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index a3b1735dd3c3..3b61d3edb796 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -121,6 +121,11 @@ public OptionalLong latestSnapshotId() { return snapshot == null ? OptionalLong.empty() : OptionalLong.of(snapshot); } + @Override + public Snapshot snapshot(long snapshotId) { + return store().snapshotManager().snapshot(snapshotId); + } + @Override public String name() { return identifier().getObjectName(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java index 1ff64f2c1bc2..a82d1824799f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.FileStore; +import org.apache.paimon.Snapshot; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.ManifestCacheFilter; @@ -128,6 +129,11 @@ public OptionalLong latestSnapshotId() { return wrapped.latestSnapshotId(); } + @Override + public Snapshot snapshot(long snapshotId) { + return wrapped.snapshot(snapshotId); + } + @Override public void rollbackTo(long snapshotId) { wrapped.rollbackTo(snapshotId); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index 4854f983d9af..6373e9944df4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -18,6 +18,7 @@ package org.apache.paimon.table; +import org.apache.paimon.Snapshot; import org.apache.paimon.stats.Statistics; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.sink.InnerTableCommit; @@ -112,6 +113,14 @@ default OptionalLong latestSnapshotId() { this.getClass().getSimpleName())); } + @Override + default Snapshot snapshot(long snapshotId) { + throw new UnsupportedOperationException( + String.format( + "Readonly Table %s does not support snapshot.", + this.getClass().getSimpleName())); + } + @Override default void rollbackTo(long snapshotId) { throw new UnsupportedOperationException( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 55cf25aea331..c588a5f0ab1d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -18,6 +18,7 @@ package org.apache.paimon.table; +import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.Experimental; import org.apache.paimon.annotation.Public; import org.apache.paimon.stats.Statistics; @@ -78,6 +79,10 @@ default String fullName() { @Experimental OptionalLong latestSnapshotId(); + /** Get the {@link Snapshot} from snapshot id. */ + @Experimental + Snapshot snapshot(long snapshotId); + /** Rollback table's state to a specific snapshot. */ @Experimental void rollbackTo(long snapshotId); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index 5596493318da..526a71d10744 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -106,6 +106,11 @@ public OptionalLong latestSnapshotId() { return wrapped.latestSnapshotId(); } + @Override + public Snapshot snapshot(long snapshotId) { + return wrapped.snapshot(snapshotId); + } + @Override public String name() { return wrapped.name() + SYSTEM_TABLE_SPLITTER + AUDIT_LOG; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java index 7b67f00bf21a..282839e43eea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java @@ -19,6 +19,7 @@ package org.apache.paimon.table.system; import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -109,6 +110,11 @@ public OptionalLong latestSnapshotId() { return wrapped.latestSnapshotId(); } + @Override + public Snapshot snapshot(long snapshotId) { + return wrapped.snapshot(snapshotId); + } + @Override public Path location() { return wrapped.location(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java index bd4efc80e42a..cc3d7b5c373a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java @@ -19,6 +19,7 @@ package org.apache.paimon.table.system; import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.Experimental; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; @@ -96,6 +97,11 @@ public OptionalLong latestSnapshotId() { return wrapped.latestSnapshotId(); } + @Override + public Snapshot snapshot(long snapshotId) { + return wrapped.snapshot(snapshotId); + } + @Override public Path location() { return wrapped.location(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java index 15210792b6a4..6184dbdad6ac 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ManifestsTable.java @@ -213,6 +213,6 @@ private static List allManifests(FileStoreTable dataTable) { fileStorePathFactory, null) .create(); - return snapshot.allManifests(manifestList); + return manifestList.readAllManifests(snapshot); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java index e7b3bd17195f..9cb9abe1a244 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java @@ -19,6 +19,7 @@ package org.apache.paimon.table.system; import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.operation.DefaultValueAssigner; @@ -66,6 +67,11 @@ public OptionalLong latestSnapshotId() { return wrapped.latestSnapshotId(); } + @Override + public Snapshot snapshot(long snapshotId) { + return wrapped.snapshot(snapshotId); + } + @Override public String name() { return wrapped.name() + SYSTEM_TABLE_SPLITTER + READ_OPTIMIZED; diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 9cce9233f09e..d64164d53e43 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -407,9 +407,9 @@ public List safelyGetAllSnapshots() throws IOException { List snapshots = new ArrayList<>(); for (Path path : paths) { - Snapshot snapshot = Snapshot.safelyFromPath(fileIO, path); - if (snapshot != null) { - snapshots.add(snapshot); + try { + snapshots.add(Snapshot.fromJson(fileIO.readFileUtf8(path))); + } catch (FileNotFoundException ignored) { } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 259a5bdbcda0..56ed8dacb9b7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -33,6 +33,7 @@ import javax.annotation.Nullable; +import java.io.FileNotFoundException; import java.io.IOException; import java.time.Duration; import java.time.LocalDateTime; @@ -303,9 +304,10 @@ public SortedMap> tags(Predicate filter) { } // If the tag file is not found, it might be deleted by // other processes, so just skip this tag - Snapshot snapshot = Snapshot.safelyFromPath(fileIO, path); - if (snapshot != null) { + try { + Snapshot snapshot = Snapshot.fromJson(fileIO.readFileUtf8(path)); tags.computeIfAbsent(snapshot, s -> new ArrayList<>()).add(tagName); + } catch (FileNotFoundException ignored) { } } } catch (IOException e) { diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index bd6950d77b3d..303879337780 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -620,7 +620,7 @@ private static Set getSnapshotFileInUse( } // manifests - List manifests = snapshot.allManifests(manifestList); + List manifests = manifestList.readAllManifests(snapshot); manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName()))); // data file @@ -639,7 +639,8 @@ private static Set getSnapshotFileInUse( // but it can only be cleaned after this snapshot expired, so we should add it to the file // use list. if (changelogDecoupled && !produceChangelog) { - entries = scan.withManifestList(snapshot.deltaManifests(manifestList)).plan().files(); + entries = + scan.withManifestList(manifestList.readDeltaManifests(snapshot)).plan().files(); for (ManifestEntry entry : entries) { // append delete file are delayed to delete if (entry.kind() == FileKind.DELETE @@ -686,9 +687,9 @@ private static Set getChangelogFileInUse( // manifests List manifests = - new ArrayList<>(changelog.changelogManifests(manifestList)); + new ArrayList<>(manifestList.readChangelogManifests(changelog)); if (!produceChangelog) { - manifests.addAll(changelog.dataManifests(manifestList)); + manifests.addAll(manifestList.readDataManifests(changelog)); } manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName()))); @@ -701,7 +702,9 @@ private static Set getChangelogFileInUse( // delta file if (!produceChangelog) { for (ManifestEntry entry : - scan.withManifestList(changelog.deltaManifests(manifestList)).plan().files()) { + scan.withManifestList(manifestList.readDeltaManifests(changelog)) + .plan() + .files()) { if (entry.file().fileSource().orElse(FileSource.APPEND) == FileSource.APPEND) { result.add( new Path( @@ -712,7 +715,7 @@ private static Set getChangelogFileInUse( } else { // changelog for (ManifestEntry entry : - scan.withManifestList(changelog.changelogManifests(manifestList)) + scan.withManifestList(manifestList.readChangelogManifests(changelog)) .plan() .files()) { result.add( diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index 3fb1d36ac305..6ae74c669f94 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -315,7 +315,7 @@ public void testExpireWithExistingTags() throws Exception { for (String tagName : Arrays.asList("tag1", "tag2")) { Snapshot snapshot = tagManager.taggedSnapshot(tagName); List manifestFilePaths = - snapshot.dataManifests(manifestList).stream() + manifestList.readDataManifests(snapshot).stream() .map(ManifestFileMeta::fileName) .map(pathFactory::toManifestFilePath) .collect(Collectors.toList()); @@ -370,7 +370,7 @@ public void testExpireWithUpgradeAndTags() throws Exception { Snapshot tag1 = tagManager.taggedSnapshot("tag1"); ManifestList manifestList = store.manifestListFactory().create(); List manifestFilePaths = - tag1.dataManifests(manifestList).stream() + manifestList.readDataManifests(tag1).stream() .map(ManifestFileMeta::fileName) .map(pathFactory::toManifestFilePath) .collect(Collectors.toList()); @@ -409,9 +409,9 @@ public void testDeleteTagWithSnapshot() throws Exception { ManifestList manifestList = store.manifestListFactory().create(); Snapshot snapshot1 = snapshotManager.snapshot(1); - List snapshot1Data = snapshot1.dataManifests(manifestList); + List snapshot1Data = manifestList.readDataManifests(snapshot1); Snapshot snapshot3 = snapshotManager.snapshot(3); - List snapshot3Data = snapshot3.dataManifests(manifestList); + List snapshot3Data = manifestList.readDataManifests(snapshot3); List manifestLists = Arrays.asList(snapshot1.baseManifestList(), snapshot1.deltaManifestList()); @@ -486,7 +486,7 @@ public void testDeleteTagWithOtherTag() throws Exception { ManifestList manifestList = store.manifestListFactory().create(); Snapshot snapshot2 = snapshotManager.snapshot(2); - List snapshot2Data = snapshot2.dataManifests(manifestList); + List snapshot2Data = manifestList.readDataManifests(snapshot2); List manifestLists = Arrays.asList(snapshot2.baseManifestList(), snapshot2.deltaManifestList()); @@ -521,8 +521,8 @@ public void testDeleteTagWithOtherTag() throws Exception { // check manifests Snapshot tag1 = tagManager.taggedSnapshot("tag1"); Snapshot tag3 = tagManager.taggedSnapshot("tag3"); - List existing = tag1.dataManifests(manifestList); - existing.addAll(tag3.dataManifests(manifestList)); + List existing = manifestList.readDataManifests(tag1); + existing.addAll(manifestList.readDataManifests(tag3)); for (ManifestFileMeta manifestFileMeta : snapshot2Data) { Path path = pathFactory.toManifestFilePath(manifestFileMeta.fileName()); if (existing.contains(manifestFileMeta)) { diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java index c58136339807..e4b74c13c656 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java @@ -263,7 +263,7 @@ public void testWithManifestList() throws Exception { ManifestList manifestList = store.manifestListFactory().create(); long wantedSnapshotId = random.nextLong(snapshotManager.latestSnapshotId()) + 1; Snapshot wantedSnapshot = snapshotManager.snapshot(wantedSnapshotId); - List wantedManifests = wantedSnapshot.dataManifests(manifestList); + List wantedManifests = manifestList.readDataManifests(wantedSnapshot); FileStoreScan scan = store.newScan(); scan.withManifestList(wantedManifests); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java index 7e912a839940..efb9fb27ab7c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java @@ -395,8 +395,8 @@ public void testAbnormallyRemoving() throws Exception { List manifests = new ArrayList<>(); ManifestList manifestList = table.store().manifestListFactory().create(); FileStorePathFactory pathFactory = table.store().pathFactory(); - snapshot1 - .allManifests(manifestList) + manifestList + .readAllManifests(snapshot1) .forEach(m -> manifests.add(pathFactory.toManifestFilePath(m.fileName()))); Path manifest = manifests.get(RANDOM.nextInt(manifests.size())); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java index ffd4716e7e9e..edca0831d168 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/ManifestsTableTest.java @@ -128,7 +128,7 @@ private List getExpectedResult(long snapshotId) { } Snapshot snapshot = snapshotManager.snapshot(snapshotId); - List allManifestMeta = snapshot.allManifests(manifestList); + List allManifestMeta = manifestList.readAllManifests(snapshot); List expectedRow = new ArrayList<>(); for (ManifestFileMeta manifestFileMeta : allManifestMeta) {