From ecd902157475f490eb708f5c743120f0c699dc3d Mon Sep 17 00:00:00 2001
From: Askwang <135721692+Askwang@users.noreply.github.com>
Date: Fri, 3 Jan 2025 15:46:28 +0800
Subject: [PATCH] fix comments

---
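Note on this change (placed below the "---" marker so it stays out of the
commit message): the core of the patch parallelizes cleanEmptyDirectory. It
deletes the deepest directories first, then climbs one partition level per
pass, for the same total number of passes (partitionKeysNum + 1) as the old
while loop. One subtle trade: parents are now retried unconditionally rather
than only when a child was actually deleted; a still non-empty parent simply
fails its delete and survives. Below is a minimal, self-contained Java sketch
of that level-by-level walk, a model rather than the production code:
EmptyDirCleanSketch and deleteAllInParallel are hypothetical names,
ExecutorService.invokeAll stands in for Paimon's
ThreadPoolUtils.randomlyOnlyExecute, and java.nio.file.Files replaces Paimon's
FileIO.

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/** Standalone model of the patched cleanEmptyDirectory. */
public class EmptyDirCleanSketch {

    // Shared pool, sized like the patch's cached pool.
    private static final ExecutorService POOL =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    /**
     * Best-effort delete: a non-empty or missing directory is silently
     * skipped, mirroring the patch's void tryDeleteEmptyDirectory.
     */
    static void tryDeleteEmptyDirectory(Path path) {
        try {
            Files.deleteIfExists(path); // throws DirectoryNotEmptyException if non-empty
        } catch (Exception ignored) {
            // Directory not empty, already gone, or not deletable; leave it.
        }
    }

    static void cleanEmptyDirectories(Set<Path> deletedPaths, int partitionKeysNum)
            throws InterruptedException {
        // First pass: the directories whose files were just removed.
        deleteAllInParallel(deletedPaths);
        // Then walk up one partition level per pass, partitionKeysNum times.
        for (int level = 0; level < partitionKeysNum; level++) {
            deletedPaths =
                    deletedPaths.stream().map(Path::getParent).collect(Collectors.toSet());
            deleteAllInParallel(deletedPaths);
        }
    }

    // Stand-in for ThreadPoolUtils.randomlyOnlyExecute: run every delete on
    // the pool and block until the whole batch of this level is done.
    private static void deleteAllInParallel(Set<Path> paths) throws InterruptedException {
        List<Callable<Void>> tasks =
                paths.stream()
                        .<Callable<Void>>map(p -> () -> {
                            tryDeleteEmptyDirectory(p);
                            return null;
                        })
                        .collect(Collectors.toList());
        POOL.invokeAll(tasks); // waits for all tasks of this level to finish
    }
}

Blocking per level matters here: a parent can only become empty after the pass
below it has finished, which is why each batch is awaited before mapping the
set through Path::getParent.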
 .../paimon/operation/OrphanFilesClean.java    | 30 ++++++++++---------
 .../spark/orphan/SparkOrphanFilesClean.scala  |  7 +++--
 2 files changed, 20 insertions(+), 17 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index ad25d1b54d90d..9526ae66254e3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -54,13 +54,17 @@
 import java.util.List;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyList;
 import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX;
 import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
+import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
+import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;
 
 /**
  * To remove the data files and metadata files that are not used by table (so-called "orphan
@@ -91,6 +95,10 @@ public abstract class OrphanFilesClean implements Serializable {
     protected final int partitionKeysNum;
     protected final Path location;
 
+    private static final String THREAD_NAME = "ORPHAN-FILES-CLEAN-THREAD-POOL";
+    private static final ThreadPoolExecutor executorService =
+            createCachedThreadPool(Runtime.getRuntime().availableProcessors(), THREAD_NAME);
+
     public OrphanFilesClean(
             FileStoreTable table, long olderThanMillis, SerializableConsumer<Path> fileCleaner) {
         this.table = table;
@@ -399,27 +407,21 @@ public void cleanEmptyDirectory(Set<Path> deletedPaths) {
             return;
         }
 
-        int level = 0;
-        while (level <= partitionKeysNum) {
-            Set<Path> parentPaths = new HashSet<>();
-            for (Path path : deletedPaths) {
-                boolean deleted = tryDeleteEmptyDirectory(path);
-                if (deleted) {
-                    LOG.info("Delete empty directory '{}'.", path);
-                    parentPaths.add(path.getParent());
-                }
-            }
+        randomlyOnlyExecute(executorService, this::tryDeleteEmptyDirectory, deletedPaths);
+
+        for (int level = 0; level < partitionKeysNum; level++) {
+            Set<Path> parentPaths =
+                    deletedPaths.stream().map(Path::getParent).collect(Collectors.toSet());
+            randomlyOnlyExecute(executorService, this::tryDeleteEmptyDirectory, parentPaths);
             deletedPaths = new HashSet<>(parentPaths);
-            level++;
         }
     }
 
-    private boolean tryDeleteEmptyDirectory(Path path) {
+    private void tryDeleteEmptyDirectory(Path path) {
         try {
-            return fileIO.delete(path, false);
+            fileIO.delete(path, false);
         } catch (IOException e) {
             LOG.debug("Failed to delete directory '{}' because it is not empty.", path);
-            return false;
         }
     }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
index dd34b4b44a56b..b2f3c30dd6e99 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong
 import java.util.function.Consumer
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 case class SparkOrphanFilesClean(
@@ -50,7 +51,7 @@ case class SparkOrphanFilesClean(
     with Logging {
 
   def doOrphanClean()
-      : (Dataset[(Long, Long)], (Dataset[BranchAndManifestFile], Dataset[(Long, Long, ArrayBuffer[String])])) = {
+      : (Dataset[(Long, Long)], (Dataset[BranchAndManifestFile], Dataset[(Long, Long, mutable.HashSet[String])])) = {
     import spark.implicits._
 
     val branches = validBranches()
@@ -138,7 +139,7 @@ case class SparkOrphanFilesClean(
       it =>
         var deletedFilesCount = 0L
         var deletedFilesLenInBytes = 0L
-        val involvedDirectories = new ArrayBuffer[String]()
+        val involvedDirectories = new mutable.HashSet[String]()
 
         while (it.hasNext) {
           val fileInfo = it.next();
@@ -147,7 +148,7 @@
           deletedFilesLenInBytes += fileInfo.getLong(2)
           specifiedFileCleaner.accept(deletedPath)
           logInfo(s"Cleaned file: $pathToClean")
-          involvedDirectories.append(deletedPath.getParent.toUri.toString)
+          involvedDirectories.add(deletedPath.getParent.toUri.toString)
           deletedFilesCount += 1
         }
         logInfo(
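
Note (after the diff, not part of it): on the Spark side, involvedDirectories
becomes a mutable.HashSet and append becomes add, so a parent directory is
recorded once no matter how many of its files were cleaned, and the later
cleanEmptyDirectory pass attempts each directory only once. A tiny,
self-contained Java analogue of that dedup effect; the class name and paths
are hypothetical:

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DedupParentDirsSketch {
    public static void main(String[] args) {
        // Hypothetical orphan files; both live in the same bucket directory.
        List<String> cleanedFiles =
                List.of(
                        "/warehouse/db/t/dt=20250103/bucket-0/data-0.orc",
                        "/warehouse/db/t/dt=20250103/bucket-0/data-1.orc");
        // A set keeps one entry per parent, where a buffer would keep two.
        Set<String> involvedDirectories = new LinkedHashSet<>();
        for (String file : cleanedFiles) {
            involvedDirectories.add(file.substring(0, file.lastIndexOf('/')));
        }
        System.out.println(involvedDirectories);
        // -> [/warehouse/db/t/dt=20250103/bucket-0]
    }
}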