Skip to content

Commit

Permalink
add ut
Browse files Browse the repository at this point in the history
  • Loading branch information
herefree committed Jan 6, 2025
1 parent 63bc4bc commit 6fea091
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.Changelog;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
Expand Down Expand Up @@ -409,7 +410,8 @@ private Long findFirstSnapshotWithWatermark(Long earliest, Long latest) {
return null;
}

private @Nullable Long binarySearch(
@VisibleForTesting
public @Nullable Long binarySearch(
Long start,
Long end,
java.util.function.Predicate<Long> condition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -155,18 +156,164 @@ public void testLaterOrEqualTimeMills() throws IOException {
}

@Test
public void testlaterOrEqualWatermark() throws IOException {
public void testLaterOrEqualWatermark() throws IOException {
long millis = Long.MIN_VALUE;
FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
new SnapshotManager(localFileIO, new Path(tempDir.toString()));
// create 10 snapshots
for (long i = 0; i < 10; i++) {
// create 10 snapshots, watermarks are Long.MIN_VALUE.
long i = 0;
for (; i < 10; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis, Long.MIN_VALUE);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
}
// smaller than the second snapshot
assertThat(snapshotManager.laterOrEqualWatermark(millis + 999)).isNull();

// create 5 snapshots, watermarks are not Long.MIN_VALUE.
for (; i < 15; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis, i * 10 + 1000L);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
}

assertThat(snapshotManager.laterOrEqualWatermark(1100L))
.isEqualTo(createSnapshotWithMillis(10, millis, 1100L));
assertThat(snapshotManager.laterOrEqualWatermark(1101L))
.isEqualTo(createSnapshotWithMillis(11, millis, 1110L));
assertThat(snapshotManager.laterOrEqualWatermark(99L))
.isEqualTo(createSnapshotWithMillis(10, millis, 1100L));
assertThat(snapshotManager.laterOrEqualWatermark(1110L))
.isEqualTo(createSnapshotWithMillis(11, millis, 1110L));
assertThat(snapshotManager.laterOrEqualWatermark(1140L))
.isEqualTo(createSnapshotWithMillis(14, millis, 1140L));
assertThat(snapshotManager.laterOrEqualWatermark(1141L)).isNull();

// delete snapshots
for (i = 0; i < 15; i++) {
localFileIO.deleteQuietly(snapshotManager.snapshotPath(i));
}

// create 15 snapshots, the first 10 watermark are null.
i = 0;
for (; i < 10; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis, null);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
}
for (; i < 15; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis, i * 10 + 1000);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
}
assertThat(snapshotManager.laterOrEqualWatermark(1100L))
.isEqualTo(createSnapshotWithMillis(10, millis, 1100L));
assertThat(snapshotManager.laterOrEqualWatermark(1101L))
.isEqualTo(createSnapshotWithMillis(11, millis, 1110L));
assertThat(snapshotManager.laterOrEqualWatermark(99L))
.isEqualTo(createSnapshotWithMillis(10, millis, 1100L));
assertThat(snapshotManager.laterOrEqualWatermark(1110L))
.isEqualTo(createSnapshotWithMillis(11, millis, 1110L));
assertThat(snapshotManager.laterOrEqualWatermark(1140L))
.isEqualTo(createSnapshotWithMillis(14, millis, 1140L));
assertThat(snapshotManager.laterOrEqualWatermark(1141L)).isNull();

// delete snapshots
for (i = 0; i < 15; i++) {
localFileIO.deleteQuietly(snapshotManager.snapshotPath(i));
}

// create 15 snapshots, all watermark are not null.
i = 0;
for (; i < 15; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis, i * 10 + 1000);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
}
assertThat(snapshotManager.laterOrEqualWatermark(999L))
.isEqualTo(createSnapshotWithMillis(0, millis, 1000L));
assertThat(snapshotManager.laterOrEqualWatermark(1000L))
.isEqualTo(createSnapshotWithMillis(0, millis, 1000L));
assertThat(snapshotManager.laterOrEqualWatermark(1001L))
.isEqualTo(createSnapshotWithMillis(1, millis, 1010L));
assertThat(snapshotManager.laterOrEqualWatermark(1140L))
.isEqualTo(createSnapshotWithMillis(14, millis, 1140L));
assertThat(snapshotManager.laterOrEqualWatermark(1141L)).isNull();
}

@Test
public void testEarlierOrEqualWatermark() throws IOException {
long millis = Long.MIN_VALUE;
FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
new SnapshotManager(localFileIO, new Path(tempDir.toString()));
// create 10 snapshots, watermarks are Long.MIN_VALUE.
long i = 0;
for (; i < 10; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis, Long.MIN_VALUE);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
}
// smaller than the second snapshot
assertThat(snapshotManager.earlierOrEqualWatermark(millis + 999)).isNull();

// create 5 snapshots, watermarks are not Long.MIN_VALUE.
for (; i < 15; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis, i * 10 + 1000L);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
}

assertThat(snapshotManager.earlierOrEqualWatermark(1140L))
.isEqualTo(createSnapshotWithMillis(14, millis, 1140L));
assertThat(snapshotManager.earlierOrEqualWatermark(1141L))
.isEqualTo(createSnapshotWithMillis(14, millis, 1140L));
assertThat(snapshotManager.earlierOrEqualWatermark(1139L))
.isEqualTo(createSnapshotWithMillis(13, millis, 1130L));
assertThat(snapshotManager.earlierOrEqualWatermark(1100L))
.isEqualTo(createSnapshotWithMillis(10, millis, 1100L));
assertThat(snapshotManager.earlierOrEqualWatermark(1099L))
.isEqualTo(createSnapshotWithMillis(9, millis, Long.MIN_VALUE));

// delete snapshots
for (i = 0; i < 15; i++) {
localFileIO.deleteQuietly(snapshotManager.snapshotPath(i));
}

// create 15 snapshots, the first 10 watermark are null.
i = 0;
for (; i < 10; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis, null);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
}
for (; i < 15; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis, i * 10 + 1000L);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
}
assertThat(snapshotManager.earlierOrEqualWatermark(1140L))
.isEqualTo(createSnapshotWithMillis(14, millis, 1140L));
assertThat(snapshotManager.earlierOrEqualWatermark(1141L))
.isEqualTo(createSnapshotWithMillis(14, millis, 1140L));
assertThat(snapshotManager.earlierOrEqualWatermark(1139L))
.isEqualTo(createSnapshotWithMillis(13, millis, 1130L));
assertThat(snapshotManager.earlierOrEqualWatermark(1100L))
.isEqualTo(createSnapshotWithMillis(10, millis, 1100L));
assertThat(snapshotManager.earlierOrEqualWatermark(1099L)).isNull();

// delete snapshots
for (i = 0; i < 15; i++) {
localFileIO.deleteQuietly(snapshotManager.snapshotPath(i));
}

// create 15 snapshots, all watermarks not null.
i = 0;
for (; i < 15; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis, i * 10 + 1000L);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
}
assertThat(snapshotManager.earlierOrEqualWatermark(1140L))
.isEqualTo(createSnapshotWithMillis(14, millis, 1140L));
assertThat(snapshotManager.earlierOrEqualWatermark(1141L))
.isEqualTo(createSnapshotWithMillis(14, millis, 1140L));
assertThat(snapshotManager.earlierOrEqualWatermark(1139L))
.isEqualTo(createSnapshotWithMillis(13, millis, 1130L));
assertThat(snapshotManager.earlierOrEqualWatermark(1000L))
.isEqualTo(createSnapshotWithMillis(0, millis, 1000L));
assertThat(snapshotManager.earlierOrEqualWatermark(999L)).isNull();
}

private Snapshot createSnapshotWithMillis(long id, long millis) {
Expand All @@ -189,7 +336,7 @@ private Snapshot createSnapshotWithMillis(long id, long millis) {
null);
}

private Snapshot createSnapshotWithMillis(long id, long millis, long watermark) {
private Snapshot createSnapshotWithMillis(long id, long millis, Long watermark) {
return new Snapshot(
id,
0L,
Expand Down Expand Up @@ -410,4 +557,61 @@ public void testCommitChangelogWhenSameChangelogCommitTwice() throws IOException
snapshotManager.commitChangelog(changelog, id);
assertDoesNotThrow(() -> snapshotManager.commitChangelog(changelog, id));
}

@Test
public void testBinarySearch() {
FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
new SnapshotManager(localFileIO, new Path(tempDir.toString()));
// findEarliest = true
Predicate<Long> condition = id -> id >= 5L;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)).isEqualTo(5L);
condition = id -> id > 5L;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)).isEqualTo(6L);
condition = id -> id >= 10L;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true))
.isEqualTo(10L);
condition = id -> id > 10L;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)).isNull();
condition = id -> id >= 11L;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)).isNull();
condition = id -> id > 11L;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)).isNull();
condition = id -> id >= 1L;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)).isEqualTo(2L);
condition = id -> id > 1L;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)).isEqualTo(2L);
condition = id -> id >= 2L;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)).isEqualTo(2L);
condition = id -> id > 2L;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, true)).isEqualTo(3L);
// findEarliest = false
condition = id -> id <= 5;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false))
.isEqualTo(5L);
condition = id -> id < 5;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false))
.isEqualTo(4L);
condition = id -> id <= 10;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false))
.isEqualTo(10L);
condition = id -> id < 10;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false))
.isEqualTo(9L);
condition = id -> id <= 11;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false))
.isEqualTo(10L);
condition = id -> id < 11;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false))
.isEqualTo(10L);
condition = id -> id <= 1;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false)).isNull();
condition = id -> id < 1;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false)).isNull();
condition = id -> id <= 2;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false))
.isEqualTo(2L);
condition = id -> id < 2;
Assertions.assertThat(snapshotManager.binarySearch(2L, 10L, condition, false)).isNull();
}
}

0 comments on commit 6fea091

Please sign in to comment.