Skip to content

Commit

Permalink
[core] Support the rollback and orphan file clean with changelog deco…
Browse files Browse the repository at this point in the history
…uple
  • Loading branch information
Aitozi committed Apr 3, 2024
1 parent e2eda7a commit bff560e
Show file tree
Hide file tree
Showing 6 changed files with 455 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package org.apache.paimon.operation;

import org.apache.paimon.Changelog;
import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
Expand Down Expand Up @@ -91,6 +93,7 @@ public class OrphanFilesClean {

// an estimated value of how many files were deleted
private int deletedFilesNum = 0;
// paths handed to deletion during cleaning; read back through the test-only accessor
private final List<Path> deleteFiles;
// age threshold as an epoch-millis timestamp; defaults to one day before this object is created
private long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);

public OrphanFilesClean(FileStoreTable table) {
Expand All @@ -104,6 +107,7 @@ public OrphanFilesClean(FileStoreTable table) {
this.manifestList = store.manifestListFactory().create();
this.manifestFile = store.manifestFileFactory().create();
this.indexFileHandler = store.newIndexFileHandler();
this.deleteFiles = new ArrayList<>();
}

public OrphanFilesClean olderThan(String timestamp) {
Expand All @@ -124,6 +128,13 @@ public int clean() throws IOException, ExecutionException, InterruptedException
List<Path> nonSnapshotFiles = snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
nonSnapshotFiles.forEach(this::deleteFileOrDirQuietly);
deletedFilesNum += nonSnapshotFiles.size();
deleteFiles.addAll(nonSnapshotFiles);

// specially handle the changelog directory
List<Path> nonChangelogFiles = snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
nonChangelogFiles.forEach(this::deleteFileOrDirQuietly);
deletedFilesNum += nonChangelogFiles.size();
deleteFiles.addAll(nonChangelogFiles);

Map<String, Path> candidates = getCandidateDeletingFiles();
Set<String> usedFiles = getUsedFiles();
Expand All @@ -136,26 +147,41 @@ public int clean() throws IOException, ExecutionException, InterruptedException
deleteFileOrDirQuietly(path);
}
deletedFilesNum += deleted.size();
deleteFiles.addAll(deleted.stream().map(candidates::get).collect(Collectors.toList()));

return deletedFilesNum;
}

/**
 * Exposes the paths recorded as deleted so far.
 *
 * @return the live internal list (not a copy)
 */
@VisibleForTesting
List<Path> getDeleteFiles() {
    return this.deleteFiles;
}

/** Get all the files used by snapshots and tags. */
private Set<String> getUsedFiles()
throws IOException, ExecutionException, InterruptedException {
// safely get all snapshots to be read
Set<Snapshot> readSnapshots = new HashSet<>(snapshotManager.safelyGetAllSnapshots());
List<Snapshot> taggedSnapshots = tagManager.taggedSnapshots();
readSnapshots.addAll(taggedSnapshots);
readSnapshots.addAll(snapshotManager.safelyGetAllChangelogs());

return FileUtils.COMMON_IO_FORK_JOIN_POOL
.submit(
() ->
readSnapshots
.parallelStream()
.flatMap(
snapshot ->
getUsedFilesForSnapshot(snapshot).stream())
snapshot -> {
if (snapshot instanceof Changelog) {
return getUsedFilesForChangelog(
(Changelog) snapshot)
.stream();
} else {
return getUsedFilesForSnapshot(snapshot)
.stream();
}
})
.collect(Collectors.toSet()))
.get();
}
Expand Down Expand Up @@ -184,6 +210,41 @@ private Map<String, Path> getCandidateDeletingFiles() {
}
}

/**
 * Collect the names of the files referenced by a changelog: its changelog manifest list, the
 * manifest files listed in it, and the data files read from those manifests.
 *
 * <p>If the changelog has no changelog manifest list, nothing is read and an empty list is
 * returned. If either read yields {@code null}, an empty list is returned as well (presumably
 * because the changelog is being deleted concurrently).
 *
 * @param changelog the changelog whose referenced files should be collected
 * @return the referenced file names, or an empty list if nothing could be resolved
 * @throws RuntimeException wrapping any {@link IOException} raised while reading
 */
private List<String> getUsedFilesForChangelog(Changelog changelog) {
    String changelogManifestList = changelog.changelogManifestList();
    if (changelogManifestList == null) {
        // nothing to resolve; also avoids handing a null file name to the manifest list reader
        return Collections.emptyList();
    }

    List<String> files = new ArrayList<>();
    files.add(changelogManifestList);

    try {
        // try to read manifests
        List<ManifestFileMeta> manifestFileMetas =
                retryReadingFiles(
                        () -> manifestList.readWithIOException(changelogManifestList));
        if (manifestFileMetas == null) {
            return Collections.emptyList();
        }
        List<String> manifestFileName =
                manifestFileMetas.stream()
                        .map(ManifestFileMeta::fileName)
                        .collect(Collectors.toList());
        files.addAll(manifestFileName);

        // try to read data files
        List<String> dataFiles = retryReadingDataFiles(manifestFileName);
        if (dataFiles == null) {
            return Collections.emptyList();
        }
        files.addAll(dataFiles);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }

    return files;
}

/**
* If getting null when reading some files, the snapshot/tag is being deleted, so just return an
* empty result.
Expand Down Expand Up @@ -415,6 +476,7 @@ private List<Path> filterAndCleanDataDirs(
.forEach(
p -> {
deleteFileOrDirQuietly(p);
deleteFiles.add(p);
deletedFilesNum++;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private List<Changelog> cleanLongLivedChangelogDataFiles(Snapshot retainedSnapsh
return Collections.emptyList();
}

// delete snapshot files first, cannot be read now
// delete changelog files first, cannot be read now
// it is possible that some snapshots have been expired
List<Changelog> toBeCleaned = new ArrayList<>();
long to = Math.max(earliest, retainedSnapshot.id() + 1);
Expand All @@ -157,6 +157,21 @@ private List<Changelog> cleanLongLivedChangelogDataFiles(Snapshot retainedSnapsh
// delete directories
snapshotDeletion.cleanDataDirectories();

// modify the latest hint
try {
if (toBeCleaned.size() > 0) {
if (to == earliest) {
// all changelogs have been cleaned, so the actual latest id is unknown;
// set the hint to -1
snapshotManager.commitLongLivedChangelogLatestHint(-1);
} else {
snapshotManager.commitLongLivedChangelogLatestHint(to - 1);
}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}

return toBeCleaned;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,13 @@ public Iterator<Snapshot> snapshots() throws IOException {
.iterator();
}

/**
 * Iterate over the changelogs found in the changelog directory, ordered by ascending id.
 *
 * @return an iterator over the changelogs, sorted by {@code Changelog#id()}
 * @throws IOException if listing the changelog directory fails
 */
public Iterator<Changelog> changelogs() throws IOException {
    return listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX)
            .map(id -> changelog(id))
            .sorted((left, right) -> Long.compare(left.id(), right.id()))
            .iterator();
}

/**
* If {@link FileNotFoundException} is thrown when reading the snapshot file, this snapshot may
* be deleted by other processes, so just skip this snapshot.
Expand All @@ -336,21 +343,48 @@ public List<Snapshot> safelyGetAllSnapshots() throws IOException {
return snapshots;
}

/**
 * Read every changelog listed in the changelog directory, skipping those whose file can no
 * longer be found when it is read (presumably removed by another process in the meantime).
 *
 * @return the changelogs that could be read
 * @throws IOException if listing or reading fails for a reason other than a missing file
 */
public List<Changelog> safelyGetAllChangelogs() throws IOException {
    // list all paths first, then read them one by one
    List<Path> changelogPaths =
            listVersionedFiles(fileIO, changelogDirectory(), CHANGELOG_PREFIX)
                    .map(id -> longLivedChangelogPath(id))
                    .collect(Collectors.toList());

    List<Changelog> loaded = new ArrayList<>();
    for (Path changelogPath : changelogPaths) {
        try {
            loaded.add(Changelog.fromJson(fileIO.readFileUtf8(changelogPath)));
        } catch (FileNotFoundException ignored) {
            // the file is gone; skip this changelog
        }
    }

    return loaded;
}

/**
 * Best-effort listing of the files in the snapshot directory that are not snapshot files.
 * Errors during listing are ignored and yield an empty result.
 *
 * @param fileStatusFilter additional filter applied to each listed file status
 * @return the matching paths, or an empty list on error
 */
public List<Path> tryGetNonSnapshotFiles(Predicate<FileStatus> fileStatusFilter) {
    Path directory = snapshotDirectory();
    Predicate<Path> nameFilter = nonSnapshotFileFilter();
    return listPathWithFilter(directory, fileStatusFilter, nameFilter);
}

/**
 * Best-effort listing of the files in the changelog directory that are not changelog files.
 * Errors during listing are ignored and yield an empty result.
 *
 * @param fileStatusFilter additional filter applied to each listed file status
 * @return the matching paths, or an empty list on error
 */
public List<Path> tryGetNonChangelogFiles(Predicate<FileStatus> fileStatusFilter) {
    Path directory = changelogDirectory();
    Predicate<Path> nameFilter = nonChangelogFileFilter();
    return listPathWithFilter(directory, fileStatusFilter, nameFilter);
}

private List<Path> listPathWithFilter(
Path directory, Predicate<FileStatus> fileStatusFilter, Predicate<Path> fileFilter) {
try {
FileStatus[] statuses = fileIO.listStatus(snapshotDirectory());
FileStatus[] statuses = fileIO.listStatus(directory);
if (statuses == null) {
return Collections.emptyList();
}

return Arrays.stream(statuses)
.filter(fileStatusFilter)
.map(FileStatus::getPath)
.filter(nonSnapshotFileFilter())
.filter(fileFilter)
.collect(Collectors.toList());
} catch (IOException ignored) {
return Collections.emptyList();
Expand All @@ -366,6 +400,15 @@ private Predicate<Path> nonSnapshotFileFilter() {
};
}

/**
 * Builds a predicate that accepts a path only if its name neither starts with {@code
 * CHANGELOG_PREFIX} nor equals {@code EARLIEST} or {@code LATEST}.
 */
private Predicate<Path> nonChangelogFileFilter() {
    return candidate -> {
        String fileName = candidate.getName();
        boolean isChangelogFile = fileName.startsWith(CHANGELOG_PREFIX);
        boolean isEarliestOrLatest = fileName.equals(EARLIEST) || fileName.equals(LATEST);
        return !(isChangelogFile || isEarliestOrLatest);
    };
}

public Optional<Snapshot> latestSnapshotOfUser(String user) {
Long latestId = latestSnapshotId();
if (latestId == null) {
Expand Down Expand Up @@ -472,7 +515,7 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {
}

Long snapshotId = readHint(LATEST, dir);
if (snapshotId != null) {
if (snapshotId != null && snapshotId > 0) {
long nextSnapshot = snapshotId + 1;
// it is the latest only if there is no next one
if (!fileIO.exists(file.apply(nextSnapshot))) {
Expand Down
Loading

0 comments on commit bff560e

Please sign in to comment.