Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Clarify the purpose of method findOverlappedSnapshots and move… #3808

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.paimon.utils.FileDeletionThreadPool;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.TagManager;
import org.apache.paimon.utils.SnapshotManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -77,11 +77,12 @@ public abstract class FileDeletionBase<T extends Snapshot> {

protected boolean changelogDecoupled;

/** Used to record which tag is cached. */
private long cachedTag = 0;
/** Used to record which snapshot is cached. */
private long cachedSnapshotId = 0;

/** Used to cache data files used by current tag. */
private final Map<BinaryRow, Map<Integer, Set<String>>> cachedTagDataFiles = new HashMap<>();
/** Used to cache data files used by current snapshot. */
private final Map<BinaryRow, Map<Integer, Set<String>>> cachedSnapshotDataFiles =
new HashMap<>();

public FileDeletionBase(
FileIO fileIO,
Expand Down Expand Up @@ -328,17 +329,17 @@ protected void cleanUnusedManifests(
}

public Predicate<ManifestEntry> dataFileSkipper(
List<Snapshot> taggedSnapshots, long expiringSnapshotId) throws Exception {
int index = TagManager.findPreviousTag(taggedSnapshots, expiringSnapshotId);
// refresh tag data files
List<Snapshot> skippingSnapshots, long expiringSnapshotId) throws Exception {
int index = SnapshotManager.findPreviousSnapshot(skippingSnapshots, expiringSnapshotId);
// refresh snapshot data files
if (index >= 0) {
Snapshot previousTag = taggedSnapshots.get(index);
if (previousTag.id() != cachedTag) {
cachedTag = previousTag.id();
cachedTagDataFiles.clear();
addMergedDataFiles(cachedTagDataFiles, previousTag);
Snapshot previousSnapshot = skippingSnapshots.get(index);
if (previousSnapshot.id() != cachedSnapshotId) {
cachedSnapshotId = previousSnapshot.id();
cachedSnapshotDataFiles.clear();
addMergedDataFiles(cachedSnapshotDataFiles, previousSnapshot);
}
return entry -> containsDataFile(cachedTagDataFiles, entry);
return entry -> containsDataFile(cachedSnapshotDataFiles, entry);
}
return entry -> false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,11 @@ public int expireUntil(long earliestId, long endExclusiveId) {
LOG.debug("Changelog expire range is [" + earliestId + ", " + endExclusiveId + ")");
}

List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
List<Snapshot> referencedSnapshots = tagManager.taggedSnapshots();

List<Snapshot> skippingSnapshots =
TagManager.findOverlappedSnapshots(taggedSnapshots, earliestId, endExclusiveId);
SnapshotManager.findOverlappedSnapshots(
referencedSnapshots, earliestId, endExclusiveId);
skippingSnapshots.add(snapshotManager.changelog(endExclusiveId));
Set<String> manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots);
for (long id = earliestId; id < endExclusiveId; id++) {
Expand All @@ -147,7 +148,7 @@ public int expireUntil(long earliestId, long endExclusiveId) {
Changelog changelog = snapshotManager.longLivedChangelog(id);
Predicate<ManifestEntry> skipper;
try {
skipper = changelogDeletion.dataFileSkipper(taggedSnapshots, id);
skipper = changelogDeletion.dataFileSkipper(referencedSnapshots, id);
} catch (Exception e) {
LOG.info(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public int expireUntil(long earliestId, long endExclusiveId) {
"Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")");
}

List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
List<Snapshot> referencedSnapshots = tagManager.taggedSnapshots();

// delete merge tree files
// deleted merge tree files in a snapshot are not used by the next snapshot, so the range of
Expand All @@ -166,7 +166,7 @@ public int expireUntil(long earliestId, long endExclusiveId) {
// expire merge tree files and collect changed buckets
Predicate<ManifestEntry> skipper;
try {
skipper = snapshotDeletion.dataFileSkipper(taggedSnapshots, id);
skipper = snapshotDeletion.dataFileSkipper(referencedSnapshots, id);
} catch (Exception e) {
LOG.info(
String.format(
Expand Down Expand Up @@ -198,8 +198,8 @@ public int expireUntil(long earliestId, long endExclusiveId) {

// delete manifests and indexFiles
List<Snapshot> skippingSnapshots =
TagManager.findOverlappedSnapshots(
taggedSnapshots, beginInclusiveId, endExclusiveId);
SnapshotManager.findOverlappedSnapshots(
referencedSnapshots, beginInclusiveId, endExclusiveId);
skippingSnapshots.add(snapshotManager.snapshot(endExclusiveId));
Set<String> skippingSet = snapshotDeletion.manifestSkippingSet(skippingSnapshots);
for (long id = beginInclusiveId; id < endExclusiveId; id++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,42 @@ private Long findByListFiles(BinaryOperator<Long> reducer, Path dir, String pref
return listVersionedFiles(fileIO, dir, prefix).reduce(reducer).orElse(null);
}

/**
* Find the overlapping snapshots between sortedSnapshots and range of [beginInclusive,
* endExclusive).
*/
public static List<Snapshot> findOverlappedSnapshots(
List<Snapshot> sortedSnapshots, long beginInclusive, long endExclusive) {
List<Snapshot> overlappedSnapshots = new ArrayList<>();
int right = findPreviousSnapshot(sortedSnapshots, endExclusive);
if (right >= 0) {
int left = Math.max(findPreviousOrEqualSnapshot(sortedSnapshots, beginInclusive), 0);
for (int i = left; i <= right; i++) {
overlappedSnapshots.add(sortedSnapshots.get(i));
}
}
return overlappedSnapshots;
}

public static int findPreviousSnapshot(List<Snapshot> sortedSnapshots, long targetSnapshotId) {
for (int i = sortedSnapshots.size() - 1; i >= 0; i--) {
if (sortedSnapshots.get(i).id() < targetSnapshotId) {
return i;
}
}
return -1;
}

private static int findPreviousOrEqualSnapshot(
List<Snapshot> sortedSnapshots, long targetSnapshotId) {
for (int i = sortedSnapshots.size() - 1; i >= 0; i--) {
if (sortedSnapshots.get(i).id() <= targetSnapshotId) {
return i;
}
}
return -1;
}

public void deleteLatestHint() throws IOException {
Path snapshotDir = snapshotDirectory();
Path hintFile = new Path(snapshotDir, LATEST);
Expand Down
32 changes: 0 additions & 32 deletions paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -366,36 +366,4 @@ private int findIndex(Snapshot taggedSnapshot, List<Snapshot> taggedSnapshots) {
"Didn't find tag with snapshot id '%s'.This is unexpected.",
taggedSnapshot.id()));
}

public static List<Snapshot> findOverlappedSnapshots(
List<Snapshot> taggedSnapshots, long beginInclusive, long endExclusive) {
List<Snapshot> snapshots = new ArrayList<>();
int right = findPreviousTag(taggedSnapshots, endExclusive);
if (right >= 0) {
int left = Math.max(findPreviousOrEqualTag(taggedSnapshots, beginInclusive), 0);
for (int i = left; i <= right; i++) {
snapshots.add(taggedSnapshots.get(i));
}
}
return snapshots;
}

public static int findPreviousTag(List<Snapshot> taggedSnapshots, long targetSnapshotId) {
for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
if (taggedSnapshots.get(i).id() < targetSnapshotId) {
return i;
}
}
return -1;
}

private static int findPreviousOrEqualTag(
List<Snapshot> taggedSnapshots, long targetSnapshotId) {
for (int i = taggedSnapshots.size() - 1; i >= 0; i--) {
if (taggedSnapshots.get(i).id() <= targetSnapshotId) {
return i;
}
}
return -1;
}
}
Loading