Skip to content

Commit

Permalink
[core] Rename snapshots to tags in expiration classes
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Aug 11, 2024
1 parent 689a5a4 commit ac92d1d
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,11 @@ public abstract class FileDeletionBase<T extends Snapshot> {

protected boolean changelogDecoupled;

/** Used to record which snapshot is cached. */
private long cachedSnapshotId = 0;
/** Used to record which tag is cached. */
private long cachedTag = 0;

/** Used to cache data files used by current snapshot. */
private final Map<BinaryRow, Map<Integer, Set<String>>> cachedSnapshotDataFiles =
new HashMap<>();
/** Used to cache data files used by current tag. */
private final Map<BinaryRow, Map<Integer, Set<String>>> cachedTagDataFiles = new HashMap<>();

public FileDeletionBase(
FileIO fileIO,
Expand Down Expand Up @@ -328,18 +327,18 @@ protected void cleanUnusedManifests(
cleanUnusedStatisticsManifests(snapshot, skippingSet);
}

public Predicate<ManifestEntry> dataFileSkipper(
List<Snapshot> skippingSnapshots, long expiringSnapshotId) throws Exception {
int index = SnapshotManager.findPreviousSnapshot(skippingSnapshots, expiringSnapshotId);
// refresh snapshot data files
public Predicate<ManifestEntry> createDataFileSkipperForTags(
List<Snapshot> taggedSnapshots, long expiringSnapshotId) throws Exception {
int index = SnapshotManager.findPreviousSnapshot(taggedSnapshots, expiringSnapshotId);
// refresh tag data files
if (index >= 0) {
Snapshot previousSnapshot = skippingSnapshots.get(index);
if (previousSnapshot.id() != cachedSnapshotId) {
cachedSnapshotId = previousSnapshot.id();
cachedSnapshotDataFiles.clear();
addMergedDataFiles(cachedSnapshotDataFiles, previousSnapshot);
Snapshot previousTag = taggedSnapshots.get(index);
if (previousTag.id() != cachedTag) {
cachedTag = previousTag.id();
cachedTagDataFiles.clear();
addMergedDataFiles(cachedTagDataFiles, previousTag);
}
return entry -> containsDataFile(cachedSnapshotDataFiles, entry);
return entry -> containsDataFile(cachedTagDataFiles, entry);
}
return entry -> false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,11 @@ public int expireUntil(long earliestId, long endExclusiveId) {
LOG.debug("Changelog expire range is [" + earliestId + ", " + endExclusiveId + ")");
}

List<Snapshot> referencedSnapshots = tagManager.taggedSnapshots();
List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();

List<Snapshot> skippingSnapshots =
SnapshotManager.findOverlappedSnapshots(
referencedSnapshots, earliestId, endExclusiveId);
taggedSnapshots, earliestId, endExclusiveId);
skippingSnapshots.add(snapshotManager.changelog(endExclusiveId));
Set<String> manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots);
for (long id = earliestId; id < endExclusiveId; id++) {
Expand All @@ -148,7 +148,7 @@ public int expireUntil(long earliestId, long endExclusiveId) {
Changelog changelog = snapshotManager.longLivedChangelog(id);
Predicate<ManifestEntry> skipper;
try {
skipper = changelogDeletion.dataFileSkipper(referencedSnapshots, id);
skipper = changelogDeletion.createDataFileSkipperForTags(taggedSnapshots, id);
} catch (Exception e) {
LOG.info(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public int expireUntil(long earliestId, long endExclusiveId) {
"Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")");
}

List<Snapshot> referencedSnapshots = tagManager.taggedSnapshots();
List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();

// delete merge tree files
// deleted merge tree files in a snapshot are not used by the next snapshot, so the range of
Expand All @@ -166,7 +166,7 @@ public int expireUntil(long earliestId, long endExclusiveId) {
// expire merge tree files and collect changed buckets
Predicate<ManifestEntry> skipper;
try {
skipper = snapshotDeletion.dataFileSkipper(referencedSnapshots, id);
skipper = snapshotDeletion.createDataFileSkipperForTags(taggedSnapshots, id);
} catch (Exception e) {
LOG.info(
String.format(
Expand Down Expand Up @@ -199,7 +199,7 @@ public int expireUntil(long earliestId, long endExclusiveId) {
// delete manifests and indexFiles
List<Snapshot> skippingSnapshots =
SnapshotManager.findOverlappedSnapshots(
referencedSnapshots, beginInclusiveId, endExclusiveId);
taggedSnapshots, beginInclusiveId, endExclusiveId);
skippingSnapshots.add(snapshotManager.snapshot(endExclusiveId));
Set<String> skippingSet = snapshotDeletion.manifestSkippingSet(skippingSnapshots);
for (long id = beginInclusiveId; id < endExclusiveId; id++) {
Expand Down

0 comments on commit ac92d1d

Please sign in to comment.