Skip to content

Commit

Permalink
[core] Support the rollback and orphan file clean with changelog deco…
Browse files Browse the repository at this point in the history
…uple
  • Loading branch information
Aitozi committed Apr 2, 2024
1 parent e2eda7a commit e8b996a
Show file tree
Hide file tree
Showing 7 changed files with 479 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.operation;

import org.apache.paimon.Changelog;
import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
Expand Down Expand Up @@ -125,6 +126,11 @@ public int clean() throws IOException, ExecutionException, InterruptedException
nonSnapshotFiles.forEach(this::deleteFileOrDirQuietly);
deletedFilesNum += nonSnapshotFiles.size();

// specially handle the changelog directory
List<Path> nonChangelogFiles = snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
nonChangelogFiles.forEach(this::deleteFileOrDirQuietly);
deletedFilesNum += nonChangelogFiles.size();

Map<String, Path> candidates = getCandidateDeletingFiles();
Set<String> usedFiles = getUsedFiles();

Expand All @@ -147,15 +153,24 @@ private Set<String> getUsedFiles()
Set<Snapshot> readSnapshots = new HashSet<>(snapshotManager.safelyGetAllSnapshots());
List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
readSnapshots.addAll(taggedSnapshots);
readSnapshots.addAll(snapshotManager.safelyGetAllChangelogs());

return FileUtils.COMMON_IO_FORK_JOIN_POOL
.submit(
() ->
readSnapshots
.parallelStream()
.flatMap(
snapshot ->
getUsedFilesForSnapshot(snapshot).stream())
snapshot -> {
if (snapshot instanceof Changelog) {
return getUsedFilesForChangelog(
(Changelog) snapshot)
.stream();
} else {
return getUsedFilesForSnapshot(snapshot)
.stream();
}
})
.collect(Collectors.toSet()))
.get();
}
Expand Down Expand Up @@ -184,6 +199,41 @@ private Map<String, Path> getCandidateDeletingFiles() {
}
}

/**
 * Collects the names of all files referenced by the given long-lived changelog: its changelog
 * manifest list, the manifest files listed in it, and the data files those manifests reference.
 *
 * <p>If the changelog has no changelog manifest list there is nothing to read, so an empty
 * list is returned instead of passing {@code null} to the manifest list reader.
 *
 * <p>If reading the manifests or the data files yields {@code null}, an empty list is returned
 * (nothing is reported as used for this changelog).
 *
 * @param changelog the changelog whose referenced files should be collected
 * @return the used file names, or an empty list if nothing could be resolved
 * @throws RuntimeException wrapping the {@link IOException} if reading fails
 */
private List<String> getUsedFilesForChangelog(Changelog changelog) {
    String changelogManifestList = changelog.changelogManifestList();
    if (changelogManifestList == null) {
        // no changelog manifest list recorded: do not try to read a null file name
        return Collections.emptyList();
    }

    List<String> files = new ArrayList<>();
    files.add(changelogManifestList);

    try {
        // try to read manifests
        List<ManifestFileMeta> manifestFileMetas =
                retryReadingFiles(
                        () -> manifestList.readWithIOException(changelogManifestList));
        if (manifestFileMetas == null) {
            return Collections.emptyList();
        }
        List<String> manifestFileNames =
                manifestFileMetas.stream()
                        .map(ManifestFileMeta::fileName)
                        .collect(Collectors.toList());
        files.addAll(manifestFileNames);

        // try to read data files
        List<String> dataFiles = retryReadingDataFiles(manifestFileNames);
        if (dataFiles == null) {
            return Collections.emptyList();
        }
        files.addAll(dataFiles);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }

    return files;
}

/**
* If getting null when reading some files, the snapshot/tag is being deleted, so just return an
* empty result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ public void rollbackTo(String tagName) {
checkArgument(tagManager.tagExists(tagName), "Rollback tag '%s' doesn't exist.", tagName);

Snapshot taggedSnapshot = tagManager.taggedSnapshot(tagName);
boolean changelogExists = snapshotManager().longLivedChangelogExists(taggedSnapshot.id());
rollbackHelper().cleanLargerThan(taggedSnapshot);

try {
Expand All @@ -518,9 +519,31 @@ public void rollbackTo(String tagName) {
// earliest hint
SnapshotManager snapshotManager = snapshotManager();
if (!snapshotManager.snapshotExists(taggedSnapshot.id())) {
// clear the changelog manifest when the changelog does not exist
Snapshot restored = taggedSnapshot;
if (!changelogExists) {
restored =
new Snapshot(
taggedSnapshot.version(),
taggedSnapshot.id(),
taggedSnapshot.schemaId(),
taggedSnapshot.baseManifestList(),
taggedSnapshot.deltaManifestList(),
null,
taggedSnapshot.indexManifest(),
taggedSnapshot.commitUser(),
taggedSnapshot.commitIdentifier(),
taggedSnapshot.commitKind(),
taggedSnapshot.timeMillis(),
taggedSnapshot.logOffsets(),
taggedSnapshot.totalRecordCount(),
taggedSnapshot.deltaRecordCount(),
0L,
taggedSnapshot.watermark(),
taggedSnapshot.statistics());
}
fileIO.writeFileUtf8(
snapshotManager().snapshotPath(taggedSnapshot.id()),
fileIO.readFileUtf8(tagManager.tagPath(tagName)));
snapshotManager().snapshotPath(taggedSnapshot.id()), restored.toJson());
snapshotManager.commitEarliestHint(taggedSnapshot.id());
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private List<Changelog> cleanLongLivedChangelogDataFiles(Snapshot retainedSnapsh
return Collections.emptyList();
}

// delete snapshot files first, cannot be read now
// delete changelog files first, cannot be read now
// it is possible that some snapshots have been expired
List<Changelog> toBeCleaned = new ArrayList<>();
long to = Math.max(earliest, retainedSnapshot.id() + 1);
Expand All @@ -157,6 +157,21 @@ private List<Changelog> cleanLongLivedChangelogDataFiles(Snapshot retainedSnapsh
// delete directories
snapshotDeletion.cleanDataDirectories();

// modify the latest hint
try {
if (toBeCleaned.size() > 0) {
if (to == earliest) {
// all changelogs have been cleaned, so the actual latest id is unknown;
// set the hint to -1
snapshotManager.commitLongLivedChangelogLatestHint(-1);
} else {
snapshotManager.commitLongLivedChangelogLatestHint(to - 1);
}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}

return toBeCleaned;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,13 @@ public Iterator<Snapshot> snapshots() throws IOException {
.iterator();
}

/**
 * Returns an iterator over the changelogs found in the changelog directory, ordered by
 * ascending changelog id.
 *
 * @throws IOException if listing the changelog directory fails
 */
public Iterator<Changelog> changelogs() throws IOException {
    Comparator<Changelog> byAscendingId = Comparator.comparingLong(log -> log.id());
    return listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX)
            .map(id -> changelog(id))
            .sorted(byAscendingId)
            .iterator();
}

/**
* If {@link FileNotFoundException} is thrown when reading the snapshot file, this snapshot may
* be deleted by other processes, so just skip this snapshot.
Expand All @@ -336,21 +343,48 @@ public List<Snapshot> safelyGetAllSnapshots() throws IOException {
return snapshots;
}

/**
 * Reads every changelog file currently listed in the changelog directory. A changelog whose
 * file raises {@link FileNotFoundException} while being read is skipped.
 *
 * @return the changelogs that could be read
 * @throws IOException if listing the directory or reading a file fails for any other reason
 */
public List<Changelog> safelyGetAllChangelogs() throws IOException {
    // resolve all paths up front, then read them one by one
    List<Path> changelogPaths =
            listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX)
                    .map(id -> longLivedChangelogPath(id))
                    .collect(Collectors.toList());

    List<Changelog> result = new ArrayList<>();
    for (Path changelogPath : changelogPaths) {
        try {
            result.add(Changelog.fromJson(fileIO.readFileUtf8(changelogPath)));
        } catch (FileNotFoundException ignored) {
            // the file disappeared between listing and reading; skip it
        }
    }
    return result;
}

/**
 * Best-effort listing of the non snapshot files in the snapshot directory. Any error that
 * occurs is ignored and an empty result is returned.
 *
 * @param fileStatusFilter additional condition a file status must satisfy to be returned
 */
public List<Path> tryGetNonSnapshotFiles(Predicate<FileStatus> fileStatusFilter) {
    Path directory = snapshotDirectory();
    Predicate<Path> notSnapshotFile = nonSnapshotFileFilter();
    return listPathWithFilter(directory, fileStatusFilter, notSnapshotFile);
}

/**
 * Best-effort listing of the entries in the changelog directory that pass both the given
 * status filter and {@code nonChangelogFileFilter()}.
 *
 * @param fileStatusFilter additional condition a file status must satisfy to be returned
 */
public List<Path> tryGetNonChangelogFiles(Predicate<FileStatus> fileStatusFilter) {
    Path directory = changelogDirectory();
    Predicate<Path> notChangelogFile = nonChangelogFileFilter();
    return listPathWithFilter(directory, fileStatusFilter, notChangelogFile);
}

private List<Path> listPathWithFilter(
Path directory, Predicate<FileStatus> fileStatusFilter, Predicate<Path> fileFilter) {
try {
FileStatus[] statuses = fileIO.listStatus(snapshotDirectory());
FileStatus[] statuses = fileIO.listStatus(directory);
if (statuses == null) {
return Collections.emptyList();
}

return Arrays.stream(statuses)
.filter(fileStatusFilter)
.map(FileStatus::getPath)
.filter(nonSnapshotFileFilter())
.filter(fileFilter)
.collect(Collectors.toList());
} catch (IOException ignored) {
return Collections.emptyList();
Expand All @@ -366,6 +400,15 @@ private Predicate<Path> nonSnapshotFileFilter() {
};
}

/**
 * Builds a predicate that accepts a path only if its name neither starts with {@code
 * CHANGELOG_PREFIX} nor equals {@code EARLIEST} or {@code LATEST}.
 */
private Predicate<Path> nonChangelogFileFilter() {
    return candidate -> {
        String fileName = candidate.getName();
        boolean expectedInChangelogDir =
                fileName.startsWith(CHANGELOG_PREFIX)
                        || fileName.equals(EARLIEST)
                        || fileName.equals(LATEST);
        return !expectedInChangelogDir;
    };
}

public Optional<Snapshot> latestSnapshotOfUser(String user) {
Long latestId = latestSnapshotId();
if (latestId == null) {
Expand Down Expand Up @@ -472,7 +515,7 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {
}

Long snapshotId = readHint(LATEST, dir);
if (snapshotId != null) {
if (snapshotId != null && snapshotId > 0) {
long nextSnapshot = snapshotId + 1;
// it is the latest only if there is no next one
if (!fileIO.exists(file.apply(nextSnapshot))) {
Expand Down
Loading

0 comments on commit e8b996a

Please sign in to comment.