Skip to content

Commit

Permalink
[core] Optimize snapshot expire loop for consumers (#2546)
Browse files Browse the repository at this point in the history
This closes #2546.
  • Loading branch information
JingsongLi committed Jan 15, 2024
1 parent b83b680 commit d51417c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.Predicate;
Expand Down Expand Up @@ -115,33 +114,39 @@ public void expire() {
return;
}

// locate the first snapshot between the numRetainedMax th and (numRetainedMin+1) th latest
// snapshots to be retained. This snapshot needs to be preserved because it
// doesn't fulfill the time threshold condition for expiration.
for (long id = Math.max(latestSnapshotId - numRetainedMax + 1, earliest);
id <= latestSnapshotId - numRetainedMin;
id++) {
// the min snapshot to retain from 'snapshot.num-retained.max'
// (the maximum number of snapshots to retain)
long min = Math.max(latestSnapshotId - numRetainedMax + 1, earliest);

// the max exclusive snapshot to expire until
// protected by 'snapshot.num-retained.min'
// (the minimum number of completed snapshots to retain)
long maxExclusive = latestSnapshotId - numRetainedMin + 1;

// the snapshot being read by the consumer cannot be deleted
maxExclusive =
Math.min(maxExclusive, consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE));

// protected by 'snapshot.expire.limit'
// (the maximum number of snapshots allowed to expire at a time)
maxExclusive = Math.min(maxExclusive, earliest + expireLimit);

for (long id = min; id < maxExclusive; id++) {
// Early exit the loop for 'snapshot.time-retained'
// (the maximum time of snapshots to retain)
if (snapshotManager.snapshotExists(id)
&& currentMillis - snapshotManager.snapshot(id).timeMillis()
<= millisRetained) {
// within time threshold, can assume that all snapshots after it are also within
// the threshold
expireUntil(earliest, id);
return;
}
}

// by default, expire until there are only numRetainedMin snapshots left
expireUntil(earliest, latestSnapshotId - numRetainedMin + 1);
expireUntil(earliest, maxExclusive);
}

@VisibleForTesting
public void expireUntil(long earliestId, long endExclusiveId) {
OptionalLong minNextSnapshot = consumerManager.minNextSnapshot();
if (minNextSnapshot.isPresent()) {
endExclusiveId = Math.min(minNextSnapshot.getAsLong(), endExclusiveId);
}

if (endExclusiveId <= earliestId) {
// No expire happens:
// write the hint file in order to see the earliest snapshot directly next time
Expand All @@ -165,8 +170,6 @@ public void expireUntil(long earliestId, long endExclusiveId) {
}
}

endExclusiveId = Math.min(beginInclusiveId + expireLimit, endExclusiveId);

if (LOG.isDebugEnabled()) {
LOG.debug(
"Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import static org.assertj.core.api.Assertions.assertThat;

/** Base test class for {@link FileStoreExpireImpl}. */
public class FileStoreExpireTestBase {
public abstract class FileStoreExpireTestBase {

protected final FileIO fileIO = new LocalFileIO();
protected TestKeyValueGenerator gen;
Expand Down

0 comments on commit d51417c

Please sign in to comment.