Skip to content

Commit

Permalink
[core] Fix dry run in OrphanFilesClean and optimize result string
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Jul 1, 2024
1 parent fa7add8 commit 8c4be22
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.paimon.Changelog;
import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
Expand Down Expand Up @@ -55,6 +54,7 @@
import java.util.TimeZone;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -92,11 +92,9 @@ public class OrphanFilesClean {
private final ManifestFile manifestFile;
private final IndexFileHandler indexFileHandler;

// an estimated value of how many files were deleted
private int deletedFilesNum = 0;
private final List<Path> deleteFiles;
private long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
private boolean isDryRun = false;
private Consumer<Path> fileCleaner;

public OrphanFilesClean(FileStoreTable table) {
this.snapshotManager = table.snapshotManager();
Expand All @@ -110,6 +108,17 @@ public OrphanFilesClean(FileStoreTable table) {
this.manifestFile = store.manifestFileFactory().create();
this.indexFileHandler = store.newIndexFileHandler();
this.deleteFiles = new ArrayList<>();
this.fileCleaner =
path -> {
try {
if (fileIO.isDir(path)) {
fileIO.deleteDirectoryQuietly(path);
} else {
fileIO.deleteQuietly(path);
}
} catch (IOException ignored) {
}
};
}

public OrphanFilesClean olderThan(String timestamp) {
Expand All @@ -120,8 +129,8 @@ public OrphanFilesClean olderThan(String timestamp) {
return this;
}

public OrphanFilesClean dryRun(boolean dryRun) {
this.isDryRun = dryRun;
public OrphanFilesClean fileCleaner(Consumer<Path> fileCleaner) {
this.fileCleaner = fileCleaner;
return this;
}

Expand All @@ -133,37 +142,22 @@ public List<Path> clean() throws IOException, ExecutionException, InterruptedExc

// specially handle the snapshot directory
List<Path> nonSnapshotFiles = snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
nonSnapshotFiles.forEach(this::deleteFileOrDirQuietly);
deletedFilesNum += nonSnapshotFiles.size();
nonSnapshotFiles.forEach(fileCleaner);
deleteFiles.addAll(nonSnapshotFiles);

// specially handle the changelog directory
List<Path> nonChangelogFiles = snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
nonChangelogFiles.forEach(this::deleteFileOrDirQuietly);
deletedFilesNum += nonChangelogFiles.size();
nonChangelogFiles.forEach(fileCleaner);
deleteFiles.addAll(nonChangelogFiles);

Map<String, Path> candidates = getCandidateDeletingFiles();
Set<String> usedFiles = getUsedFiles();

Set<String> deleted = new HashSet<>(candidates.keySet());
deleted.removeAll(usedFiles);
deleted.stream().map(candidates::get).forEach(fileCleaner);

if (!isDryRun) {
for (String file : deleted) {
Path path = candidates.get(file);
deleteFileOrDirQuietly(path);
}
}

deletedFilesNum += deleted.size();
deleteFiles.addAll(deleted.stream().map(candidates::get).collect(Collectors.toList()));

return deleteFiles;
}

/**
 * Returns the {@code deleteFiles} list held by this instance.
 *
 * <p>The field itself is handed out, not a copy, so the caller sees the same list object that
 * this class appends to.
 *
 * @return the {@code deleteFiles} list
 */
@VisibleForTesting
List<Path> getDeleteFiles() {
return deleteFiles;
}

Expand Down Expand Up @@ -532,32 +526,35 @@ private List<Path> filterAndCleanDataDirs(
.map(FileStatus::getPath)
.forEach(
p -> {
deleteFileOrDirQuietly(p);
fileCleaner.accept(p);
synchronized (deleteFiles) {
deleteFiles.add(p);
deletedFilesNum++;
}
});
}

return filtered;
}

/**
 * Best-effort removal of {@code path}, whether it is a plain file or a directory.
 *
 * <p>{@code fileIO.isDir} decides which of the two quiet delete calls is used. Any {@link
 * IOException} raised along the way is caught and dropped, so this method never propagates one.
 *
 * @param path the file or directory to remove
 */
private void deleteFileOrDirQuietly(Path path) {
    try {
        boolean directory = fileIO.isDir(path);
        if (!directory) {
            // Plain file: single quiet delete, nothing else to do.
            fileIO.deleteQuietly(path);
            return;
        }
        fileIO.deleteDirectoryQuietly(path);
    } catch (IOException ignored) {
        // Deliberately swallowed: deletion here is best-effort.
    }
}

/**
 * A helper functional interface for method {@link #retryReadingFiles}.
 *
 * <p>Describes a read that yields a value of type {@code T} and is allowed to fail with an
 * {@link IOException}.
 *
 * @param <T> type of the value produced by {@link #read()}
 */
@FunctionalInterface
private interface ReaderWithIOException<T> {

/**
 * Performs the read.
 *
 * @return the value that was read
 * @throws IOException if the read fails
 */
T read() throws IOException;
}

/**
 * Converts the given paths into display strings, showing at most {@code showLimit} of them.
 *
 * <p>When the list holds more paths than are shown, the first element of the result is a
 * summary line stating the total count and the number displayed. The path strings follow, each
 * produced by {@code toUri().getPath()}, in the order of {@code deleteFiles}.
 *
 * @param deleteFiles the paths to render
 * @param showLimit upper bound on how many paths are rendered
 * @return the optional summary line followed by the rendered paths
 */
public static List<String> showDeletedFiles(List<Path> deleteFiles, int showLimit) {
    final int total = deleteFiles.size();
    final int shown = Math.min(total, showLimit);

    List<String> lines = new ArrayList<>();
    boolean truncated = total > shown;
    if (truncated) {
        String summary =
                String.format("Total %s files, only %s lines are displayed.", total, shown);
        lines.add(summary);
    }

    int index = 0;
    while (index < shown) {
        Path current = deleteFiles.get(index);
        lines.add(current.toUri().getPath());
        index++;
    }
    return lines;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ public void testNormallyRemoving() throws Throwable {
// second check
orphanFilesClean = new OrphanFilesClean(table);
setOlderThan(orphanFilesClean);
orphanFilesClean.clean();
List<Path> deleted = orphanFilesClean.clean();
try {
validate(orphanFilesClean.getDeleteFiles(), snapshotData, new HashMap<>());
validate(deleted, snapshotData, new HashMap<>());
} catch (Throwable t) {
String tableOptions = "Table options:\n" + table.options();

Expand Down Expand Up @@ -367,9 +367,8 @@ public void testCleanOrphanFilesWithChangelogDecoupled(String changelogProducer)
// second check
orphanFilesClean = new OrphanFilesClean(table);
setOlderThan(orphanFilesClean);
orphanFilesClean.clean();
List<Path> cleanFiles = orphanFilesClean.getDeleteFiles();
validate(cleanFiles, snapshotData, changelogData);
List<Path> deleted = orphanFilesClean.clean();
validate(deleted, snapshotData, changelogData);
}

/** Manually make a FileNotFoundException to simulate snapshot expire while clean. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.flink.action;

import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.table.FileStoreTable;

Expand All @@ -27,7 +26,6 @@

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand Down Expand Up @@ -56,18 +54,15 @@ public RemoveOrphanFilesAction olderThan(String timestamp) {
return this;
}

public RemoveOrphanFilesAction dryRun(Boolean dryRun) {
this.orphanFilesClean.dryRun(dryRun);
public RemoveOrphanFilesAction dryRun() {
this.orphanFilesClean.fileCleaner(path -> {});
return this;
}

@Override
public void run() throws Exception {
List<Path> orphanFiles = orphanFilesClean.clean();
String files =
orphanFiles.stream()
.map(filePath -> filePath.toUri().getPath())
.collect(Collectors.joining(", "));
List<String> result = OrphanFilesClean.showDeletedFiles(orphanFilesClean.clean(), 200);
String files = String.join(", ", result);
LOG.info("orphan files: [{}]", files);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
action.olderThan(params.get(OLDER_THAN));
}

if (params.has(DRY_RUN)) {
action.dryRun(Boolean.parseBoolean(params.get(DRY_RUN)));
if (params.has(DRY_RUN) && Boolean.parseBoolean(params.get(DRY_RUN))) {
action.dryRun();
}

return Optional.of(action);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ public String[] call(
orphanFilesClean.olderThan(olderThan);
}

orphanFilesClean.dryRun(dryRun);
if (dryRun) {
orphanFilesClean.fileCleaner(path -> {});
}

return orphanFilesClean.clean().stream()
.map(filePath -> filePath.toUri().getPath())
.toArray(String[]::new);
return OrphanFilesClean.showDeletedFiles(orphanFilesClean.clean(), 200)
.toArray(new String[0]);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.spark.procedure;

import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.StringUtils;
Expand Down Expand Up @@ -88,15 +87,16 @@ public InternalRow[] call(InternalRow args) {
if (!StringUtils.isBlank(olderThan)) {
orphanFilesClean.olderThan(olderThan);
}
orphanFilesClean.dryRun(dryRun);
if (dryRun) {
orphanFilesClean.fileCleaner(path -> {});
}
try {
List<Path> orphanFiles = orphanFilesClean.clean();
InternalRow[] rows = new InternalRow[orphanFiles.size()];
List<String> result =
OrphanFilesClean.showDeletedFiles(orphanFilesClean.clean(), 200);
InternalRow[] rows = new InternalRow[result.size()];
int index = 0;
for (Path filePath : orphanFiles) {
rows[index] =
newInternalRow(
UTF8String.fromString(filePath.toUri().getPath()));
for (String line : result) {
rows[index] = newInternalRow(UTF8String.fromString(line));
index++;
}
return rows;
Expand Down

0 comments on commit 8c4be22

Please sign in to comment.