Skip to content

Commit

Permalink
[core] Fix race condition for earliest snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Jan 16, 2025
1 parent 4c2ba07 commit b56221a
Show file tree
Hide file tree
Showing 3 changed files with 215 additions and 36 deletions.
11 changes: 11 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/Changelog.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import javax.annotation.Nullable;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;

Expand Down Expand Up @@ -105,9 +106,19 @@ public static Changelog fromJson(String json) {
}

public static Changelog fromPath(FileIO fileIO, Path path) {
try {
return tryFromPath(fileIO, path);
} catch (FileNotFoundException e) {
throw new RuntimeException("Fails to read changelog from path " + path, e);
}
}

public static Changelog tryFromPath(FileIO fileIO, Path path) throws FileNotFoundException {
try {
String json = fileIO.readFileUtf8(path);
return Changelog.fromJson(json);
} catch (FileNotFoundException e) {
throw e;
} catch (IOException e) {
throw new RuntimeException("Fails to read changelog from path " + path, e);
}
Expand Down
106 changes: 93 additions & 13 deletions paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ public Changelog changelog(long snapshotId) {
return Changelog.fromPath(fileIO, changelogPath);
}

private Changelog tryGetChangelog(long snapshotId) throws FileNotFoundException {
Path changelogPath = longLivedChangelogPath(snapshotId);
return Changelog.tryFromPath(fileIO, changelogPath);
}

public Changelog longLivedChangelog(long snapshotId) {
return Changelog.fromPath(fileIO, longLivedChangelogPath(snapshotId));
}
Expand Down Expand Up @@ -209,7 +214,23 @@ public boolean longLivedChangelogExists(long snapshotId) {

public @Nullable Snapshot earliestSnapshot() {
Long snapshotId = earliestSnapshotId();
return snapshotId == null ? null : snapshot(snapshotId);
if (snapshotId == null) {
return null;
}

Long latest = null;
do {
try {
return tryGetSnapshot(snapshotId);
} catch (FileNotFoundException e) {
snapshotId++;
if (latest == null) {
latest = latestSnapshotId();
}
}
} while (latest != null && snapshotId <= latest);

return null;
}

public @Nullable Long earliestSnapshotId() {
Expand Down Expand Up @@ -260,11 +281,11 @@ public boolean longLivedChangelogExists(long snapshotId) {
return latestId;
}

private Snapshot changelogOrSnapshot(long snapshotId) {
private Snapshot tryGetChangelogOrSnapshot(long snapshotId) throws FileNotFoundException {
if (longLivedChangelogExists(snapshotId)) {
return changelog(snapshotId);
return tryGetChangelog(snapshotId);
} else {
return snapshot(snapshotId);
return tryGetSnapshot(snapshotId);
}
}

Expand All @@ -273,26 +294,46 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
* returned if all snapshots are equal to or later than the timestamp mills.
*/
public @Nullable Long earlierThanTimeMills(long timestampMills, boolean startFromChangelog) {
Long earliestSnapshot = earliestSnapshotId();
Long earliestSnapshotId = earliestSnapshotId();
Long earliest;
if (startFromChangelog) {
Long earliestChangelog = earliestLongLivedChangelogId();
earliest = earliestChangelog == null ? earliestSnapshot : earliestChangelog;
earliest = earliestChangelog == null ? earliestSnapshotId : earliestChangelog;
} else {
earliest = earliestSnapshot;
earliest = earliestSnapshotId;
}
Long latest = latestSnapshotId();
if (earliest == null || latest == null) {
return null;
}

if (changelogOrSnapshot(earliest).timeMillis() >= timestampMills) {
Snapshot earliestSnapshot = null;
while (earliest <= latest) {
try {
earliestSnapshot = tryGetSnapshot(earliest);
break;
} catch (FileNotFoundException e) {
earliest++;
}
}

if (earliestSnapshot == null || earliestSnapshot.timeMillis() >= timestampMills) {
return earliest - 1;
}

while (earliest < latest) {
long mid = (earliest + latest + 1) / 2;
if (changelogOrSnapshot(mid).timeMillis() < timestampMills) {

Snapshot midSnapshot;
try {
midSnapshot = tryGetChangelogOrSnapshot(mid);
} catch (FileNotFoundException e) {
// Snapshots earlier than or equal to mid have been expired.
earliest = mid + 1;
continue;
}

if (midSnapshot.timeMillis() < timestampMills) {
earliest = mid;
} else {
latest = mid - 1;
Expand All @@ -312,8 +353,17 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
return null;
}

Snapshot earliestSnapShot = snapshot(earliest);
if (earliestSnapShot.timeMillis() > timestampMills) {
Snapshot earliestSnapShot = null;
while (earliest <= latest) {
try {
earliestSnapShot = tryGetSnapshot(earliest);
break;
} catch (FileNotFoundException e) {
earliest++;
}
}

if (earliestSnapShot == null || earliestSnapShot.timeMillis() > timestampMills) {
return earliestSnapShot;
}
Snapshot finalSnapshot = null;
Expand Down Expand Up @@ -375,9 +425,24 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
return null;
}

Snapshot earliestSnapShot = null;
while (earliest <= latest) {
try {
earliestSnapShot = tryGetSnapshot(earliest);
break;
} catch (FileNotFoundException e) {
earliest++;
}
}

if (earliestSnapShot == null) {
return null;
}

Long earliestWatermark = null;
// find the first snapshot with watermark
if ((earliestWatermark = snapshot(earliest).watermark()) == null) {
if ((earliestWatermark = earliestSnapShot.watermark()) == null) {
while (earliest < latest) {
earliest++;
earliestWatermark = snapshot(earliest).watermark();
Expand Down Expand Up @@ -434,9 +499,24 @@ private Snapshot changelogOrSnapshot(long snapshotId) {
if (earliest == null || latest == null || snapshot(latest).watermark() == Long.MIN_VALUE) {
return null;
}

Snapshot earliestSnapShot = null;
while (earliest <= latest) {
try {
earliestSnapShot = tryGetSnapshot(earliest);
break;
} catch (FileNotFoundException e) {
earliest++;
}
}

if (earliestSnapShot == null) {
return null;
}

Long earliestWatermark = null;
// find the first snapshot with watermark
if ((earliestWatermark = snapshot(earliest).watermark()) == null) {
if ((earliestWatermark = earliestSnapShot.watermark()) == null) {
while (earliest < latest) {
earliest++;
earliestWatermark = snapshot(earliest).watermark();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -56,8 +60,41 @@ public void testSnapshotPath() {
}
}

@Test
public void testEarlierThanTimeMillis() throws IOException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testEarliestSnapshot(boolean isRaceCondition) throws IOException {
long millis = 1684726826L;
FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
new TestSnapshotManager(localFileIO, new Path(tempDir.toString()), isRaceCondition);
// create 10 snapshots
for (long i = 0; i < 10; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
}

assertThat(snapshotManager.earliestSnapshot().id()).isEqualTo(isRaceCondition ? 1 : 0);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testEarlierOrEqualWatermark(boolean isRaceCondition) throws IOException {
long millis = 1684726826L;
FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
new TestSnapshotManager(localFileIO, new Path(tempDir.toString()), isRaceCondition);
// create 10 snapshots
for (long i = 0; i < 10; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000, Long.MIN_VALUE);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
}

assertThat(snapshotManager.earlierOrEqualWatermark(millis + 999)).isNull();
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testEarlierThanTimeMillis(boolean isRaceCondition) throws IOException {
long base = System.currentTimeMillis();
ThreadLocalRandom random = ThreadLocalRandom.current();

Expand All @@ -70,7 +107,7 @@ public void testEarlierThanTimeMillis() throws IOException {

FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
new SnapshotManager(localFileIO, new Path(tempDir.toString()));
new TestSnapshotManager(localFileIO, new Path(tempDir.toString()), isRaceCondition);
int firstSnapshotId = random.nextInt(1, 100);
for (int i = 0; i < numSnapshots; i++) {
Snapshot snapshot = createSnapshotWithMillis(firstSnapshotId + i, millis.get(i));
Expand All @@ -94,39 +131,58 @@ public void testEarlierThanTimeMillis() throws IOException {
} else {
for (int i = 0; i < numSnapshots; i++) {
if (millis.get(i) >= time) {
assertThat(actual).isEqualTo(firstSnapshotId + i - 1);
if (isRaceCondition && i == 0) {
// The first snapshot expired during invocation
assertThat(actual).isEqualTo(firstSnapshotId + i);
} else {
assertThat(actual).isEqualTo(firstSnapshotId + i - 1);
}
break;
}
}
}
}
}

@Test
public void testEarlierOrEqualTimeMills() throws IOException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testEarlierOrEqualTimeMills(boolean isRaceCondition) throws IOException {
long millis = 1684726826L;
FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
new SnapshotManager(localFileIO, new Path(tempDir.toString()));
new TestSnapshotManager(localFileIO, new Path(tempDir.toString()), isRaceCondition);
// create 10 snapshots
for (long i = 0; i < 10; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000);
localFileIO.tryToWriteAtomic(snapshotManager.snapshotPath(i), snapshot.toJson());
}

// there is no snapshot smaller than "millis - 1L" return the earliest snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 1L).timeMillis())
.isEqualTo(millis);

// smaller than the second snapshot return the first snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 999).timeMillis())
.isEqualTo(millis);
// equal to the second snapshot return the second snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1000).timeMillis())
.isEqualTo(millis + 1000);
// larger than the second snapshot return the second snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1001).timeMillis())
.isEqualTo(millis + 1000);
if (isRaceCondition) {
// The earliest snapshot has expired, so always return the second snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 1L).timeMillis())
.isEqualTo(millis + 1000L);
assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 999).timeMillis())
.isEqualTo(millis + 1000L);
assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1000).timeMillis())
.isEqualTo(millis + 1000L);
assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1001).timeMillis())
.isEqualTo(millis + 1000L);
} else {
// there is no snapshot smaller than "millis - 1L" return the earliest snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis - 1L).timeMillis())
.isEqualTo(millis);

// smaller than the second snapshot return the first snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 999).timeMillis())
.isEqualTo(millis);

// equal to the second snapshot return the second snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1000).timeMillis())
.isEqualTo(millis + 1000);
// larger than the second snapshot return the second snapshot
assertThat(snapshotManager.earlierOrEqualTimeMills(millis + 1001).timeMillis())
.isEqualTo(millis + 1000);
}
}

@Test
Expand Down Expand Up @@ -154,12 +210,13 @@ public void testLaterOrEqualTimeMills() throws IOException {
assertThat(snapshotManager.laterOrEqualTimeMills(millis + 10001)).isNull();
}

@Test
public void testlaterOrEqualWatermark() throws IOException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testlaterOrEqualWatermark(boolean isRaceCondition) throws IOException {
long millis = Long.MIN_VALUE;
FileIO localFileIO = LocalFileIO.create();
SnapshotManager snapshotManager =
new SnapshotManager(localFileIO, new Path(tempDir.toString()));
new TestSnapshotManager(localFileIO, new Path(tempDir.toString()), isRaceCondition);
// create 10 snapshots
for (long i = 0; i < 10; i++) {
Snapshot snapshot = createSnapshotWithMillis(i, millis, Long.MIN_VALUE);
Expand Down Expand Up @@ -410,4 +467,35 @@ public void testCommitChangelogWhenSameChangelogCommitTwice() throws IOException
snapshotManager.commitChangelog(changelog, id);
assertDoesNotThrow(() -> snapshotManager.commitChangelog(changelog, id));
}

/**
* Test {@link SnapshotManager} to mock situations when there is a race condition, that the
* earliest snapshot is deleted by another thread in the middle of the current thread's
* invocation.
*/
private static class TestSnapshotManager extends SnapshotManager {
private final boolean isRaceCondition;

private boolean deleteEarliestSnapshot = false;

public TestSnapshotManager(FileIO fileIO, Path tablePath, boolean isRaceCondition) {
super(fileIO, tablePath);
this.isRaceCondition = isRaceCondition;
}

@Override
public @Nullable Long earliestSnapshotId() {
Long snapshotId = super.earliestSnapshotId();
if (isRaceCondition && snapshotId != null && !deleteEarliestSnapshot) {
Path snapshotPath = snapshotPath(snapshotId);
try {
fileIO().delete(snapshotPath, true);
} catch (IOException e) {
throw new RuntimeException(e);
}
deleteEarliestSnapshot = true;
}
return snapshotId;
}
}
}

0 comments on commit b56221a

Please sign in to comment.