diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/CleanOrphanFilesResult.java b/paimon-core/src/main/java/org/apache/paimon/operation/CleanOrphanFilesResult.java
new file mode 100644
index 0000000000000..f6a66a6518dc7
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/CleanOrphanFilesResult.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.fs.Path;
+
+import java.util.List;
+
+/** The result of OrphanFilesClean. */
+public class CleanOrphanFilesResult {
+
+    private List<Path> deletedFilesPath;
+    private final long deletedFileCount;
+    private final long deletedFileTotalSizeInBytes;
+
+    public CleanOrphanFilesResult(long deletedFileCount, long deletedFileTotalSizeInBytes) {
+        this.deletedFileCount = deletedFileCount;
+        this.deletedFileTotalSizeInBytes = deletedFileTotalSizeInBytes;
+    }
+
+    public CleanOrphanFilesResult(
+            List<Path> deletedFilesPath, long deletedFileCount, long deletedFileTotalSizeInBytes) {
+        this(deletedFileCount, deletedFileTotalSizeInBytes);
+        this.deletedFilesPath = deletedFilesPath;
+    }
+
+    public long getDeletedFileCount() {
+        return deletedFileCount;
+    }
+
+    public long getDeletedFileTotalSizeInBytes() {
+        return deletedFileTotalSizeInBytes;
+    }
+
+    public List<Path> getDeletedFilesPath() {
+        return deletedFilesPath;
+    }
+}
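The value object is engine-agnostic on purpose: the distributed cleaners only fill in the count/size pair, while the local cleaner also records the concrete paths. A minimal sketch of consuming it (the summarize helper is hypothetical, not part of this patch):

    import org.apache.paimon.operation.CleanOrphanFilesResult;

    public class CleanOrphanFilesResultExample {

        // Hypothetical helper that renders a result for logs or console output.
        static String summarize(CleanOrphanFilesResult result) {
            return String.format(
                    "deleted %d orphan file(s), reclaimed %d byte(s)",
                    result.getDeletedFileCount(), result.getDeletedFileTotalSizeInBytes());
        }

        public static void main(String[] args) {
            // The count-and-size constructor is the one the distributed cleaners use.
            System.out.println(summarize(new CleanOrphanFilesResult(2, 2)));
        }
    }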
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index a5eea6d650cf9..93b44f0715375 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -47,6 +47,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -68,6 +69,8 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
 
     private final List<Path> deleteFiles;
 
+    private final AtomicLong deletedFilesSizeInBytes = new AtomicLong(0);
+
     private Set<String> candidateDeletes;
 
     public LocalOrphanFilesClean(FileStoreTable table) {
@@ -87,16 +90,27 @@ public LocalOrphanFilesClean(
                 table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN");
     }
 
-    public List<Path> clean() throws IOException, ExecutionException, InterruptedException {
+    public CleanOrphanFilesResult clean()
+            throws IOException, ExecutionException, InterruptedException {
         List<String> branches = validBranches();
 
         // specially handle to clear snapshot dir
-        cleanSnapshotDir(branches, deleteFiles::add);
+        cleanSnapshotDir(
+                branches,
+                deleteFiles::add,
+                p -> {
+                    try {
+                        deletedFilesSizeInBytes.addAndGet(fileIO.getFileSize(p));
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
 
         // delete candidate files
         Map<String, Path> candidates = getCandidateDeletingFiles();
         if (candidates.isEmpty()) {
-            return deleteFiles;
+            return new CleanOrphanFilesResult(
+                    deleteFiles, deleteFiles.size(), deletedFilesSizeInBytes.get());
         }
 
         candidateDeletes = new HashSet<>(candidates.keySet());
@@ -108,12 +122,24 @@ public List<Path> clean() throws IOException, ExecutionException, InterruptedExc
 
         // delete unused files
         candidateDeletes.removeAll(usedFiles);
-        candidateDeletes.stream().map(candidates::get).forEach(fileCleaner);
+        candidateDeletes.stream()
+                .map(candidates::get)
+                .forEach(
+                        deleteFilePath -> {
+                            try {
+                                deletedFilesSizeInBytes.addAndGet(
+                                        fileIO.getFileSize(deleteFilePath));
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                            fileCleaner.accept(deleteFilePath);
+                        });
         deleteFiles.addAll(
                 candidateDeletes.stream().map(candidates::get).collect(Collectors.toList()));
        candidateDeletes.clear();
 
-        return deleteFiles;
+        return new CleanOrphanFilesResult(
+                deleteFiles, deleteFiles.size(), deletedFilesSizeInBytes.get());
     }
 
     private void collectWithoutDataFile(
@@ -230,7 +256,7 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
         return orphanFilesCleans;
     }
 
-    public static long executeDatabaseOrphanFiles(
+    public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             Catalog catalog,
             String databaseName,
             @Nullable String tableName,
@@ -249,15 +275,17 @@ public static long executeDatabaseOrphanFiles(
         ExecutorService executorService =
                 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
 
-        List<Future<List<Path>>> tasks = new ArrayList<>();
+        List<Future<CleanOrphanFilesResult>> tasks = new ArrayList<>();
         for (LocalOrphanFilesClean clean : tableCleans) {
             tasks.add(executorService.submit(clean::clean));
         }
 
-        List<Path> cleanOrphanFiles = new ArrayList<>();
-        for (Future<List<Path>> task : tasks) {
+        long deletedFileCount = 0;
+        long deletedFileTotalSizeInBytes = 0;
+        for (Future<CleanOrphanFilesResult> task : tasks) {
             try {
-                cleanOrphanFiles.addAll(task.get());
+                deletedFileCount += task.get().getDeletedFileCount();
+                deletedFileTotalSizeInBytes += task.get().getDeletedFileTotalSizeInBytes();
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw new RuntimeException(e);
@@ -267,6 +295,6 @@ public static long executeDatabaseOrphanFiles(
         }
 
         executorService.shutdownNow();
-        return cleanOrphanFiles.size();
+        return new CleanOrphanFilesResult(deletedFileCount, deletedFileTotalSizeInBytes);
     }
 }
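With this change a local clean reports count, total size, and the concrete paths in one object. Roughly how a caller uses it now (a sketch assuming an existing FileStoreTable; the constructor and getters mirror LocalOrphanFilesCleanTest below):

    import org.apache.paimon.operation.CleanOrphanFilesResult;
    import org.apache.paimon.operation.LocalOrphanFilesClean;
    import org.apache.paimon.table.FileStoreTable;

    import java.util.concurrent.TimeUnit;

    public class LocalCleanExample {

        static void cleanAggressively(FileStoreTable table) throws Exception {
            // Same trick as the tests: push olderThan slightly into the future
            // so that every orphan file qualifies.
            LocalOrphanFilesClean clean =
                    new LocalOrphanFilesClean(
                            table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2));

            CleanOrphanFilesResult result = clean.clean();
            System.out.println(
                    result.getDeletedFilesPath().size()
                            + " files, "
                            + result.getDeletedFileTotalSizeInBytes()
                            + " bytes");
        }
    }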
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index 5698908cb9b0c..2467e86021dbe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -119,20 +119,25 @@ protected List<String> validBranches() {
         return branches;
     }
 
-    protected void cleanSnapshotDir(List<String> branches, Consumer<Path> deletedFileConsumer) {
+    protected void cleanSnapshotDir(
+            List<String> branches,
+            Consumer<Path> deletedFilesConsumer,
+            Consumer<Path> deletedFilesSizeInBytesConsumer) {
         for (String branch : branches) {
             FileStoreTable branchTable = table.switchToBranch(branch);
             SnapshotManager snapshotManager = branchTable.snapshotManager();
 
             // specially handle the snapshot directory
             List<Path> nonSnapshotFiles = snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
+            nonSnapshotFiles.forEach(deletedFilesConsumer);
+            nonSnapshotFiles.forEach(deletedFilesSizeInBytesConsumer);
             nonSnapshotFiles.forEach(fileCleaner);
-            nonSnapshotFiles.forEach(deletedFileConsumer);
 
             // specially handle the changelog directory
             List<Path> nonChangelogFiles = snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
+            nonChangelogFiles.forEach(deletedFilesConsumer);
+            nonChangelogFiles.forEach(deletedFilesSizeInBytesConsumer);
             nonChangelogFiles.forEach(fileCleaner);
-            nonChangelogFiles.forEach(deletedFileConsumer);
         }
     }
 
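Note the ordering in cleanSnapshotDir: both consumers run before fileCleaner, since a file's size can no longer be read once the cleaner has deleted it. Both engines wire the size consumer the same way; the pattern, extracted into a self-contained sketch (the SizeTrackingConsumers class is hypothetical):

    import org.apache.paimon.fs.FileIO;
    import org.apache.paimon.fs.Path;

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.Consumer;

    // Hypothetical holder for the two consumers passed to cleanSnapshotDir.
    class SizeTrackingConsumers {

        final AtomicLong deletedCount = new AtomicLong(0);
        final AtomicLong deletedSizeInBytes = new AtomicLong(0);

        Consumer<Path> countConsumer() {
            return p -> deletedCount.incrementAndGet();
        }

        Consumer<Path> sizeConsumer(FileIO fileIO) {
            return p -> {
                try {
                    // Must run before the file cleaner: the size is gone after deletion.
                    deletedSizeInBytes.addAndGet(fileIO.getFileSize(p));
                } catch (IOException e) {
                    // getFileSize throws IOException; Consumer cannot, so wrap it,
                    // exactly as the call sites in this patch do.
                    throw new RuntimeException(e);
                }
            };
        }
    }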
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
index fdc68b34abb44..5139dd44957d7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
@@ -165,22 +165,20 @@ public void testNormallyRemoving() throws Throwable {
 
         // randomly delete tags
         List<String> deleteTags = Collections.emptyList();
-        if (!allTags.isEmpty()) {
-            deleteTags = randomlyPick(allTags);
-            for (String tagName : deleteTags) {
-                table.deleteTag(tagName);
-            }
+        deleteTags = randomlyPick(allTags);
+        for (String tagName : deleteTags) {
+            table.deleteTag(tagName);
         }
 
         // first check, nothing will be deleted because the default olderThan interval is 1 day
         LocalOrphanFilesClean orphanFilesClean = new LocalOrphanFilesClean(table);
-        assertThat(orphanFilesClean.clean().size()).isEqualTo(0);
+        assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isEqualTo(0);
 
         // second check
         orphanFilesClean =
                 new LocalOrphanFilesClean(
                         table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2));
-        List<Path> deleted = orphanFilesClean.clean();
+        List<Path> deleted = orphanFilesClean.clean().getDeletedFilesPath();
         try {
             validate(deleted, snapshotData, new HashMap<>());
         } catch (Throwable t) {
@@ -363,13 +361,13 @@ public void testCleanOrphanFilesWithChangelogDecoupled(String changelogProducer)
 
         // first check, nothing will be deleted because the default olderThan interval is 1 day
         LocalOrphanFilesClean orphanFilesClean = new LocalOrphanFilesClean(table);
-        assertThat(orphanFilesClean.clean().size()).isEqualTo(0);
+        assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isEqualTo(0);
 
         // second check
         orphanFilesClean =
                 new LocalOrphanFilesClean(
                         table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2));
-        List<Path> deleted = orphanFilesClean.clean();
+        List<Path> deleted = orphanFilesClean.clean().getDeletedFilesPath();
         validate(deleted, snapshotData, changelogData);
     }
 
@@ -399,7 +397,7 @@ public void testAbnormallyRemoving() throws Exception {
         LocalOrphanFilesClean orphanFilesClean =
                 new LocalOrphanFilesClean(
                         table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2));
-        assertThat(orphanFilesClean.clean().size()).isGreaterThan(0);
+        assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isGreaterThan(0);
     }
 
     private void writeData(
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 7695c510b1dc7..c73fd0b431d8d 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -20,6 +20,7 @@
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.orphan.FlinkOrphanFilesClean;
+import org.apache.paimon.operation.CleanOrphanFilesResult;
 import org.apache.paimon.operation.LocalOrphanFilesClean;
 
 import org.apache.flink.table.procedure.ProcedureContext;
@@ -86,11 +87,12 @@ public String[] call(
         if (mode == null) {
             mode = "DISTRIBUTED";
         }
-        long deletedFiles;
+
+        CleanOrphanFilesResult cleanOrphanFilesResult;
         try {
             switch (mode.toUpperCase(Locale.ROOT)) {
                 case "DISTRIBUTED":
-                    deletedFiles =
+                    cleanOrphanFilesResult =
                             FlinkOrphanFilesClean.executeDatabaseOrphanFiles(
                                     procedureContext.getExecutionEnvironment(),
                                     catalog,
@@ -101,7 +103,7 @@ public String[] call(
                                     tableName);
                     break;
                 case "LOCAL":
-                    deletedFiles =
+                    cleanOrphanFilesResult =
                             LocalOrphanFilesClean.executeDatabaseOrphanFiles(
                                     catalog,
                                     databaseName,
@@ -116,7 +118,10 @@ public String[] call(
                                     + mode
                                     + ". Only 'DISTRIBUTED' and 'LOCAL' are supported.");
             }
-            return new String[] {String.valueOf(deletedFiles)};
+            return new String[] {
+                String.valueOf(cleanOrphanFilesResult.getDeletedFileCount()),
+                String.valueOf(cleanOrphanFilesResult.getDeletedFileTotalSizeInBytes())
+            };
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index 61bebca24af45..c6c4e9d22d2ef 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -27,12 +27,14 @@
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.operation.CleanOrphanFilesResult;
 import org.apache.paimon.operation.OrphanFilesClean;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.SerializableConsumer;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -61,7 +63,6 @@
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -81,7 +82,7 @@ public FlinkOrphanFilesClean(
     }
 
     @Nullable
-    public DataStream<Long> doOrphanClean(StreamExecutionEnvironment env) {
+    public DataStream<CleanOrphanFilesResult> doOrphanClean(StreamExecutionEnvironment env) {
         Configuration flinkConf = new Configuration();
         flinkConf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
         flinkConf.set(ExecutionOptions.SORT_INPUTS, false);
@@ -97,8 +98,18 @@ public DataStream<Long> doOrphanClean(StreamExecutionEnvironment env) {
 
         // snapshot and changelog files are the root of everything, so they are handled specially
         // here, and subsequently, we will not count their orphan files.
-        AtomicLong deletedInLocal = new AtomicLong(0);
-        cleanSnapshotDir(branches, p -> deletedInLocal.incrementAndGet());
+        AtomicLong deletedFilesCountInLocal = new AtomicLong(0);
+        AtomicLong deletedFilesSizeInBytesInLocal = new AtomicLong(0);
+        cleanSnapshotDir(
+                branches,
+                p -> deletedFilesCountInLocal.incrementAndGet(),
+                p -> {
+                    try {
+                        deletedFilesSizeInBytesInLocal.addAndGet(fileIO.getFileSize(p));
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
 
         // branch and manifest file
         final OutputTag<Tuple2<String, String>> manifestOutputTag =
@@ -222,17 +233,19 @@ public void processElement(
                             }
                         });
 
-        DataStream<Long> deleted =
+        DataStream<CleanOrphanFilesResult> deleted =
                 usedFiles
                         .keyBy(f -> f)
                         .connect(candidates.keyBy(path -> new Path(path).getName()))
                         .transform(
                                 "files_join",
-                                LONG_TYPE_INFO,
-                                new BoundedTwoInputOperator<String, String, Long>() {
+                                TypeInformation.of(CleanOrphanFilesResult.class),
+                                new BoundedTwoInputOperator<
+                                        String, String, CleanOrphanFilesResult>() {
 
                                     private boolean buildEnd;
-                                    private long emitted;
+                                    private long emittedFilesCount;
+                                    private long emittedFilesSize;
 
                                     private final Set<String> used = new HashSet<>();
 
@@ -254,8 +267,15 @@ public void endInput(int inputId) {
                                             case 2:
                                                 checkState(buildEnd, "Should build ended.");
                                                 LOG.info("Finish probe phase.");
-                                                LOG.info("Clean files: {}", emitted);
-                                                output.collect(new StreamRecord<>(emitted));
+                                                LOG.info(
+                                                        "Clean files count: {}",
+                                                        emittedFilesCount);
+                                                LOG.info(
+                                                        "Clean files size: {}", emittedFilesSize);
+                                                output.collect(
+                                                        new StreamRecord<>(
+                                                                new CleanOrphanFilesResult(
+                                                                        emittedFilesCount,
+                                                                        emittedFilesSize)));
                                                 break;
                                         }
                                     }
@@ -266,25 +286,33 @@ public void processElement1(StreamRecord<String> element) {
                                     }
 
                                     @Override
-                                    public void processElement2(StreamRecord<String> element) {
+                                    public void processElement2(StreamRecord<String> element)
+                                            throws IOException {
                                         checkState(buildEnd, "Should build ended.");
                                         String value = element.getValue();
                                         Path path = new Path(value);
                                         if (!used.contains(path.getName())) {
+                                            emittedFilesSize += fileIO.getFileSize(path);
                                             fileCleaner.accept(path);
                                             LOG.info("Dry clean: {}", path);
-                                            emitted++;
+                                            emittedFilesCount++;
                                         }
                                     }
                                 });
 
-        if (deletedInLocal.get() != 0) {
-            deleted = deleted.union(env.fromElements(deletedInLocal.get()));
+        if (deletedFilesCountInLocal.get() != 0 || deletedFilesSizeInBytesInLocal.get() != 0) {
+            deleted =
+                    deleted.union(
+                            env.fromElements(
+                                    new CleanOrphanFilesResult(
+                                            deletedFilesCountInLocal.get(),
+                                            deletedFilesSizeInBytesInLocal.get())));
         }
+
         return deleted;
     }
 
-    public static long executeDatabaseOrphanFiles(
+    public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             StreamExecutionEnvironment env,
             Catalog catalog,
             long olderThanMillis,
@@ -293,7 +321,7 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             String databaseName,
             @Nullable String tableName)
             throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
-        List<DataStream<Long>> orphanFilesCleans = new ArrayList<>();
+        List<DataStream<CleanOrphanFilesResult>> orphanFilesCleans = new ArrayList<>();
         List<String> tableNames = Collections.singletonList(tableName);
         if (tableName == null || "*".equals(tableName)) {
             tableNames = catalog.listTables(databaseName);
@@ -307,7 +335,7 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
                     "Only FileStoreTable supports remove-orphan-files action. The table type is '%s'.",
                     table.getClass().getName());
 
-            DataStream<Long> clean =
+            DataStream<CleanOrphanFilesResult> clean =
                     new FlinkOrphanFilesClean(
                                     (FileStoreTable) table,
                                     olderThanMillis,
@@ -319,8 +347,8 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             }
         }
 
-        DataStream<Long> result = null;
-        for (DataStream<Long> clean : orphanFilesCleans) {
+        DataStream<CleanOrphanFilesResult> result = null;
+        for (DataStream<CleanOrphanFilesResult> clean : orphanFilesCleans) {
             if (result == null) {
                 result = clean;
             } else {
@@ -331,20 +359,24 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
         return sum(result);
     }
 
-    private static long sum(DataStream<Long> deleted) {
-        long deleteCount = 0;
+    private static CleanOrphanFilesResult sum(DataStream<CleanOrphanFilesResult> deleted) {
+        long deletedFilesCount = 0;
+        long deletedFilesSizeInBytes = 0;
         if (deleted != null) {
             try {
-                CloseableIterator<Long> iterator =
+                CloseableIterator<CleanOrphanFilesResult> iterator =
                         deleted.global().executeAndCollect("OrphanFilesClean");
                 while (iterator.hasNext()) {
-                    deleteCount += iterator.next();
+                    CleanOrphanFilesResult cleanOrphanFilesResult = iterator.next();
+                    deletedFilesCount += cleanOrphanFilesResult.getDeletedFileCount();
+                    deletedFilesSizeInBytes +=
+                            cleanOrphanFilesResult.getDeletedFileTotalSizeInBytes();
                 }
                 iterator.close();
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
         }
-        return deleteCount;
+        return new CleanOrphanFilesResult(deletedFilesCount, deletedFilesSizeInBytes);
     }
 }
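sum(...) above folds the per-table results collected from the stream into one total. Stripped of the DataStream plumbing, the fold is just (sketch over plain results):

    import org.apache.paimon.operation.CleanOrphanFilesResult;

    import java.util.Arrays;
    import java.util.List;

    public class SumResultsExample {

        // Mirrors FlinkOrphanFilesClean#sum: counts and byte sizes accumulate independently.
        static CleanOrphanFilesResult sum(List<CleanOrphanFilesResult> results) {
            long count = 0;
            long sizeInBytes = 0;
            for (CleanOrphanFilesResult r : results) {
                count += r.getDeletedFileCount();
                sizeInBytes += r.getDeletedFileTotalSizeInBytes();
            }
            return new CleanOrphanFilesResult(count, sizeInBytes);
        }

        public static void main(String[] args) {
            CleanOrphanFilesResult total =
                    sum(Arrays.asList(
                            new CleanOrphanFilesResult(2, 2), new CleanOrphanFilesResult(1, 5)));
            System.out.println(
                    total.getDeletedFileCount() + " files, "
                            + total.getDeletedFileTotalSizeInBytes() + " bytes");
        }
    }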
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 10ad878e0ccb5..51dd51d806114 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -20,6 +20,7 @@
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.orphan.FlinkOrphanFilesClean;
+import org.apache.paimon.operation.CleanOrphanFilesResult;
 import org.apache.paimon.operation.LocalOrphanFilesClean;
 
 import org.apache.flink.table.annotation.ArgumentHint;
@@ -75,11 +76,11 @@ public String[] call(
         if (mode == null) {
             mode = "DISTRIBUTED";
         }
-        long deletedFiles;
+        CleanOrphanFilesResult cleanOrphanFilesResult;
         try {
             switch (mode.toUpperCase(Locale.ROOT)) {
                 case "DISTRIBUTED":
-                    deletedFiles =
+                    cleanOrphanFilesResult =
                             FlinkOrphanFilesClean.executeDatabaseOrphanFiles(
                                     procedureContext.getExecutionEnvironment(),
                                     catalog,
@@ -90,7 +91,7 @@ public String[] call(
                                     tableName);
                     break;
                 case "LOCAL":
-                    deletedFiles =
+                    cleanOrphanFilesResult =
                             LocalOrphanFilesClean.executeDatabaseOrphanFiles(
                                     catalog,
                                     databaseName,
@@ -105,7 +106,10 @@ public String[] call(
                                     + mode
                                     + ". Only 'DISTRIBUTED' and 'LOCAL' are supported.");
             }
-            return new String[] {String.valueOf(deletedFiles)};
+            return new String[] {
+                String.valueOf(cleanOrphanFilesResult.getDeletedFileCount()),
+                String.valueOf(cleanOrphanFilesResult.getDeletedFileTotalSizeInBytes())
+            };
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
index 5f874a5a7f9b2..77f3be2f0c765 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
@@ -148,7 +148,7 @@ public void testRunWithoutException(boolean isNamedArgument) throws Exception {
                         tableName);
         ImmutableList<Row> actualDeleteFile = ImmutableList.copyOf(executeSQL(withOlderThan));
 
-        assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"));
+        assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"), Row.of("2"));
     }
 
     @ParameterizedTest
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
index 293e84ca14bd6..8ea2b911a72ab 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark.procedure;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.operation.CleanOrphanFilesResult;
 import org.apache.paimon.operation.LocalOrphanFilesClean;
 import org.apache.paimon.operation.OrphanFilesClean;
 import org.apache.paimon.spark.catalog.WithPaimonCatalog;
@@ -66,7 +67,9 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
     private static final StructType OUTPUT_TYPE =
             new StructType(
                     new StructField[] {
-                        new StructField("result", LongType, true, Metadata.empty())
+                        new StructField("deletedFileCount", LongType, true, Metadata.empty()),
+                        new StructField(
+                                "deletedFileTotalSizeInBytes", LongType, true, Metadata.empty())
                     });
 
     private RemoveOrphanFilesProcedure(TableCatalog tableCatalog) {
@@ -104,11 +107,11 @@ public InternalRow[] call(InternalRow args) {
         Catalog catalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
         String mode = args.isNullAt(4) ? "DISTRIBUTED" : args.getString(4);
 
-        long deletedFiles;
+        CleanOrphanFilesResult cleanOrphanFilesResult;
         try {
             switch (mode.toUpperCase(Locale.ROOT)) {
                 case "LOCAL":
-                    deletedFiles =
+                    cleanOrphanFilesResult =
                             LocalOrphanFilesClean.executeDatabaseOrphanFiles(
                                     catalog,
                                     identifier.getDatabaseName(),
@@ -120,7 +123,7 @@ public InternalRow[] call(InternalRow args) {
                                     args.isNullAt(3) ? null : args.getInt(3));
                     break;
                 case "DISTRIBUTED":
-                    deletedFiles =
+                    cleanOrphanFilesResult =
                             SparkOrphanFilesClean.executeDatabaseOrphanFiles(
                                     catalog,
                                     identifier.getDatabaseName(),
@@ -137,7 +140,12 @@ public InternalRow[] call(InternalRow args) {
                                     + mode
                                     + ". Only 'DISTRIBUTED' and 'LOCAL' are supported.");
             }
-            return new InternalRow[] {newInternalRow(deletedFiles)};
+
+            return new InternalRow[] {
+                newInternalRow(
+                        cleanOrphanFilesResult.getDeletedFileCount(),
+                        cleanOrphanFilesResult.getDeletedFileTotalSizeInBytes())
+            };
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
index 488d70e349356..b3cd3260e3287 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
@@ -22,15 +22,14 @@ import org.apache.paimon.{utils, Snapshot}
 import org.apache.paimon.catalog.{Catalog, Identifier}
 import org.apache.paimon.fs.Path
 import org.apache.paimon.manifest.{ManifestEntry, ManifestFile}
-import org.apache.paimon.operation.OrphanFilesClean
+import org.apache.paimon.operation.{CleanOrphanFilesResult, OrphanFilesClean}
 import org.apache.paimon.operation.OrphanFilesClean.retryReadingFiles
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.utils.SerializableConsumer
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.{functions, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.functions.sum
 
 import java.util
 import java.util.Collections
@@ -50,14 +49,18 @@ case class SparkOrphanFilesClean(
   with SQLConfHelper
   with Logging {
 
-  def doOrphanClean(): (Dataset[Long], Dataset[BranchAndManifestFile]) = {
+  def doOrphanClean(): (Dataset[(Long, Long)], Dataset[BranchAndManifestFile]) = {
     import spark.implicits._
 
     val branches = validBranches()
-    val deletedInLocal = new AtomicLong(0)
+    val deletedFilesCountInLocal = new AtomicLong(0)
+    val deletedFilesSizeInBytesInLocal = new AtomicLong(0)
     // snapshot and changelog files are the root of everything, so they are handled specially
     // here, and subsequently, we will not count their orphan files.
-    cleanSnapshotDir(branches, (_: Path) => deletedInLocal.incrementAndGet)
+    cleanSnapshotDir(
+      branches,
+      (_: Path) => deletedFilesCountInLocal.incrementAndGet,
+      (path: Path) => deletedFilesSizeInBytesInLocal.addAndGet(fileIO.getFileSize(path)))
 
     val maxBranchParallelism = Math.min(branches.size(), parallelism)
     // find snapshots using branch and find manifests(manifest, index, statistics) using snapshot
@@ -132,21 +135,29 @@
       .join(usedFiles, $"name" === $"used_name", "left_anti")
       .mapPartitions {
         it =>
-          var deleted = 0L
+          var deletedFilesCount = 0L
+          var deletedFilesSizeInBytes = 0L
+
           while (it.hasNext) {
             val pathToClean = it.next().getString(1)
-            specifiedFileCleaner.accept(new Path(pathToClean))
+            val deletedPath = new Path(pathToClean)
+            deletedFilesSizeInBytes += fileIO.getFileSize(deletedPath)
+            specifiedFileCleaner.accept(deletedPath)
             logInfo(s"Cleaned file: $pathToClean")
-            deleted += 1
+            deletedFilesCount += 1
           }
-          logInfo(s"Total cleaned files: $deleted");
-          Iterator.single(deleted)
+          logInfo(
+            s"Total cleaned files: $deletedFilesCount, total cleaned files size: $deletedFilesSizeInBytes")
+          Iterator.single((deletedFilesCount, deletedFilesSizeInBytes))
+      }
+    val finalDeletedDataset =
+      if (deletedFilesCountInLocal.get() != 0 || deletedFilesSizeInBytesInLocal.get() != 0) {
+        deleted.union(
+          spark.createDataset(
+            Seq((deletedFilesCountInLocal.get(), deletedFilesSizeInBytesInLocal.get()))))
+      } else {
+        deleted
       }
-    val finalDeletedDataset = if (deletedInLocal.get() != 0) {
-      deleted.union(spark.createDataset(Seq(deletedInLocal.get())))
-    } else {
-      deleted
-    }
 
     (finalDeletedDataset, usedManifestFiles)
   }
@@ -169,7 +180,7 @@ object SparkOrphanFilesClean extends SQLConfHelper {
       tableName: String,
       olderThanMillis: Long,
       fileCleaner: SerializableConsumer[Path],
-      parallelismOpt: Integer): Long = {
+      parallelismOpt: Integer): CleanOrphanFilesResult = {
     val spark = SparkSession.active
     val parallelism = if (parallelismOpt == null) {
       Math.max(spark.sparkContext.defaultParallelism, conf.numShufflePartitions)
@@ -192,7 +203,7 @@
       table.asInstanceOf[FileStoreTable]
     }
     if (tables.isEmpty) {
-      return 0
+      return new CleanOrphanFilesResult(0, 0)
     }
     val (deleted, waitToRelease) = tables.map {
       table =>
@@ -207,15 +218,15 @@
     try {
       val result = deleted
         .reduce((l, r) => l.union(r))
-        .toDF("deleted")
-        .agg(sum("deleted"))
+        .toDF("deletedFilesCount", "deletedFilesSizeInBytes")
+        .agg(functions.sum("deletedFilesCount"), functions.sum("deletedFilesSizeInBytes"))
         .head()
-      assert(result.schema.size == 1, result.schema)
+      assert(result.schema.size == 2, result.schema)
       if (result.isNullAt(0)) {
         // no files can be deleted
-        0
+        new CleanOrphanFilesResult(0, 0)
       } else {
-        result.getLong(0)
+        new CleanOrphanFilesResult(result.getLong(0), result.getLong(1))
       }
     } finally {
       waitToRelease.foreach(_.unpersist())
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
index d9d73811266dd..3ffe7fba264f0 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
@@ -52,7 +52,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     fileIO.tryToWriteAtomic(orphanFile2, "b")
 
     // by default, no file deleted
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil)
 
     val orphanFile2ModTime = fileIO.getFileStatus(orphanFile2).getModificationTime
     val older_than1 = DateTimeUtils.formatLocalDateTime(
@@ -63,7 +63,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
 
     checkAnswer(
       spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than1')"),
-      Row(1) :: Nil)
+      Row(1, 1) :: Nil)
 
     val older_than2 = DateTimeUtils.formatLocalDateTime(
       DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
@@ -71,9 +71,9 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
 
     checkAnswer(
       spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than2')"),
-      Row(1) :: Nil)
+      Row(1, 1) :: Nil)
 
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil)
   }
 
   test("Paimon procedure: dry run remove orphan files") {
@@ -97,7 +97,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     fileIO.writeFile(orphanFile2, "b", true)
 
     // by default, no file deleted
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil)
 
     val older_than = DateTimeUtils.formatLocalDateTime(
       DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
@@ -106,10 +106,10 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     checkAnswer(
       spark.sql(
         s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than', dry_run => true)"),
-      Row(2) :: Nil
+      Row(2, 2) :: Nil
     )
 
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil)
   }
 
   test("Paimon procedure: remove database orphan files") {
@@ -146,7 +146,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     fileIO2.tryToWriteAtomic(orphanFile22, "b")
 
     // by default, no file deleted
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), Row(0, 0) :: Nil)
 
     val orphanFile12ModTime = fileIO1.getFileStatus(orphanFile12).getModificationTime
     val older_than1 = DateTimeUtils.formatLocalDateTime(
@@ -157,7 +157,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
 
     checkAnswer(
       spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*', older_than => '$older_than1')"),
-      Row(2) :: Nil
+      Row(2, 2) :: Nil
     )
 
     val older_than2 = DateTimeUtils.formatLocalDateTime(
@@ -166,10 +166,10 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
 
     checkAnswer(
       spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*', older_than => '$older_than2')"),
-      Row(2) :: Nil
+      Row(2, 2) :: Nil
     )
 
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), Row(0, 0) :: Nil)
   }
 
   test("Paimon procedure: remove orphan files with mode") {
@@ -193,7 +193,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     fileIO.tryToWriteAtomic(orphanFile2, "b")
 
     // by default, no file deleted
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil)
 
     val orphanFile2ModTime = fileIO.getFileStatus(orphanFile2).getModificationTime
     val older_than1 = DateTimeUtils.formatLocalDateTime(
@@ -205,7 +205,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     checkAnswer(
       spark.sql(
         s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than1', mode => 'diSTributed')"),
-      Row(1) :: Nil)
+      Row(1, 1) :: Nil)
 
     val older_than2 = DateTimeUtils.formatLocalDateTime(
       DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
@@ -214,9 +214,9 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     checkAnswer(
       spark.sql(
         s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than2', mode => 'local')"),
-      Row(1) :: Nil)
+      Row(1, 1) :: Nil)
 
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil)
   }
 }
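Taken together, every entry point — the Flink procedures, the Flink action, and the Spark procedure — now reports the count/size pair instead of a bare count. From the Spark side the call looks like this (Java sketch; assumes a SparkSession with the Paimon catalog and SQL extensions configured, as in the test base above; the older_than literal is illustrative):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class RemoveOrphanFilesCallExample {

        static void removeOrphans(SparkSession spark) {
            // Two output columns now: deletedFileCount, deletedFileTotalSizeInBytes
            // (see OUTPUT_TYPE in RemoveOrphanFilesProcedure above).
            Dataset<Row> result =
                    spark.sql(
                            "CALL sys.remove_orphan_files("
                                    + "table => 'T', older_than => '2024-01-01 00:00:00')");
            result.show();
        }
    }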