Skip to content

Commit

Permalink
Introduce_clone_Action_and_Procedure_0418
Browse files Browse the repository at this point in the history
  • Loading branch information
wwj6591812 committed Apr 23, 2024
1 parent c2c0d75 commit e79cb89
Show file tree
Hide file tree
Showing 39 changed files with 3,756 additions and 262 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.FileUtils;
Expand All @@ -41,12 +38,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -79,10 +73,8 @@ public class OrphanFilesClean {

private static final Logger LOG = LoggerFactory.getLogger(OrphanFilesClean.class);

private static final int READ_FILE_RETRY_NUM = 3;
private static final int READ_FILE_RETRY_INTERVAL = 5;

private final SnapshotManager snapshotManager;
private final SchemaManager schemaManager;
private final TagManager tagManager;
private final FileIO fileIO;
private final Path location;
Expand All @@ -102,6 +94,7 @@ public OrphanFilesClean(FileStoreTable table) {
this.fileIO = table.fileIO();
this.location = table.location();
this.partitionKeysNum = table.partitionKeys().size();
this.schemaManager = new SchemaManager(fileIO, location);

FileStore<?> store = table.store();
this.manifestList = store.manifestListFactory().create();
Expand Down Expand Up @@ -174,11 +167,21 @@ private Set<String> getUsedFiles()
.flatMap(
snapshot -> {
if (snapshot instanceof Changelog) {
return getUsedFilesForChangelog(
(Changelog) snapshot)
return PickFilesUtil
.getUsedFilesForChangelog(
snapshot,
manifestList,
manifestFile)
.stream();
} else {
return getUsedFilesForSnapshot(snapshot)
return PickFilesUtil
.getUsedFilesForSnapshot(
snapshot,
snapshotManager,
manifestList,
manifestFile,
schemaManager,
indexFileHandler)
.stream();
}
})
Expand All @@ -197,7 +200,11 @@ private Map<String, Path> getCandidateDeletingFiles() {
.submit(
() ->
fileDirs.parallelStream()
.flatMap(p -> tryBestListingDirs(p).stream())
.flatMap(
p ->
PickFilesUtil.tryBestListingDirs(
p, fileIO)
.stream())
.filter(this::oldEnough)
.map(FileStatus::getPath)
.collect(
Expand All @@ -210,171 +217,6 @@ private Map<String, Path> getCandidateDeletingFiles() {
}
}

private List<String> getUsedFilesForChangelog(Changelog changelog) {
List<String> files = new ArrayList<>();
if (changelog.changelogManifestList() != null) {
files.add(changelog.changelogManifestList());
}

try {
// try to read manifests
List<ManifestFileMeta> manifestFileMetas =
retryReadingFiles(
() ->
manifestList.readWithIOException(
changelog.changelogManifestList()));
if (manifestFileMetas == null) {
return Collections.emptyList();
}
List<String> manifestFileName =
manifestFileMetas.stream()
.map(ManifestFileMeta::fileName)
.collect(Collectors.toList());
files.addAll(manifestFileName);

// try to read data files
List<String> dataFiles = retryReadingDataFiles(manifestFileName);
if (dataFiles == null) {
return Collections.emptyList();
}
files.addAll(dataFiles);
} catch (IOException e) {
throw new RuntimeException(e);
}

return files;
}

/**
* If getting null when reading some files, the snapshot/tag is being deleted, so just return an
* empty result.
*/
private List<String> getUsedFilesForSnapshot(Snapshot snapshot) {
List<String> files = new ArrayList<>();
addManifestList(files, snapshot);

try {
// try to read manifests
List<ManifestFileMeta> manifestFileMetas =
retryReadingFiles(() -> readAllManifestsWithIOException(snapshot));
if (manifestFileMetas == null) {
return Collections.emptyList();
}
List<String> manifestFileName =
manifestFileMetas.stream()
.map(ManifestFileMeta::fileName)
.collect(Collectors.toList());
files.addAll(manifestFileName);

// try to read data files
List<String> dataFiles = retryReadingDataFiles(manifestFileName);
if (dataFiles == null) {
return Collections.emptyList();
}
files.addAll(dataFiles);

// try to read index files
String indexManifest = snapshot.indexManifest();
if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) {
files.add(indexManifest);

List<IndexManifestEntry> indexManifestEntries =
retryReadingFiles(
() -> indexFileHandler.readManifestWithIOException(indexManifest));
if (indexManifestEntries == null) {
return Collections.emptyList();
}

indexManifestEntries.stream()
.map(IndexManifestEntry::indexFile)
.map(IndexFileMeta::fileName)
.forEach(files::add);
}

// try to read statistic
if (snapshot.statistics() != null) {
files.add(snapshot.statistics());
}
} catch (IOException e) {
throw new RuntimeException(e);
}

return files;
}

private void addManifestList(List<String> used, Snapshot snapshot) {
used.add(snapshot.baseManifestList());
used.add(snapshot.deltaManifestList());
String changelogManifestList = snapshot.changelogManifestList();
if (changelogManifestList != null) {
used.add(changelogManifestList);
}
}

/**
* Retry reading files when {@link IOException} was thrown by the reader. If the exception is
* {@link FileNotFoundException}, return null. Finally, if retry times reaches the limits,
* rethrow the IOException.
*/
@Nullable
private <T> T retryReadingFiles(ReaderWithIOException<T> reader) throws IOException {
int retryNumber = 0;
IOException caught = null;
while (retryNumber++ < READ_FILE_RETRY_NUM) {
try {
return reader.read();
} catch (FileNotFoundException e) {
return null;
} catch (IOException e) {
caught = e;
}
try {
TimeUnit.MILLISECONDS.sleep(READ_FILE_RETRY_INTERVAL);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

throw caught;
}

private List<ManifestFileMeta> readAllManifestsWithIOException(Snapshot snapshot)
throws IOException {
List<ManifestFileMeta> result = new ArrayList<>();

result.addAll(manifestList.readWithIOException(snapshot.baseManifestList()));
result.addAll(manifestList.readWithIOException(snapshot.deltaManifestList()));

String changelogManifestList = snapshot.changelogManifestList();
if (changelogManifestList != null) {
result.addAll(manifestList.readWithIOException(changelogManifestList));
}

return result;
}

@Nullable
private List<String> retryReadingDataFiles(List<String> manifestNames) throws IOException {
List<String> dataFiles = new ArrayList<>();
for (String manifestName : manifestNames) {
List<ManifestEntry> manifestEntries =
retryReadingFiles(() -> manifestFile.readWithIOException(manifestName));
if (manifestEntries == null) {
return null;
}

manifestEntries.stream()
.map(ManifestEntry::file)
.forEach(
f -> {
dataFiles.add(f.fileName());
dataFiles.addAll(f.extraFiles());
});
}
return dataFiles;
}

/** List directories that contains data files and manifest files. */
private List<Path> listPaimonFileDirs() {
List<Path> paimonFileDirs = new ArrayList<>();
Expand All @@ -387,28 +229,6 @@ private List<Path> listPaimonFileDirs() {
return paimonFileDirs;
}

/**
* If failed to list directory, just return an empty result because it's OK to not delete them.
*/
private List<FileStatus> tryBestListingDirs(Path dir) {
try {
if (!fileIO.exists(dir)) {
return Collections.emptyList();
}

List<FileStatus> status =
retryReadingFiles(
() -> {
FileStatus[] s = fileIO.listStatus(dir);
return s == null ? Collections.emptyList() : Arrays.asList(s);
});
return status == null ? Collections.emptyList() : status;
} catch (IOException e) {
LOG.debug("Failed to list directory {}, skip it.", dir, e);
return Collections.emptyList();
}
}

private boolean oldEnough(FileStatus status) {
return status.getModificationTime() < olderThanMillis;
}
Expand All @@ -418,7 +238,7 @@ private boolean oldEnough(FileStatus status) {
* argument level is used to control recursive depth.
*/
private List<Path> listAndCleanDataDirs(Path dir, int level) {
List<FileStatus> dirs = tryBestListingDirs(dir);
List<FileStatus> dirs = PickFilesUtil.tryBestListingDirs(dir, fileIO);

if (level == 0) {
// return bucket paths
Expand Down Expand Up @@ -497,10 +317,9 @@ private void deleteFileOrDirQuietly(Path path) {
}
}

/** A helper functional interface for method {@link #retryReadingFiles}. */
/** A helper functional interface for method {@link PickFilesUtil}. */
@FunctionalInterface
private interface ReaderWithIOException<T> {

interface ReaderWithIOException<T> {
T read() throws IOException;
}
}
Loading

0 comments on commit e79cb89

Please sign in to comment.