Skip to content

Commit

Permalink
[core] Avoid FileNotFoundException when looking for the latest snapsh…
Browse files Browse the repository at this point in the history
…ot of user (#3906)
  • Loading branch information
rfyu authored Aug 11, 2024
1 parent 312ce5f commit 0a3cb72
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -56,6 +59,8 @@ public class SnapshotManager implements Serializable {

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class);

private static final String SNAPSHOT_PREFIX = "snapshot-";
private static final String CHANGELOG_PREFIX = "changelog-";
public static final String EARLIEST = "EARLIEST";
Expand Down Expand Up @@ -523,7 +528,31 @@ public Optional<Snapshot> latestSnapshotOfUser(String user) {
"Latest snapshot id is not null, but earliest snapshot id is null. "
+ "This is unexpected.");
for (long id = latestId; id >= earliestId; id--) {
Snapshot snapshot = snapshot(id);
Snapshot snapshot;
try {
snapshot = snapshot(id);
} catch (Exception e) {
long newEarliestId =
Preconditions.checkNotNull(
earliestSnapshotId(),
"Latest snapshot id is not null, but earliest snapshot id is null. "
+ "This is unexpected.");

// this is a valid snapshot, should throw exception
if (id >= newEarliestId) {
throw e;
}

// this is an expired snapshot
LOG.warn(
"Snapshot #"
+ id
+ " is expired. The latest snapshot of current user("
+ user
+ ") is not found.");
break;
}

if (user.equals(snapshot.commitUser())) {
return Optional.of(snapshot);
}
Expand Down Expand Up @@ -594,7 +623,7 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {
return null;
}

// this is a valid snapshot, should not throw exception
// this is a valid snapshot, should throw exception
if (id >= newEarliestId) {
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,55 @@ private Changelog createChangelogWithMillis(long id, long millis) {
null));
}

@Test
public void testLatestSnapshotOfUser() throws IOException, InterruptedException {
FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
new SnapshotManager(localFileIO, new Path(tempDir.toString()));
// create 100 snapshots using user "lastCommitUser"
for (long i = 0; i < 100; i++) {
Snapshot snapshot =
new Snapshot(
i,
0L,
null,
null,
null,
null,
"lastCommitUser",
0L,
Snapshot.CommitKind.APPEND,
i * 1000,
null,
null,
null,
null,
null,
null);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
}

// read the latest snapshot of user "currentCommitUser"
AtomicReference<Exception> exception = new AtomicReference<>();
Thread thread =
new Thread(
() -> {
try {
snapshotManager.latestSnapshotOfUser("currentCommitUser");
} catch (Exception e) {
exception.set(e);
}
});
thread.start();
Thread.sleep(100);

// expire snapshot
localFileIO.deleteQuietly(snapshotManager.snapshotPath(0));
thread.join();

assertThat(exception.get()).isNull();
}

@Test
public void testTraversalSnapshotsFromLatestSafely() throws IOException, InterruptedException {
FileIO localFileIO = LocalFileIO.create();
Expand Down

0 comments on commit 0a3cb72

Please sign in to comment.