From 66cc214b4261b43caa50d795779d7b9096119f9f Mon Sep 17 00:00:00 2001 From: YeJunHao <41894543+leaves12138@users.noreply.github.com> Date: Fri, 27 Sep 2024 17:49:51 +0800 Subject: [PATCH] [core] Check existence before expire: more check. (#4272) --- .../paimon/table/ExpireSnapshotsImpl.java | 22 ++++++++- .../paimon/operation/ExpireSnapshotsTest.java | 46 +++++++++++++++++++ 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java index bc04cc4caf5a..eb163f8fa8c7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java @@ -35,6 +35,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.function.Predicate; @@ -213,8 +214,25 @@ public int expireUntil(long earliestId, long endExclusiveId) { List skippingSnapshots = SnapshotManager.findOverlappedSnapshots( taggedSnapshots, beginInclusiveId, endExclusiveId); - skippingSnapshots.add(snapshotManager.snapshot(endExclusiveId)); - Set skippingSet = snapshotDeletion.manifestSkippingSet(skippingSnapshots); + + try { + skippingSnapshots.add(snapshotManager.tryGetSnapshot(endExclusiveId)); + } catch (FileNotFoundException e) { + // the end exclusive snapshot is gone + // there is no need to proceed + return 0; + } + + Set skippingSet = new HashSet<>(); + try { + skippingSet.addAll(snapshotDeletion.manifestSkippingSet(skippingSnapshots)); + } catch (Exception e) { + // maybe snapshot been deleted by other jobs. + if (e.getCause() == null || !(e.getCause() instanceof FileNotFoundException)) { + throw e; + } + } + for (long id = beginInclusiveId; id < endExclusiveId; id++) { if (LOG.isDebugEnabled()) { LOG.debug("Ready to delete manifests in snapshot #" + id); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index bd32e48e5d1b..16a2ab56ba03 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -42,6 +42,7 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -56,6 +57,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; @@ -292,6 +294,50 @@ public void testNumRetainedMin() throws Exception { store.assertCleaned(); } + @Test + public void testExpireEmptySnapshot() throws Exception { + Random random = new Random(); + + List allData = new ArrayList<>(); + List snapshotPositions = new ArrayList<>(); + commit(100, allData, snapshotPositions); + int latestSnapshotId = requireNonNull(snapshotManager.latestSnapshotId()).intValue(); + + List s = new ArrayList<>(); + s.add( + new Thread( + () -> { + final ExpireSnapshotsImpl expire = + (ExpireSnapshotsImpl) store.newExpire(1, Integer.MAX_VALUE, 1); + expire.expireUntil(89, latestSnapshotId); + })); + for (int i = 0; i < 10; i++) { + final ExpireSnapshotsImpl expire = + (ExpireSnapshotsImpl) store.newExpire(1, Integer.MAX_VALUE, 1); + s.add( + new Thread( + () -> { + int start = random.nextInt(latestSnapshotId - 10); + int end = start + random.nextInt(10); + expire.expireUntil(start, end); + })); + } + + Assertions.assertThatCode( + () -> { + s.forEach(Thread::start); + s.forEach( + tt -> { + try { + tt.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + }) + .doesNotThrowAnyException(); + } + @Test public void testExpireWithNumber() throws Exception { ExpireSnapshots expire = store.newExpire(1, 3, Long.MAX_VALUE);