diff --git a/paimon-core/src/main/java/org/apache/paimon/Changelog.java b/paimon-core/src/main/java/org/apache/paimon/Changelog.java index 8c6295b44c68..79c65ba570c4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Changelog.java +++ b/paimon-core/src/main/java/org/apache/paimon/Changelog.java @@ -28,6 +28,7 @@ import javax.annotation.Nullable; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Map; @@ -105,9 +106,19 @@ public static Changelog fromJson(String json) { } public static Changelog fromPath(FileIO fileIO, Path path) { + try { + return tryFromPath(fileIO, path); + } catch (FileNotFoundException e) { + throw new RuntimeException("Fails to read changelog from path " + path, e); + } + } + + public static Changelog tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException { try { String json = fileIO.readFileUtf8(path); return Changelog.fromJson(json); + } catch (FileNotFoundException e) { + throw e; } catch (IOException e) { throw new RuntimeException("Fails to read changelog from path " + path, e); } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index ae70d7aec5d1..d3ea488536f9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -168,6 +168,11 @@ public Changelog changelog(long snapshotId) { return Changelog.fromPath(fileIO, changelogPath); } + private Changelog tryGetChangelog(long snapshotId) throws FileNotFoundException { + Path changelogPath = longLivedChangelogPath(snapshotId); + return Changelog.tryFromPath(fileIO, changelogPath); + } + public Changelog longLivedChangelog(long snapshotId) { return Changelog.fromPath(fileIO, longLivedChangelogPath(snapshotId)); } @@ -209,7 +214,23 @@ public boolean longLivedChangelogExists(long snapshotId) { public @Nullable Snapshot 
earliestSnapshot() { Long snapshotId = earliestSnapshotId(); - return snapshotId == null ? null : snapshot(snapshotId); + if (snapshotId == null) { + return null; + } + + Long latest = null; + do { + try { + return tryGetSnapshot(snapshotId); + } catch (FileNotFoundException e) { + snapshotId++; + if (latest == null) { + latest = latestSnapshotId(); + } + } + } while (latest != null && snapshotId <= latest); + + return null; } public @Nullable Long earliestSnapshotId() { @@ -260,11 +281,11 @@ public boolean longLivedChangelogExists(long snapshotId) { return latestId; } - private Snapshot changelogOrSnapshot(long snapshotId) { + private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundException { if (longLivedChangelogExists(snapshotId)) { - return changelog(snapshotId); + return tryGetChangelog(snapshotId); } else { - return snapshot(snapshotId); + return tryGetSnapshot(snapshotId); } } @@ -273,26 +294,46 @@ private Snapshot changelogOrSnapshot(long snapshotId) { * returned if all snapshots are equal to or later than the timestamp mills. */ public @Nullable Long earlierThanTimeMills(long timestampMills, boolean startFromChangelog) { - Long earliestSnapshot = earliestSnapshotId(); + Long earliestSnapshotId = earliestSnapshotId(); Long earliest; if (startFromChangelog) { Long earliestChangelog = earliestLongLivedChangelogId(); - earliest = earliestChangelog == null ? earliestSnapshot : earliestChangelog; + earliest = earliestChangelog == null ? 
earliestSnapshotId : earliestChangelog; } else { - earliest = earliestSnapshot; + earliest = earliestSnapshotId; } Long latest = latestSnapshotId(); if (earliest == null || latest == null) { return null; } - if (changelogOrSnapshot(earliest).timeMillis() >= timestampMills) { + Snapshot earliestSnapshot = null; + while (earliest <= latest) { + try { + earliestSnapshot = tryGetChangelogOrSnapshot(earliest); // id may be changelog-only + break; + } catch (FileNotFoundException e) { + earliest++; + } + } + + if (earliestSnapshot == null || earliestSnapshot.timeMillis() >= timestampMills) { return earliest - 1; } while (earliest < latest) { long mid = (earliest + latest + 1) / 2; - if (changelogOrSnapshot(mid).timeMillis() < timestampMills) { + + Snapshot midSnapshot; + try { + midSnapshot = tryGetChangelogOrSnapshot(mid); + } catch (FileNotFoundException e) { + // Snapshots earlier than or equal to mid have been expired. + earliest = mid + 1; + continue; + } + + if (midSnapshot.timeMillis() < timestampMills) { earliest = mid; } else { latest = mid - 1; @@ -312,8 +353,17 @@ private Snapshot changelogOrSnapshot(long snapshotId) { return null; } - Snapshot earliestSnapShot = snapshot(earliest); - if (earliestSnapShot.timeMillis() > timestampMills) { + Snapshot earliestSnapShot = null; + while (earliest <= latest) { + try { + earliestSnapShot = tryGetSnapshot(earliest); + break; + } catch (FileNotFoundException e) { + earliest++; + } + } + + if (earliestSnapShot == null || earliestSnapShot.timeMillis() > timestampMills) { return earliestSnapShot; } Snapshot finalSnapshot = null; @@ -375,9 +425,24 @@ private Snapshot changelogOrSnapshot(long snapshotId) { if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) { return null; } + + Snapshot earliestSnapShot = null; + while (earliest <= latest) { + try { + earliestSnapShot = tryGetSnapshot(earliest); + break; + } catch (FileNotFoundException e) { + earliest++; + } + } + + if (earliestSnapShot == null) { + return null; + } + Long 
earliestWatermark = null; // find the first snapshot with watermark - if ((earliestWatermark = snapshot(earliest).watermark()) == null) { + if ((earliestWatermark = earliestSnapShot.watermark()) == null) { while (earliest < latest) { earliest++; earliestWatermark = snapshot(earliest).watermark(); @@ -434,9 +499,24 @@ private Snapshot changelogOrSnapshot(long snapshotId) { if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) { return null; } + + Snapshot earliestSnapShot = null; + while (earliest <= latest) { + try { + earliestSnapShot = tryGetSnapshot(earliest); + break; + } catch (FileNotFoundException e) { + earliest++; + } + } + + if (earliestSnapShot == null) { + return null; + } + Long earliestWatermark = null; // find the first snapshot with watermark - if ((earliestWatermark = snapshot(earliest).watermark()) == null) { + if ((earliestWatermark = earliestSnapShot.watermark()) == null) { while (earliest < latest) { earliest++; earliestWatermark = snapshot(earliest).watermark(); diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java index e828a0c90a9d..7c0885f86b3b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java @@ -27,6 +27,10 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; @@ -56,8 +60,41 @@ public void testSnapshotPath() { } } - @Test - public void testEarlierThanTimeMillis() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testEarliestSnapshot(boolean isRaceCondition) throws 
IOException { + long millis = 1684726826L; + FileIO localFileIO = LocalFileIO.create(); + SnapshotManager snapshotManager = + new TestSnapshotManager(localFileIO, new Path(tempDir.toString()), isRaceCondition); + // create 10 snapshots + for (long i = 0; i < 10; i++) { + Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000); + localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); + } + + assertThat(snapshotManager.earliestSnapshot().id()).isEqualTo(isRaceCondition ? 1 : 0); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testEarlierOrEqualWatermark(boolean isRaceCondition) throws IOException { + long millis = 1684726826L; + FileIO localFileIO = LocalFileIO.create(); + SnapshotManager snapshotManager = + new TestSnapshotManager(localFileIO, new Path(tempDir.toString()), isRaceCondition); + // create 10 snapshots + for (long i = 0; i < 10; i++) { + Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000, Long.MIN_VALUE); + localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); + } + + assertThat(snapshotManager.earlierOrEqualWatermark(millis + 999)).isNull(); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testEarlierThanTimeMillis(boolean isRaceCondition) throws IOException { long base = System.currentTimeMillis(); ThreadLocalRandom random = ThreadLocalRandom.current(); @@ -70,7 +107,7 @@ public void testEarlierThanTimeMillis() throws IOException { FileIO localFileIO = LocalFileIO.create(); SnapshotManager snapshotManager = - new SnapshotManager(localFileIO, new Path(tempDir.toString())); + new TestSnapshotManager(localFileIO, new Path(tempDir.toString()), isRaceCondition); int firstSnapshotId = random.nextInt(1, 100); for (int i = 0; i < numSnapshots; i++) { Snapshot snapshot = createSnapshotWithMillis(firstSnapshotId + i, millis.get(i)); @@ -94,7 +131,12 @@ public void testEarlierThanTimeMillis() throws IOException { } 
else { for (int i = 0; i < numSnapshots; i++) { if (millis.get(i) >= time) { - assertThat(actual).isEqualTo(firstSnapshotId + i - 1); + if (isRaceCondition && i == 0) { + // The first snapshot expired during invocation + assertThat(actual).isEqualTo(firstSnapshotId + i); + } else { + assertThat(actual).isEqualTo(firstSnapshotId + i - 1); + } break; } } @@ -102,31 +144,45 @@ public void testEarlierThanTimeMillis() throws IOException { } } - @Test - public void testEarlierOrEqualTimeMills() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testEarlierOrEqualTimeMills(boolean isRaceCondition) throws IOException { long millis = 1684726826L; FileIO localFileIO = LocalFileIO.create(); SnapshotManager snapshotManager = - new SnapshotManager(localFileIO, new Path(tempDir.toString())); + new TestSnapshotManager(localFileIO, new Path(tempDir.toString()), isRaceCondition); // create 10 snapshots for (long i = 0; i < 10; i++) { Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000); localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson()); } - // there is no snapshot smaller than "millis - 1L" return the earliest snapshot - assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 1L).timeMillis()) - .isEqualTo(millis); - - // smaller than the second snapshot return the first snapshot - assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 999).timeMillis()) - .isEqualTo(millis); - // equal to the second snapshot return the second snapshot - assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1000).timeMillis()) - .isEqualTo(millis + 1000); - // larger than the second snapshot return the second snapshot - assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1001).timeMillis()) - .isEqualTo(millis + 1000); + if (isRaceCondition) { + // The earliest snapshot has expired, so always return the second snapshot + assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 
1L).timeMillis()) + .isEqualTo(millis + 1000L); + assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 999).timeMillis()) + .isEqualTo(millis + 1000L); + assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1000).timeMillis()) + .isEqualTo(millis + 1000L); + assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1001).timeMillis()) + .isEqualTo(millis + 1000L); + } else { + // there is no snapshot smaller than "millis - 1L" return the earliest snapshot + assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 1L).timeMillis()) + .isEqualTo(millis); + + // smaller than the second snapshot return the first snapshot + assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 999).timeMillis()) + .isEqualTo(millis); + + // equal to the second snapshot return the second snapshot + assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1000).timeMillis()) + .isEqualTo(millis + 1000); + // larger than the second snapshot return the second snapshot + assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1001).timeMillis()) + .isEqualTo(millis + 1000); + } } @Test @@ -154,12 +210,13 @@ public void testLaterOrEqualTimeMills() throws IOException { assertThat(snapshotManager.laterOrEqualTimeMills(millis + 10001)).isNull(); } - @Test - public void testlaterOrEqualWatermark() throws IOException { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testlaterOrEqualWatermark(boolean isRaceCondition) throws IOException { long millis = Long.MIN_VALUE; FileIO localFileIO = LocalFileIO.create(); SnapshotManager snapshotManager = - new SnapshotManager(localFileIO, new Path(tempDir.toString())); + new TestSnapshotManager(localFileIO, new Path(tempDir.toString()), isRaceCondition); // create 10 snapshots for (long i = 0; i < 10; i++) { Snapshot snapshot = createSnapshotWithMillis(i, millis, Long.MIN_VALUE); @@ -410,4 +467,35 @@ public void testCommitChangelogWhenSameChangelogCommitTwice() throws IOException 
snapshotManager.commitChangelog(changelog, id); assertDoesNotThrow(() -> snapshotManager.commitChangelog(changelog, id)); } + + /** + * Test {@link SnapshotManager} to mock situations when there is a race condition, that the + * earliest snapshot is deleted by another thread in the middle of the current thread's + * invocation. + */ + private static class TestSnapshotManager extends SnapshotManager { + private final boolean isRaceCondition; + + private boolean deleteEarliestSnapshot = false; + + public TestSnapshotManager(FileIO fileIO, Path tablePath, boolean isRaceCondition) { + super(fileIO, tablePath); + this.isRaceCondition = isRaceCondition; + } + + @Override + public @Nullable Long earliestSnapshotId() { + Long snapshotId = super.earliestSnapshotId(); + if (isRaceCondition && snapshotId != null && !deleteEarliestSnapshot) { + Path snapshotPath = snapshotPath(snapshotId); + try { + fileIO().delete(snapshotPath, true); + } catch (IOException e) { + throw new RuntimeException(e); + } + deleteEarliestSnapshot = true; + } + return snapshotId; + } + } }