diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 2f0a1d859091..facced160898 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -181,23 +181,30 @@ public boolean snapshotExists(long snapshotId) { } /** - * Returns a snapshot earlier than the timestamp mills. A non-existent snapshot may be returned - * if all snapshots are later than the timestamp mills. + * Returns the latest snapshot earlier than the timestamp mills. A non-existent snapshot may be + * returned if all snapshots are equal to or later than the timestamp mills. */ public @Nullable Long earlierThanTimeMills(long timestampMills) { Long earliest = earliestSnapshotId(); Long latest = latestSnapshotId(); + if (earliest == null || latest == null) { return null; } - for (long i = latest; i >= earliest; i--) { - long commitTime = snapshot(i).timeMillis(); - if (commitTime < timestampMills) { - return i; + if (snapshot(earliest).timeMillis() >= timestampMills) { + return earliest - 1; + } + + while (earliest < latest) { + long mid = (earliest + latest + 1) / 2; + if (snapshot(mid).timeMillis() < timestampMills) { + earliest = mid; + } else { + latest = mid - 1; } } - return earliest - 1; + return earliest; } /** @@ -214,7 +221,7 @@ public boolean snapshotExists(long snapshotId) { if (snapshot(earliest).timeMillis() > timestampMills) { return null; } - Snapshot finnalSnapshot = null; + Snapshot finalSnapshot = null; while (earliest <= latest) { long mid = earliest + (latest - earliest) / 2; // Avoid overflow Snapshot snapshot = snapshot(mid); @@ -223,13 +230,13 @@ public boolean snapshotExists(long snapshotId) { latest = mid - 1; // Search in the left half } else if (commitTime < timestampMills) { earliest = mid + 1; // Search in the right half - finnalSnapshot = snapshot; + finalSnapshot = snapshot; } else { - finnalSnapshot = snapshot; // Found the exact match + finalSnapshot = snapshot; // Found the exact match break; } } - return finnalSnapshot; + return finalSnapshot; } public long snapshotCount() throws IOException { diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java index 9778ba3be242..7294e3810f3a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java @@ -28,8 +28,12 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; @@ -49,6 +53,52 @@ public void testSnapshotPath() { } } + @Test + public void testEarlierThanTimeMillis() throws IOException { + long base = System.currentTimeMillis(); + ThreadLocalRandom random = ThreadLocalRandom.current(); + + int numSnapshots = random.nextInt(1, 20); + Set set = new HashSet<>(); + while (set.size() < numSnapshots) { + set.add(base + random.nextLong(0, 1_000_000)); + } + List millis = set.stream().sorted().collect(Collectors.toList()); + + FileIO localFileIO = LocalFileIO.create(); + SnapshotManager snapshotManager = + new SnapshotManager(localFileIO, new Path(tempDir.toString())); + int firstSnapshotId = random.nextInt(1, 100); + for (int i = 0; i < numSnapshots; i++) { + Snapshot snapshot = createSnapshotWithMillis(firstSnapshotId + i, millis.get(i)); + localFileIO.writeFileUtf8( + snapshotManager.snapshotPath(firstSnapshotId + i), snapshot.toJson()); + } + + for (int tries = 0; tries < 10; tries++) { + long time; + if (random.nextBoolean()) { + // pick a random time + time = base + random.nextLong(0, 1_000_000); + } else { + // pick a random time equal to one of the snapshots + time = millis.get(random.nextInt(numSnapshots)); + } + Long actual = snapshotManager.earlierThanTimeMills(time); + + if (millis.get(numSnapshots - 1) < time) { + assertThat(actual).isEqualTo(firstSnapshotId + numSnapshots - 1); + } else { + for (int i = 0; i < numSnapshots; i++) { + if (millis.get(i) >= time) { + assertThat(actual).isEqualTo(firstSnapshotId + i - 1); + break; + } + } + } + } + } + @Test public void testEarlierOrEqualTimeMills() throws IOException { long millis = 1684726826L; @@ -57,24 +107,7 @@ public void testEarlierOrEqualTimeMills() throws IOException { new SnapshotManager(localFileIO, new Path(tempDir.toString())); // create 10 snapshots for (long i = 0; i < 10; i++) { - Snapshot snapshot = - new Snapshot( - i, - 0L, - null, - null, - null, - null, - null, - 0L, - Snapshot.CommitKind.APPEND, - millis + i * 1000, - null, - null, - null, - null, - null, - null); + Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000); localFileIO.writeFileUtf8(snapshotManager.snapshotPath(i), snapshot.toJson()); } // smaller than the second snapshot return the first snapshot @@ -88,6 +121,26 @@ public void testEarlierOrEqualTimeMills() throws IOException { .isEqualTo(millis + 1000); } + private Snapshot createSnapshotWithMillis(long id, long millis) { + return new Snapshot( + id, + 0L, + null, + null, + null, + null, + null, + 0L, + Snapshot.CommitKind.APPEND, + millis, + null, + null, + null, + null, + null, + null); + } + @Test public void testTraversalSnapshotsFromLatestSafely() throws IOException, InterruptedException { FileIO localFileIO = LocalFileIO.create();