diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 1b64d8bc8641..c5fdb042e37f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -24,6 +24,9 @@ import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -56,6 +59,8 @@ public class SnapshotManager implements Serializable { private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class); + private static final String SNAPSHOT_PREFIX = "snapshot-"; private static final String CHANGELOG_PREFIX = "changelog-"; public static final String EARLIEST = "EARLIEST"; @@ -523,7 +528,31 @@ public Optional latestSnapshotOfUser(String user) { "Latest snapshot id is not null, but earliest snapshot id is null. " + "This is unexpected."); for (long id = latestId; id >= earliestId; id--) { - Snapshot snapshot = snapshot(id); + Snapshot snapshot; + try { + snapshot = snapshot(id); + } catch (Exception e) { + long newEarliestId = + Preconditions.checkNotNull( + earliestSnapshotId(), + "Latest snapshot id is not null, but earliest snapshot id is null. " + + "This is unexpected."); + + // this is a valid snapshot, should throw exception + if (id >= newEarliestId) { + throw e; + } + + // this is an expired snapshot + LOG.warn( + "Snapshot #" + + id + + " is expired. The latest snapshot of current user(" + + user + + ") is not found."); + break; + } + if (user.equals(snapshot.commitUser())) { return Optional.of(snapshot); } @@ -594,7 +623,7 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { return null; } - // this is a valid snapshot, should not throw exception + // this is a valid snapshot, should throw exception if (id >= newEarliestId) { throw e; } diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java index b1e0384809d8..4e0b18d47402 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java @@ -199,6 +199,55 @@ private Changelog createChangelogWithMillis(long id, long millis) { null)); } + @Test + public void testLatestSnapshotOfUser() throws IOException, InterruptedException { + FileIO localFileIO = LocalFileIO.create(); + SnapshotManager snapshotManager = + new SnapshotManager(localFileIO, new Path(tempDir.toString())); + // create 100 snapshots using user "lastCommitUser" + for (long i = 0; i < 100; i++) { + Snapshot snapshot = + new Snapshot( + i, + 0L, + null, + null, + null, + null, + "lastCommitUser", + 0L, + Snapshot.CommitKind.APPEND, + i * 1000, + null, + null, + null, + null, + null, + null); + localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); + } + + // read the latest snapshot of user "currentCommitUser" + AtomicReference exception = new AtomicReference<>(); + Thread thread = + new Thread( + () -> { + try { + snapshotManager.latestSnapshotOfUser("currentCommitUser"); + } catch (Exception e) { + exception.set(e); + } + }); + thread.start(); + Thread.sleep(100); + + // expire snapshot + localFileIO.deleteQuietly(snapshotManager.snapshotPath(0)); + thread.join(); + + assertThat(exception.get()).isNull(); + } + @Test public void testTraversalSnapshotsFromLatestSafely() throws IOException, InterruptedException { FileIO localFileIO = LocalFileIO.create();