diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java index ab6bb5d0993f..c326eb35e4a1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java @@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory; import java.util.List; -import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.Callable; import java.util.function.Predicate; @@ -112,33 +111,39 @@ public void expire() { return; } - // locate the first snapshot between the numRetainedMax th and (numRetainedMin+1) th latest - // snapshots to be retained. This snapshot needs to be preserved because it - // doesn't fulfill the time threshold condition for expiration. - for (long id = Math.max(latestSnapshotId - numRetainedMax + 1, earliest); - id <= latestSnapshotId - numRetainedMin; - id++) { + // the min snapshot to retain from 'snapshot.num-retained.max' + // (the maximum number of snapshots to retain) + long min = Math.max(latestSnapshotId - numRetainedMax + 1, earliest); + + // the max exclusive snapshot to expire until + // protected by 'snapshot.num-retained.min' + // (the minimum number of completed snapshots to retain) + long maxExclusive = latestSnapshotId - numRetainedMin + 1; + + // the snapshot being read by the consumer cannot be deleted + maxExclusive = + Math.min(maxExclusive, consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE)); + + // protected by 'snapshot.expire.limit' + // (the maximum number of snapshots allowed to expire at a time) + maxExclusive = Math.min(maxExclusive, earliest + expireLimit); + + for (long id = min; id < maxExclusive; id++) { + // Early exit the loop for 'snapshot.time-retained' + // (the maximum time of snapshots to retain) if (snapshotManager.snapshotExists(id) && currentMillis - snapshotManager.snapshot(id).timeMillis() <= millisRetained) { - // within time threshold, can assume that all snapshots after it are also within - // the threshold expireUntil(earliest, id); return; } } - // by default, expire until there are only numRetainedMin snapshots left - expireUntil(earliest, latestSnapshotId - numRetainedMin + 1); + expireUntil(earliest, maxExclusive); } @VisibleForTesting public void expireUntil(long earliestId, long endExclusiveId) { - OptionalLong minNextSnapshot = consumerManager.minNextSnapshot(); - if (minNextSnapshot.isPresent()) { - endExclusiveId = Math.min(minNextSnapshot.getAsLong(), endExclusiveId); - } - if (endExclusiveId <= earliestId) { // No expire happens: // write the hint file in order to see the earliest snapshot directly next time @@ -162,8 +167,6 @@ public void expireUntil(long earliestId, long endExclusiveId) { } } - endExclusiveId = Math.min(beginInclusiveId + expireLimit, endExclusiveId); - if (LOG.isDebugEnabled()) { LOG.debug( "Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")"); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java index ad1ef4c33e02..d9b3469f9a16 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java @@ -44,7 +44,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** Base test class for {@link FileStoreExpireImpl}. */ -public class FileStoreExpireTestBase { +public abstract class FileStoreExpireTestBase { protected final FileIO fileIO = new LocalFileIO(); protected TestKeyValueGenerator gen;