Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang committed Jan 3, 2025
1 parent 92aca2a commit ecd9021
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,17 @@
import java.util.List;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;

/**
* To remove the data files and metadata files that are not used by table (so-called "orphan
Expand Down Expand Up @@ -91,6 +95,10 @@ public abstract class OrphanFilesClean implements Serializable {
protected final int partitionKeysNum;
protected final Path location;

// Name given to worker threads so orphan-clean activity is identifiable in thread dumps.
private static final String THREAD_NAME = "ORPHAN-FILES-CLEAN-THREAD-POOL";
// Shared, JVM-wide pool (static): all OrphanFilesClean instances reuse it, sized to the
// number of available processors. Used by cleanEmptyDirectory via randomlyOnlyExecute.
// NOTE(review): a static final constant would conventionally be named EXECUTOR_SERVICE;
// left as-is because sibling code in this file references `executorService`.
private static final ThreadPoolExecutor executorService =
        createCachedThreadPool(Runtime.getRuntime().availableProcessors(), THREAD_NAME);

public OrphanFilesClean(
FileStoreTable table, long olderThanMillis, SerializableConsumer<Path> fileCleaner) {
this.table = table;
Expand Down Expand Up @@ -399,27 +407,21 @@ public void cleanEmptyDirectory(Set<Path> deletedPaths) {
return;
}

int level = 0;
while (level <= partitionKeysNum) {
Set<Path> parentPaths = new HashSet<>();
for (Path path : deletedPaths) {
boolean deleted = tryDeleteEmptyDirectory(path);
if (deleted) {
LOG.info("Delete empty directory '{}'.", path);
parentPaths.add(path.getParent());
}
}
randomlyOnlyExecute(executorService, this::tryDeleteEmptyDirectory, deletedPaths);

for (int level = 0; level < partitionKeysNum; level++) {
Set<Path> parentPaths =
deletedPaths.stream().map(Path::getParent).collect(Collectors.toSet());
randomlyOnlyExecute(executorService, this::tryDeleteEmptyDirectory, parentPaths);
deletedPaths = new HashSet<>(parentPaths);
level++;
}
}

private boolean tryDeleteEmptyDirectory(Path path) {
private void tryDeleteEmptyDirectory(Path path) {
try {
return fileIO.delete(path, false);
fileIO.delete(path, false);
} catch (IOException e) {
LOG.debug("Failed to delete directory '{}' because it is not empty.", path);
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.function.Consumer

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

case class SparkOrphanFilesClean(
Expand All @@ -50,7 +51,7 @@ case class SparkOrphanFilesClean(
with Logging {

def doOrphanClean()
: (Dataset[(Long, Long)], (Dataset[BranchAndManifestFile], Dataset[(Long, Long, ArrayBuffer[String])])) = {
: (Dataset[(Long, Long)], (Dataset[BranchAndManifestFile], Dataset[(Long, Long, mutable.HashSet[String])])) = {
import spark.implicits._

val branches = validBranches()
Expand Down Expand Up @@ -138,7 +139,7 @@ case class SparkOrphanFilesClean(
it =>
var deletedFilesCount = 0L
var deletedFilesLenInBytes = 0L
val involvedDirectories = new ArrayBuffer[String]()
val involvedDirectories = new mutable.HashSet[String]()

while (it.hasNext) {
val fileInfo = it.next();
Expand All @@ -147,7 +148,7 @@ case class SparkOrphanFilesClean(
deletedFilesLenInBytes += fileInfo.getLong(2)
specifiedFileCleaner.accept(deletedPath)
logInfo(s"Cleaned file: $pathToClean")
involvedDirectories.append(deletedPath.getParent.toUri.toString)
involvedDirectories.add(deletedPath.getParent.toUri.toString)
deletedFilesCount += 1
}
logInfo(
Expand Down

0 comments on commit ecd9021

Please sign in to comment.