Skip to content

Commit

Permalink
[core] Check existence before expire: more check. (apache#4272)
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 authored Sep 27, 2024
1 parent ce56759 commit 66cc214
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
Expand Down Expand Up @@ -213,8 +214,25 @@ public int expireUntil(long earliestId, long endExclusiveId) {
List<Snapshot> skippingSnapshots =
SnapshotManager.findOverlappedSnapshots(
taggedSnapshots, beginInclusiveId, endExclusiveId);
skippingSnapshots.add(snapshotManager.snapshot(endExclusiveId));
Set<String> skippingSet = snapshotDeletion.manifestSkippingSet(skippingSnapshots);

try {
skippingSnapshots.add(snapshotManager.tryGetSnapshot(endExclusiveId));
} catch (FileNotFoundException e) {
// the end exclusive snapshot is gone
// there is no need to proceed
return 0;
}

Set<String> skippingSet = new HashSet<>();
try {
skippingSet.addAll(snapshotDeletion.manifestSkippingSet(skippingSnapshots));
} catch (Exception e) {
// maybe snapshot been deleted by other jobs.
if (e.getCause() == null || !(e.getCause() instanceof FileNotFoundException)) {
throw e;
}
}

for (long id = beginInclusiveId; id < endExclusiveId; id++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ready to delete manifests in snapshot #" + id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand All @@ -56,6 +57,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -292,6 +294,50 @@ public void testNumRetainedMin() throws Exception {
store.assertCleaned();
}

@Test
public void testExpireEmptySnapshot() throws Exception {
Random random = new Random();

List<KeyValue> allData = new ArrayList<>();
List<Integer> snapshotPositions = new ArrayList<>();
commit(100, allData, snapshotPositions);
int latestSnapshotId = requireNonNull(snapshotManager.latestSnapshotId()).intValue();

List<Thread> s = new ArrayList<>();
s.add(
new Thread(
() -> {
final ExpireSnapshotsImpl expire =
(ExpireSnapshotsImpl) store.newExpire(1, Integer.MAX_VALUE, 1);
expire.expireUntil(89, latestSnapshotId);
}));
for (int i = 0; i < 10; i++) {
final ExpireSnapshotsImpl expire =
(ExpireSnapshotsImpl) store.newExpire(1, Integer.MAX_VALUE, 1);
s.add(
new Thread(
() -> {
int start = random.nextInt(latestSnapshotId - 10);
int end = start + random.nextInt(10);
expire.expireUntil(start, end);
}));
}

Assertions.assertThatCode(
() -> {
s.forEach(Thread::start);
s.forEach(
tt -> {
try {
tt.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
})
.doesNotThrowAnyException();
}

@Test
public void testExpireWithNumber() throws Exception {
ExpireSnapshots expire = store.newExpire(1, 3, Long.MAX_VALUE);
Expand Down

0 comments on commit 66cc214

Please sign in to comment.