diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index 9cce1061fdd4..d68318e96de4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -53,7 +53,12 @@ import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute; -/** Local {@link OrphanFilesClean}, it will use thread pool to execute deletion. */ +/** + * Local {@link OrphanFilesClean}, it will use thread pool to execute deletion. + * + * <p>
Note that, this class is not used any more since each engine should implement its own + * distributed one. See `FlinkOrphanFilesClean` and `SparkOrphanFilesClean`. + */ public class LocalOrphanFilesClean extends OrphanFilesClean { private final ThreadPoolExecutor executor; @@ -109,8 +114,7 @@ private List getUsedFiles(String branch) { table.switchToBranch(branch).store().manifestFileFactory().create(); try { List manifests = new ArrayList<>(); - collectWithoutDataFile( - branch, usedFiles::add, manifest -> manifests.add(manifest.fileName())); + collectWithoutDataFile(branch, usedFiles::add, manifests::add); usedFiles.addAll(retryReadingDataFiles(manifestFile, manifests)); } catch (IOException e) { throw new RuntimeException(e); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 8ce95337f27d..0f2bad27fc9a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -32,6 +32,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.DateTimeUtils; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SerializableConsumer; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -135,16 +136,6 @@ protected void cleanSnapshotDir(List branches, Consumer deletedFil } } - protected void collectWithoutDataFile( - String branch, - Consumer usedFileConsumer, - Consumer manifestConsumer) - throws IOException { - for (Snapshot snapshot : safelyGetAllSnapshots(branch)) { - collectWithoutDataFile(branch, snapshot, usedFileConsumer, manifestConsumer); - } - } - protected Set safelyGetAllSnapshots(String branch) throws IOException { FileStoreTable branchTable = table.switchToBranch(branch); SnapshotManager snapshotManager = branchTable.snapshotManager(); @@ -155,11 +146,34 @@ protected Set safelyGetAllSnapshots(String branch) throws IOException return readSnapshots; } + protected void collectWithoutDataFile( + String branch, Consumer usedFileConsumer, Consumer manifestConsumer) + throws IOException { + for (Snapshot snapshot : safelyGetAllSnapshots(branch)) { + collectWithoutDataFile(branch, snapshot, usedFileConsumer, manifestConsumer); + } + } + protected void collectWithoutDataFile( String branch, Snapshot snapshot, Consumer usedFileConsumer, - Consumer manifestConsumer) + Consumer manifestConsumer) + throws IOException { + Consumer> usedFileWithFlagConsumer = + fileAndFlag -> { + if (fileAndFlag.getRight()) { + manifestConsumer.accept(fileAndFlag.getLeft()); + } + usedFileConsumer.accept(fileAndFlag.getLeft()); + }; + collectWithoutDataFileWithManifestFlag(branch, snapshot, usedFileWithFlagConsumer); + } + + protected void collectWithoutDataFileWithManifestFlag( + String branch, + Snapshot snapshot, + Consumer> usedFileWithFlagConsumer) throws IOException { FileStoreTable branchTable = table.switchToBranch(branch); ManifestList manifestList = branchTable.store().manifestListFactory().create(); @@ -167,7 +181,7 @@ protected void collectWithoutDataFile( List manifestFileMetas = new ArrayList<>(); // changelog manifest if (snapshot.changelogManifestList() != null) { - usedFileConsumer.accept(snapshot.changelogManifestList()); + usedFileWithFlagConsumer.accept(Pair.of(snapshot.changelogManifestList(), false)); manifestFileMetas.addAll( retryReadingFiles( 
() -> @@ -178,7 +192,7 @@ protected void collectWithoutDataFile( // delta manifest if (snapshot.deltaManifestList() != null) { - usedFileConsumer.accept(snapshot.deltaManifestList()); + usedFileWithFlagConsumer.accept(Pair.of(snapshot.deltaManifestList(), false)); manifestFileMetas.addAll( retryReadingFiles( () -> manifestList.readWithIOException(snapshot.deltaManifestList()), @@ -186,7 +200,7 @@ protected void collectWithoutDataFile( } // base manifest - usedFileConsumer.accept(snapshot.baseManifestList()); + usedFileWithFlagConsumer.accept(Pair.of(snapshot.baseManifestList(), false)); manifestFileMetas.addAll( retryReadingFiles( () -> manifestList.readWithIOException(snapshot.baseManifestList()), @@ -194,26 +208,25 @@ protected void collectWithoutDataFile( // collect manifests for (ManifestFileMeta manifest : manifestFileMetas) { - manifestConsumer.accept(manifest); - usedFileConsumer.accept(manifest.fileName()); + usedFileWithFlagConsumer.accept(Pair.of(manifest.fileName(), true)); } // index files String indexManifest = snapshot.indexManifest(); if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) { - usedFileConsumer.accept(indexManifest); + usedFileWithFlagConsumer.accept(Pair.of(indexManifest, false)); retryReadingFiles( () -> indexFileHandler.readManifestWithIOException(indexManifest), Collections.emptyList()) .stream() .map(IndexManifestEntry::indexFile) .map(IndexFileMeta::fileName) - .forEach(usedFileConsumer); + .forEach(name -> usedFileWithFlagConsumer.accept(Pair.of(name, false))); } // statistic file if (snapshot.statistics() != null) { - usedFileConsumer.accept(snapshot.statistics()); + usedFileWithFlagConsumer.accept(Pair.of(snapshot.statistics(), false)); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java index 6740e8980fe2..f50414620551 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java @@ -27,7 +27,6 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; -import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.operation.OrphanFilesClean; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; @@ -134,11 +133,10 @@ public void processElement( throws Exception { String branch = branchAndSnapshot.f0; Snapshot snapshot = Snapshot.fromJson(branchAndSnapshot.f1); - Consumer manifestConsumer = + Consumer manifestConsumer = manifest -> { Tuple2 tuple2 = - new Tuple2<>( - branch, manifest.fileName()); + new Tuple2<>(branch, manifest); ctx.output(manifestOutputTag, tuple2); }; collectWithoutDataFile( diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java index 7a3c8df4d829..4f442fbae361 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java @@ -19,9 +19,9 @@ package org.apache.paimon.spark.procedure; import 
org.apache.paimon.catalog.Catalog; -import org.apache.paimon.operation.LocalOrphanFilesClean; import org.apache.paimon.operation.OrphanFilesClean; import org.apache.paimon.spark.catalog.WithPaimonCatalog; +import org.apache.paimon.spark.orphan.SparkOrphanFilesClean; import org.apache.paimon.utils.Preconditions; import org.apache.spark.sql.catalyst.InternalRow; @@ -29,17 +29,12 @@ import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; -import org.apache.spark.unsafe.types.UTF8String; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import static org.apache.paimon.operation.LocalOrphanFilesClean.executeOrphanFilesClean; import static org.apache.spark.sql.types.DataTypes.BooleanType; import static org.apache.spark.sql.types.DataTypes.IntegerType; +import static org.apache.spark.sql.types.DataTypes.LongType; import static org.apache.spark.sql.types.DataTypes.StringType; /** @@ -67,7 +62,7 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure { private static final StructType OUTPUT_TYPE = new StructType( new StructField[] { - new StructField("result", StringType, true, Metadata.empty()) + new StructField("result", LongType, true, Metadata.empty()) }); private RemoveOrphanFilesProcedure(TableCatalog tableCatalog) { @@ -102,29 +97,19 @@ public InternalRow[] call(InternalRow args) { } LOG.info("identifier is {}.", identifier); - List tableCleans; - try { - Catalog catalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog(); - tableCleans = - LocalOrphanFilesClean.createOrphanFilesCleans( - catalog, - identifier.getDatabaseName(), - identifier.getObjectName(), - OrphanFilesClean.olderThanMillis( - args.isNullAt(1) ? null : args.getString(1)), - OrphanFilesClean.createFileCleaner( - catalog, !args.isNullAt(2) && args.getBoolean(2)), - args.isNullAt(3) ? null : args.getInt(3)); - } catch (Exception e) { - throw new RuntimeException(e); - } - - String[] result = executeOrphanFilesClean(tableCleans); - List rows = new ArrayList<>(); - Arrays.stream(result) - .forEach(line -> rows.add(newInternalRow(UTF8String.fromString(line)))); - - return rows.toArray(new InternalRow[0]); + Catalog catalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog(); + long deletedFiles = + SparkOrphanFilesClean.executeDatabaseOrphanFiles( + catalog, + identifier.getDatabaseName(), + identifier.getTableName(), + OrphanFilesClean.olderThanMillis( + args.isNullAt(1) ? null : args.getString(1)), + OrphanFilesClean.createFileCleaner( + catalog, !args.isNullAt(2) && args.getBoolean(2)), + args.isNullAt(3) ? null : args.getInt(3)); + + return new InternalRow[] {newInternalRow(deletedFiles)}; } public static ProcedureBuilder builder() { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala new file mode 100644 index 000000000000..d79105e24eec --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.orphan + +import org.apache.paimon.{utils, Snapshot} +import org.apache.paimon.catalog.{Catalog, Identifier} +import org.apache.paimon.fs.Path +import org.apache.paimon.manifest.{ManifestEntry, ManifestFile} +import org.apache.paimon.operation.OrphanFilesClean +import org.apache.paimon.operation.OrphanFilesClean.retryReadingFiles +import org.apache.paimon.table.FileStoreTable +import org.apache.paimon.utils.SerializableConsumer + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Dataset, SparkSession} +import org.apache.spark.sql.catalyst.SQLConfHelper +import org.apache.spark.sql.functions.sum + +import java.util +import java.util.Collections +import java.util.concurrent.atomic.AtomicLong +import java.util.function.Consumer + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +case class SparkOrphanFilesClean( + specifiedTable: FileStoreTable, + specifiedOlderThanMillis: Long, + specifiedFileCleaner: SerializableConsumer[Path], + parallelism: Int, + @transient spark: SparkSession) + extends OrphanFilesClean(specifiedTable, specifiedOlderThanMillis, specifiedFileCleaner) + with SQLConfHelper + with Logging { + + def doOrphanClean(): (Dataset[Long], Dataset[BranchAndManifestFile]) = { + import spark.implicits._ + + val branches = validBranches() + val deletedInLocal = new AtomicLong(0) + // snapshot and changelog files are the root of everything, so they are handled specially + // here, and subsequently, we will not count their orphan files. 
+ cleanSnapshotDir(branches, (_: Path) => deletedInLocal.incrementAndGet) + + val maxBranchParallelism = Math.min(branches.size(), parallelism) + // find snapshots using branch and find manifests(manifest, index, statistics) using snapshot + val usedManifestFiles = spark.sparkContext + .parallelize(branches.asScala, maxBranchParallelism) + .mapPartitions(_.flatMap { + branch => safelyGetAllSnapshots(branch).asScala.map(snapshot => (branch, snapshot.toJson)) + }) + .repartition(parallelism) + .flatMap { + case (branch, snapshotJson) => + val usedFileBuffer = new ArrayBuffer[BranchAndManifestFile]() + val usedFileConsumer = + new Consumer[org.apache.paimon.utils.Pair[String, java.lang.Boolean]] { + override def accept(pair: utils.Pair[String, java.lang.Boolean]): Unit = { + usedFileBuffer.append(BranchAndManifestFile(branch, pair.getLeft, pair.getRight)) + } + } + val snapshot = Snapshot.fromJson(snapshotJson) + collectWithoutDataFileWithManifestFlag(branch, snapshot, usedFileConsumer) + usedFileBuffer + } + .toDS() + .cache() + + // find all data files + val dataFiles = usedManifestFiles + .filter(_.isManifestFile) + .distinct() + .mapPartitions { + it => + val branchManifests = new util.HashMap[String, ManifestFile] + it.flatMap { + branchAndManifestFile => + val manifestFile = branchManifests.computeIfAbsent( + branchAndManifestFile.branch, + (key: String) => + specifiedTable.switchToBranch(key).store.manifestFileFactory.create) + + retryReadingFiles( + () => manifestFile.readWithIOException(branchAndManifestFile.manifestName), + Collections.emptyList[ManifestEntry] + ).asScala.flatMap { + manifestEntry => + manifestEntry.fileName() +: manifestEntry.file().extraFiles().asScala + } + } + } + + // union manifest and data files + val usedFiles = usedManifestFiles + .map(_.manifestName) + .union(dataFiles) + .toDF("used_name") + + // find candidate files which can be removed + val fileDirs = listPaimonFileDirs.asScala.map(_.toUri.toString) + val maxFileDirsParallelism = Math.min(fileDirs.size, parallelism) + val candidates = spark.sparkContext + .parallelize(fileDirs, maxFileDirsParallelism) + .flatMap { + dir => + tryBestListingDirs(new Path(dir)).asScala.filter(oldEnough).map { + file => (file.getPath.getName, file.getPath.toUri.toString) + } + } + .toDF("name", "path") + .repartition(parallelism) + + // use left anti to filter files which is not used + val deleted = candidates + .join(usedFiles, $"name" === $"used_name", "left_anti") + .mapPartitions { + it => + var deleted = 0L + while (it.hasNext) { + val pathToClean = it.next().getString(1) + specifiedFileCleaner.accept(new Path(pathToClean)) + logInfo(s"Cleaned file: $pathToClean") + deleted += 1 + } + logInfo(s"Total cleaned files: $deleted"); + Iterator.single(deleted) + } + val finalDeletedDataset = if (deletedInLocal.get() != 0) { + deleted.union(spark.createDataset(Seq(deletedInLocal.get()))) + } else { + deleted + } + + (finalDeletedDataset, usedManifestFiles) + } +} + +/** + * @param branch + * The branch name + * @param manifestName + * The manifest file name, including manifest-list, manifest, index-manifest, statistics + * @param isManifestFile + * If it is the manifest file + */ +case class BranchAndManifestFile(branch: String, manifestName: String, isManifestFile: Boolean) + +object SparkOrphanFilesClean extends SQLConfHelper { + def executeDatabaseOrphanFiles( + catalog: Catalog, + databaseName: String, + tableName: String, + olderThanMillis: Long, + fileCleaner: SerializableConsumer[Path], + parallelismOpt: Integer): Long 
= { + val spark = SparkSession.active + val parallelism = if (parallelismOpt == null) { + Math.max(spark.sparkContext.defaultParallelism, conf.numShufflePartitions) + } else { + parallelismOpt.intValue() + } + + val tableNames = if (tableName == null || "*" == tableName) { + catalog.listTables(databaseName).asScala + } else { + tableName :: Nil + } + val tables = tableNames.map { + tableName => + val identifier = new Identifier(databaseName, tableName) + val table = catalog.getTable(identifier) + assert( + table.isInstanceOf[FileStoreTable], + s"Only FileStoreTable supports remove-orphan-files action. The table type is '${table.getClass.getName}'.") + table.asInstanceOf[FileStoreTable] + } + if (tables.isEmpty) { + return 0 + } + val (deleted, waitToRelease) = tables.map { + table => + new SparkOrphanFilesClean( + table, + olderThanMillis, + fileCleaner, + parallelism, + spark + ).doOrphanClean() + }.unzip + try { + val result = deleted + .reduce((l, r) => l.union(r)) + .toDF("deleted") + .agg(sum("deleted")) + .head() + assert(result.schema.size == 1, result.schema) + if (result.isNullAt(0)) { + // no files can be deleted + 0 + } else { + result.getLong(0) + } + } finally { + waitToRelease.foreach(_.unpersist()) + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala index 23a014d0fe9f..c414515f1885 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala @@ -52,7 +52,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase { fileIO.tryToWriteAtomic(orphanFile2, "b") // by default, no file deleted - checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Nil) + checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil) val orphanFile2ModTime = fileIO.getFileStatus(orphanFile2).getModificationTime val older_than1 = DateTimeUtils.formatLocalDateTime( @@ -63,7 +63,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase { checkAnswer( spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than1')"), - Row(orphanFile1.toUri.getPath) :: Nil) + Row(1) :: Nil) val older_than2 = DateTimeUtils.formatLocalDateTime( DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), @@ -71,7 +71,9 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase { checkAnswer( spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than2')"), - Row(orphanFile2.toUri.getPath) :: Nil) + Row(1) :: Nil) + + checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil) } test("Paimon procedure: dry run remove orphan files") { @@ -95,7 +97,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase { fileIO.writeFile(orphanFile2, "b", true) // by default, no file deleted - checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Nil) + checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil) val older_than = DateTimeUtils.formatLocalDateTime( DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), @@ -104,8 +106,10 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase { checkAnswer( spark.sql( s"CALL 
sys.remove_orphan_files(table => 'T', older_than => '$older_than', dry_run => true)"), - Row(orphanFile1.toUri.getPath) :: Row(orphanFile2.toUri.getPath) :: Nil + Row(2) :: Nil ) + + checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil) } test("Paimon procedure: remove database orphan files") { @@ -142,7 +146,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase { fileIO2.tryToWriteAtomic(orphanFile22, "b") // by default, no file deleted - checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), Nil) + checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), Row(0) :: Nil) val orphanFile12ModTime = fileIO1.getFileStatus(orphanFile12).getModificationTime val older_than1 = DateTimeUtils.formatLocalDateTime( @@ -153,7 +157,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase { checkAnswer( spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*', older_than => '$older_than1')"), - Row(orphanFile11.toUri.getPath) :: Row(orphanFile21.toUri.getPath) :: Nil + Row(2) :: Nil ) val older_than2 = DateTimeUtils.formatLocalDateTime( @@ -162,8 +166,10 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase { checkAnswer( spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*', older_than => '$older_than2')"), - Row(orphanFile12.toUri.getPath) :: Row(orphanFile22.toUri.getPath) :: Nil + Row(2) :: Nil ) + + checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), Row(0) :: Nil) } }
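A note on the reworked collectWithoutDataFile in OrphanFilesClean: instead of taking two separate consumers, the method now feeds a single Pair<String, Boolean> consumer in which the boolean marks whether the collected name is a manifest file, and the old two-consumer signature is kept as a thin adapter on top of it. The sketch below illustrates that routing idea only; the object and parameter names are illustrative and not the Paimon API.

```scala
// Sketch of the flag-routing idea: every collected name is a "used" file,
// and names flagged as manifests additionally go to the manifest consumer.
object UsedFileFlagSketch {

  // Build a single (name, isManifest) consumer from the two original consumers.
  def adapt(usedFile: String => Unit, manifest: String => Unit): ((String, Boolean)) => Unit = {
    case (file, isManifest) =>
      if (isManifest) manifest(file) // manifest names get the extra manifest handling
      usedFile(file)                 // every collected name still counts as "used"
  }

  def main(args: Array[String]): Unit = {
    val used = scala.collection.mutable.ArrayBuffer.empty[String]
    val manifests = scala.collection.mutable.ArrayBuffer.empty[String]
    val consumer = adapt(name => used.append(name), name => manifests.append(name))

    consumer(("manifest-list-2", false)) // e.g. a manifest list: recorded as used only
    consumer(("manifest-7", true))       // a manifest: used, and later read to find data files
    println(s"used=$used, manifests=$manifests")
  }
}
```

Carrying the flag in the element itself is what lets the Spark implementation ship the collected names through a Dataset and decide downstream which rows to read as manifests.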
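The new SparkOrphanFilesClean finds deletable files by listing candidate files that are old enough, collecting every file name still reachable from snapshots, manifests and index files, and then keeping only the candidates with no match using a left_anti join. Below is a self-contained sketch of that join step under assumed inputs: the SparkSession setup, file names and paths are made up for illustration and do not come from the patch.

```scala
import org.apache.spark.sql.SparkSession

object LeftAntiOrphanSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("orphan-sketch").getOrCreate()
    import spark.implicits._

    // Candidates: (file name, full path) listed from the table's directories, already
    // filtered to files older than the configured threshold.
    val candidates = Seq(
      ("manifest-1", "/warehouse/db/t/manifest/manifest-1"),
      ("data-1.orc", "/warehouse/db/t/bucket-0/data-1.orc"),
      ("unknown-0", "/warehouse/db/t/bucket-0/unknown-0")
    ).toDF("name", "path")

    // Used files: names still referenced by any snapshot, manifest or index manifest.
    val used = Seq("manifest-1", "data-1.orc").toDF("used_name")

    // left_anti keeps only candidates with no matching used name, i.e. the orphan files.
    val orphans = candidates.join(used, $"name" === $"used_name", "left_anti")
    orphans.select("path").show(false) // only .../unknown-0 remains

    spark.stop()
  }
}
```

Joining on the bare file name rather than the full path mirrors the patch, where candidates carry both name and path but only the name is compared against the used-file set.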