From 66d1f25e3cec3f2b6cd34519869cf7f0fbf9800a Mon Sep 17 00:00:00 2001
From: "hongli.wwj"
Date: Tue, 19 Nov 2024 08:53:20 +0800
Subject: [PATCH] [core] add deletedFileTotalLenInBytes in result of
 OrphanFilesClean

---
 .../operation/CleanOrphanFilesResult.java     | 54 ++++++++++
 .../operation/LocalOrphanFilesClean.java      | 64 +++++++-----
 .../paimon/operation/OrphanFilesClean.java    | 38 +++++--
 .../apache/paimon/utils/SnapshotManager.java  | 10 +-
 .../operation/LocalOrphanFilesCleanTest.java  | 18 ++--
 .../procedure/RemoveOrphanFilesProcedure.java | 13 ++-
 .../flink/RemoveOrphanFilesActionITCase.java  |  2 +-
 .../flink/orphan/FlinkOrphanFilesClean.java   | 98 +++++++++++++------
 .../procedure/RemoveOrphanFilesProcedure.java | 12 ++-
 .../RemoveOrphanFilesActionITCaseBase.java    |  2 +-
 .../procedure/RemoveOrphanFilesProcedure.java | 18 +++-
 .../spark/orphan/SparkOrphanFilesClean.scala  | 64 +++++++-----
 .../RemoveOrphanFilesProcedureTest.scala      | 30 +++---
 13 files changed, 291 insertions(+), 132 deletions(-)
 create mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/CleanOrphanFilesResult.java
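For orientation before the diffs: remove_orphan_files used to report only a deleted-file count; this patch threads the total deleted size (in bytes) through every cleaning path as well. A minimal sketch (not part of the patch; the summarize helper is hypothetical) of consuming the new result type:

    import org.apache.paimon.operation.CleanOrphanFilesResult;

    public class CleanResultSummary {
        // Hypothetical helper: formats the two metrics the cleaners now report.
        static String summarize(CleanOrphanFilesResult result) {
            return result.getDeletedFileCount()
                    + " orphan files deleted, "
                    + result.getDeletedFileTotalLenInBytes()
                    + " bytes reclaimed";
        }

        public static void main(String[] args) {
            // Prints: 2 orphan files deleted, 1024 bytes reclaimed
            System.out.println(summarize(new CleanOrphanFilesResult(2, 1024L)));
        }
    }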
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/CleanOrphanFilesResult.java b/paimon-core/src/main/java/org/apache/paimon/operation/CleanOrphanFilesResult.java
new file mode 100644
index 000000000000..5a3bc67f9c95
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/CleanOrphanFilesResult.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.fs.Path;
+
+import java.util.List;
+
+/** The result of OrphanFilesClean. */
+public class CleanOrphanFilesResult {
+
+    private List<Path> deletedFilesPath;
+    private final long deletedFileCount;
+    private final long deletedFileTotalLenInBytes;
+
+    public CleanOrphanFilesResult(long deletedFileCount, long deletedFileTotalLenInBytes) {
+        this.deletedFileCount = deletedFileCount;
+        this.deletedFileTotalLenInBytes = deletedFileTotalLenInBytes;
+    }
+
+    public CleanOrphanFilesResult(
+            List<Path> deletedFilesPath, long deletedFileCount, long deletedFileTotalLenInBytes) {
+        this(deletedFileCount, deletedFileTotalLenInBytes);
+        this.deletedFilesPath = deletedFilesPath;
+    }
+
+    public long getDeletedFileCount() {
+        return deletedFileCount;
+    }
+
+    public long getDeletedFileTotalLenInBytes() {
+        return deletedFileTotalLenInBytes;
+    }
+
+    public List<Path> getDeletedFilesPath() {
+        return deletedFilesPath;
+    }
+}
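The class above exposes two constructors on purpose: the distributed cleaners can only ship aggregate numbers back from the cluster, while the local cleaner additionally retains the concrete paths it removed, so getDeletedFilesPath() stays null in the aggregate case. A small sketch, assuming only the API shown above:

    import java.util.Arrays;

    import org.apache.paimon.fs.Path;
    import org.apache.paimon.operation.CleanOrphanFilesResult;

    public class ResultConstructionSketch {
        public static void main(String[] args) {
            // Distributed mode: only aggregate numbers survive.
            CleanOrphanFilesResult aggregate = new CleanOrphanFilesResult(2, 1024L);

            // Local mode: the cleaner also remembers which paths it removed.
            CleanOrphanFilesResult detailed =
                    new CleanOrphanFilesResult(
                            Arrays.asList(new Path("/tmp/orphan-1"), new Path("/tmp/orphan-2")),
                            2,
                            1024L);

            System.out.println(aggregate.getDeletedFilesPath()); // null, paths not tracked
            System.out.println(detailed.getDeletedFilesPath().size()); // 2
        }
    }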
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index a5eea6d650cf..511c5fc7fb79 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -21,12 +21,12 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SerializableConsumer;
 
 import javax.annotation.Nullable;
@@ -47,6 +47,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -68,6 +69,8 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
 
     private final List<Path> deleteFiles;
 
+    private final AtomicLong deletedFilesLenInBytes = new AtomicLong(0);
+
     private Set<String> candidateDeletes;
 
     public LocalOrphanFilesClean(FileStoreTable table) {
@@ -87,16 +90,18 @@ public LocalOrphanFilesClean(
                 table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN");
     }
 
-    public List<Path> clean() throws IOException, ExecutionException, InterruptedException {
+    public CleanOrphanFilesResult clean()
+            throws IOException, ExecutionException, InterruptedException {
         List<String> branches = validBranches();
 
         // specially handle to clear snapshot dir
-        cleanSnapshotDir(branches, deleteFiles::add);
+        cleanSnapshotDir(branches, deleteFiles::add, deletedFilesLenInBytes::addAndGet);
 
         // delete candidate files
-        Map<String, Path> candidates = getCandidateDeletingFiles();
+        Map<String, Pair<Path, Long>> candidates = getCandidateDeletingFiles();
         if (candidates.isEmpty()) {
-            return deleteFiles;
+            return new CleanOrphanFilesResult(
+                    deleteFiles, deleteFiles.size(), deletedFilesLenInBytes.get());
         }
         candidateDeletes = new HashSet<>(candidates.keySet());
 
@@ -108,12 +113,22 @@ public List<Path> clean() throws IOException, ExecutionException, InterruptedExc
 
         // delete unused files
         candidateDeletes.removeAll(usedFiles);
-        candidateDeletes.stream().map(candidates::get).forEach(fileCleaner);
+        candidateDeletes.stream()
+                .map(candidates::get)
+                .forEach(
+                        deleteFileInfo -> {
+                            deletedFilesLenInBytes.addAndGet(deleteFileInfo.getRight());
+                            fileCleaner.accept(deleteFileInfo.getLeft());
+                        });
         deleteFiles.addAll(
-                candidateDeletes.stream().map(candidates::get).collect(Collectors.toList()));
+                candidateDeletes.stream()
                        .map(candidates::get)
+                        .map(Pair::getLeft)
+                        .collect(Collectors.toList()));
         candidateDeletes.clear();
 
-        return deleteFiles;
+        return new CleanOrphanFilesResult(
+                deleteFiles, deleteFiles.size(), deletedFilesLenInBytes.get());
     }
 
     private void collectWithoutDataFile(
@@ -172,19 +187,20 @@ private Set<String> getUsedFiles(String branch) {
      * Get all the candidate deleting files in the specified directories and filter them by
     * olderThanMillis.
      */
-    private Map<String, Path> getCandidateDeletingFiles() {
+    private Map<String, Pair<Path, Long>> getCandidateDeletingFiles() {
         List<Path> fileDirs = listPaimonFileDirs();
-        Function<Path, List<Path>> processor =
+        Function<Path, List<Pair<Path, Long>>> processor =
                 path ->
                         tryBestListingDirs(path).stream()
                                 .filter(this::oldEnough)
-                                .map(FileStatus::getPath)
+                                .map(status -> Pair.of(status.getPath(), status.getLen()))
                                 .collect(Collectors.toList());
-        Iterator<Path> allPaths = randomlyExecuteSequentialReturn(executor, processor, fileDirs);
-        Map<String, Path> result = new HashMap<>();
-        while (allPaths.hasNext()) {
-            Path next = allPaths.next();
-            result.put(next.getName(), next);
+        Iterator<Pair<Path, Long>> allFilesInfo =
+                randomlyExecuteSequentialReturn(executor, processor, fileDirs);
+        Map<String, Pair<Path, Long>> result = new HashMap<>();
+        while (allFilesInfo.hasNext()) {
+            Pair<Path, Long> fileInfo = allFilesInfo.next();
+            result.put(fileInfo.getLeft().getName(), fileInfo);
         }
         return result;
     }
@@ -197,7 +213,6 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
             SerializableConsumer<Path> fileCleaner,
             @Nullable Integer parallelism)
             throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
-        List<LocalOrphanFilesClean> orphanFilesCleans = new ArrayList<>();
         List<String> tableNames = Collections.singletonList(tableName);
         if (tableName == null || "*".equals(tableName)) {
             tableNames = catalog.listTables(databaseName);
@@ -214,6 +229,7 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
                     }
                 };
 
+        List<LocalOrphanFilesClean> orphanFilesCleans = new ArrayList<>(tableNames.size());
         for (String t : tableNames) {
             Identifier identifier = new Identifier(databaseName, t);
             Table table = catalog.getTable(identifier).copy(dynamicOptions);
@@ -230,7 +246,7 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
         return orphanFilesCleans;
     }
 
-    public static long executeDatabaseOrphanFiles(
+    public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             Catalog catalog,
             String databaseName,
             @Nullable String tableName,
@@ -249,15 +265,17 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
         ExecutorService executorService =
                 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
-        List<Future<List<Path>>> tasks = new ArrayList<>();
+        List<Future<CleanOrphanFilesResult>> tasks = new ArrayList<>(tableCleans.size());
         for (LocalOrphanFilesClean clean : tableCleans) {
             tasks.add(executorService.submit(clean::clean));
         }
 
-        List<Path> cleanOrphanFiles = new ArrayList<>();
-        for (Future<List<Path>> task : tasks) {
+        long deletedFileCount = 0;
+        long deletedFileTotalLenInBytes = 0;
+        for (Future<CleanOrphanFilesResult> task : tasks) {
             try {
-                cleanOrphanFiles.addAll(task.get());
+                deletedFileCount += task.get().getDeletedFileCount();
+                deletedFileTotalLenInBytes += task.get().getDeletedFileTotalLenInBytes();
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw new RuntimeException(e);
@@ -267,6 +285,6 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
         }
 
         executorService.shutdownNow();
-        return cleanOrphanFiles.size();
+        return new CleanOrphanFilesResult(deletedFileCount, deletedFileTotalLenInBytes);
     }
 }
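The bookkeeping idea in LocalOrphanFilesClean is unchanged: candidates are indexed by file name, names referenced from live metadata are subtracted, and what remains is deleted. The only new element is that each candidate now carries its length so freed bytes can be summed. A self-contained, JDK-only sketch of that flow (names and sizes are illustrative; SimpleEntry stands in for Paimon's Pair):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class CandidateIndexSketch {
        public static void main(String[] args) {
            // file name -> (full path, length), mirroring Map<String, Pair<Path, Long>>
            Map<String, SimpleEntry<String, Long>> candidates = new HashMap<>();
            candidates.put("data-1.orc", new SimpleEntry<>("/warehouse/T/bucket-0/data-1.orc", 512L));
            candidates.put("data-2.orc", new SimpleEntry<>("/warehouse/T/bucket-0/data-2.orc", 256L));

            // names still referenced by snapshots/manifests survive
            Set<String> usedFiles = new HashSet<>();
            usedFiles.add("data-1.orc");
            candidates.keySet().removeAll(usedFiles);

            long deletedLen = candidates.values().stream().mapToLong(SimpleEntry::getValue).sum();
            // Prints: 1 orphans, 256 bytes
            System.out.println(candidates.size() + " orphans, " + deletedLen + " bytes");
        }
    }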
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index 5698908cb9b0..b3efd67e98f9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -119,23 +119,47 @@ protected List<String> validBranches() {
         return branches;
     }
 
-    protected void cleanSnapshotDir(List<String> branches, Consumer<Path> deletedFileConsumer) {
+    protected void cleanSnapshotDir(
+            List<String> branches,
+            Consumer<Path> deletedFilesConsumer,
+            Consumer<Long> deletedFilesLenInBytesConsumer) {
         for (String branch : branches) {
             FileStoreTable branchTable = table.switchToBranch(branch);
             SnapshotManager snapshotManager = branchTable.snapshotManager();
 
             // specially handle the snapshot directory
-            List<Path> nonSnapshotFiles = snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
-            nonSnapshotFiles.forEach(fileCleaner);
-            nonSnapshotFiles.forEach(deletedFileConsumer);
+            List<Pair<Path, Long>> nonSnapshotFiles =
+                    snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
+            nonSnapshotFiles.forEach(
+                    nonSnapshotFile ->
+                            cleanFile(
+                                    nonSnapshotFile,
+                                    deletedFilesConsumer,
+                                    deletedFilesLenInBytesConsumer));
 
             // specially handle the changelog directory
-            List<Path> nonChangelogFiles = snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
-            nonChangelogFiles.forEach(fileCleaner);
-            nonChangelogFiles.forEach(deletedFileConsumer);
+            List<Pair<Path, Long>> nonChangelogFiles =
+                    snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
+            nonChangelogFiles.forEach(
+                    nonChangelogFile ->
+                            cleanFile(
+                                    nonChangelogFile,
+                                    deletedFilesConsumer,
+                                    deletedFilesLenInBytesConsumer));
         }
     }
 
+    private void cleanFile(
+            Pair<Path, Long> deleteFileInfo,
+            Consumer<Path> deletedFilesConsumer,
+            Consumer<Long> deletedFilesLenInBytesConsumer) {
+        Path filePath = deleteFileInfo.getLeft();
+        Long fileSize = deleteFileInfo.getRight();
+        deletedFilesConsumer.accept(filePath);
+        deletedFilesLenInBytesConsumer.accept(fileSize);
+        fileCleaner.accept(filePath);
+    }
+
     protected Set<Snapshot> safelyGetAllSnapshots(String branch) throws IOException {
         FileStoreTable branchTable = table.switchToBranch(branch);
         SnapshotManager snapshotManager = branchTable.snapshotManager();
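The new cleanFile helper fans a single deletion event out to two observers (path sink and byte sink) before invoking the actual cleaner. A JDK-only sketch of the same consumer-pair pattern, with plain String/long standing in for Path and Pair:

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.Consumer;

    public class ConsumerPairSketch {
        static void cleanFile(
                String path, long len, Consumer<String> pathSink, Consumer<Long> lenSink) {
            pathSink.accept(path); // record what was removed
            lenSink.accept(len); // record how many bytes that freed
            // the real helper additionally calls fileCleaner.accept(path) here
        }

        public static void main(String[] args) {
            AtomicLong freedBytes = new AtomicLong();
            cleanFile("/tmp/a", 10L, p -> System.out.println("deleted " + p), freedBytes::addAndGet);
            cleanFile("/tmp/b", 20L, p -> System.out.println("deleted " + p), freedBytes::addAndGet);
            System.out.println(freedBytes.get() + " bytes freed"); // 30 bytes freed
        }
    }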
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 5902d4c84cf5..ee4c74bc2661 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -539,15 +539,15 @@ private void collectSnapshots(Consumer<Path> pathConsumer, List<Path> paths)
      * Try to get non snapshot files. If any error occurred, just ignore it and return an empty
      * result.
      */
-    public List<Path> tryGetNonSnapshotFiles(Predicate<FileStatus> fileStatusFilter) {
+    public List<Pair<Path, Long>> tryGetNonSnapshotFiles(Predicate<FileStatus> fileStatusFilter) {
         return listPathWithFilter(snapshotDirectory(), fileStatusFilter, nonSnapshotFileFilter());
     }
 
-    public List<Path> tryGetNonChangelogFiles(Predicate<FileStatus> fileStatusFilter) {
+    public List<Pair<Path, Long>> tryGetNonChangelogFiles(Predicate<FileStatus> fileStatusFilter) {
         return listPathWithFilter(changelogDirectory(), fileStatusFilter, nonChangelogFileFilter());
     }
 
-    private List<Path> listPathWithFilter(
+    private List<Pair<Path, Long>> listPathWithFilter(
             Path directory, Predicate<FileStatus> fileStatusFilter, Predicate<Path> fileFilter) {
         try {
             FileStatus[] statuses = fileIO.listStatus(directory);
@@ -557,8 +557,8 @@ private List<Pair<Path, Long>> listPathWithFilter(
 
             return Arrays.stream(statuses)
                     .filter(fileStatusFilter)
-                    .map(FileStatus::getPath)
-                    .filter(fileFilter)
+                    .filter(status -> fileFilter.test(status.getPath()))
+                    .map(status -> Pair.of(status.getPath(), status.getLen()))
                     .collect(Collectors.toList());
         } catch (IOException ignored) {
             return Collections.emptyList();
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
index fdc68b34abb4..5139dd44957d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
@@ -165,22 +165,20 @@ public void testNormallyRemoving() throws Throwable {
 
         // randomly delete tags
         List<String> deleteTags = Collections.emptyList();
-        if (!allTags.isEmpty()) {
-            deleteTags = randomlyPick(allTags);
-            for (String tagName : deleteTags) {
-                table.deleteTag(tagName);
-            }
+        deleteTags = randomlyPick(allTags);
+        for (String tagName : deleteTags) {
+            table.deleteTag(tagName);
         }
 
         // first check, nothing will be deleted because the default olderThan interval is 1 day
         LocalOrphanFilesClean orphanFilesClean = new LocalOrphanFilesClean(table);
-        assertThat(orphanFilesClean.clean().size()).isEqualTo(0);
+        assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isEqualTo(0);
 
         // second check
         orphanFilesClean =
                 new LocalOrphanFilesClean(
                         table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2));
-        List<Path> deleted = orphanFilesClean.clean();
+        List<Path> deleted = orphanFilesClean.clean().getDeletedFilesPath();
         try {
             validate(deleted, snapshotData, new HashMap<>());
         } catch (Throwable t) {
@@ -363,13 +361,13 @@ public void testCleanOrphanFilesWithChangelogDecoupled(String changelogProducer)
 
         // first check, nothing will be deleted because the default olderThan interval is 1 day
         LocalOrphanFilesClean orphanFilesClean = new LocalOrphanFilesClean(table);
-        assertThat(orphanFilesClean.clean().size()).isEqualTo(0);
+        assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isEqualTo(0);
 
         // second check
         orphanFilesClean =
                 new LocalOrphanFilesClean(
                         table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2));
-        List<Path> deleted = orphanFilesClean.clean();
+        List<Path> deleted = orphanFilesClean.clean().getDeletedFilesPath();
         validate(deleted, snapshotData, changelogData);
     }
 
@@ -399,7 +397,7 @@ public void testAbnormallyRemoving() throws Exception {
         LocalOrphanFilesClean orphanFilesClean =
                 new LocalOrphanFilesClean(
                         table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2));
-        assertThat(orphanFilesClean.clean().size()).isGreaterThan(0);
+        assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isGreaterThan(0);
     }
 
     private void writeData(
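One subtlety in the SnapshotManager change above: the path-based filter used to run after mapping FileStatus to Path, but because the result now keeps the file length, the filter is applied to status.getPath() first and the (path, length) pair is built last. The reordering is behavior-preserving; a JDK-only sketch under that assumption:

    import java.util.AbstractMap.SimpleEntry;
    import java.util.List;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class FilterBeforeMapSketch {
        static class Status {
            final String path;
            final long len;

            Status(String path, long len) {
                this.path = path;
                this.len = len;
            }
        }

        public static void main(String[] args) {
            // stand-in for nonSnapshotFileFilter(): keep names that are not snapshot files
            Predicate<String> nonSnapshotFile = p -> !p.startsWith("snapshot-");
            List<SimpleEntry<String, Long>> result =
                    Stream.of(new Status("snapshot-1", 10L), new Status("orphan.tmp", 7L))
                            // test the path while the status (and its length) is still in hand
                            .filter(s -> nonSnapshotFile.test(s.path))
                            .map(s -> new SimpleEntry<>(s.path, s.len))
                            .collect(Collectors.toList());
            System.out.println(result); // [orphan.tmp=7]
        }
    }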
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 7695c510b1dc..b4a3a6b359d9 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -20,6 +20,7 @@
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.orphan.FlinkOrphanFilesClean;
+import org.apache.paimon.operation.CleanOrphanFilesResult;
 import org.apache.paimon.operation.LocalOrphanFilesClean;
 
 import org.apache.flink.table.procedure.ProcedureContext;
@@ -86,11 +87,12 @@ public String[] call(
         if (mode == null) {
             mode = "DISTRIBUTED";
         }
-        long deletedFiles;
+
+        CleanOrphanFilesResult cleanOrphanFilesResult;
         try {
             switch (mode.toUpperCase(Locale.ROOT)) {
                 case "DISTRIBUTED":
-                    deletedFiles =
+                    cleanOrphanFilesResult =
                             FlinkOrphanFilesClean.executeDatabaseOrphanFiles(
                                     procedureContext.getExecutionEnvironment(),
                                     catalog,
@@ -101,7 +103,7 @@ public String[] call(
                                     tableName);
                     break;
                 case "LOCAL":
-                    deletedFiles =
+                    cleanOrphanFilesResult =
                             LocalOrphanFilesClean.executeDatabaseOrphanFiles(
                                     catalog,
                                     databaseName,
@@ -116,7 +118,10 @@ public String[] call(
                                     + mode
                                     + ". Only 'DISTRIBUTED' and 'LOCAL' are supported.");
             }
-            return new String[] {String.valueOf(deletedFiles)};
+            return new String[] {
+                String.valueOf(cleanOrphanFilesResult.getDeletedFileCount()),
+                String.valueOf(cleanOrphanFilesResult.getDeletedFileTotalLenInBytes())
+            };
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
index 46b62b6bf307..a168c3785c7c 100644
--- a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
+++ b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
@@ -137,7 +137,7 @@ public void testRunWithoutException() throws Exception {
                         database, tableName);
         ImmutableList<Row> actualDeleteFile = ImmutableList.copyOf(executeSQL(withOlderThan));
 
-        assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"));
+        assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"), Row.of("2"));
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index 61bebca24af4..23bbbc9b609c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -27,12 +27,15 @@
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.operation.CleanOrphanFilesResult;
 import org.apache.paimon.operation.OrphanFilesClean;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SerializableConsumer;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -61,7 +64,6 @@
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -81,7 +83,7 @@ public FlinkOrphanFilesClean(
     }
 
     @Nullable
-    public DataStream<Long> doOrphanClean(StreamExecutionEnvironment env) {
+    public DataStream<CleanOrphanFilesResult> doOrphanClean(StreamExecutionEnvironment env) {
         Configuration flinkConf = new Configuration();
         flinkConf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
         flinkConf.set(ExecutionOptions.SORT_INPUTS, false);
@@ -97,8 +99,12 @@ public DataStream<Long> doOrphanClean(StreamExecutionEnvironment env) {
 
         // snapshot and changelog files are the root of everything, so they are handled specially
         // here, and subsequently, we will not count their orphan files.
-        AtomicLong deletedInLocal = new AtomicLong(0);
-        cleanSnapshotDir(branches, p -> deletedInLocal.incrementAndGet());
+        AtomicLong deletedFilesCountInLocal = new AtomicLong(0);
+        AtomicLong deletedFilesLenInBytesInLocal = new AtomicLong(0);
+        cleanSnapshotDir(
+                branches,
+                path -> deletedFilesCountInLocal.incrementAndGet(),
+                deletedFilesLenInBytesInLocal::addAndGet);
 
         // branch and manifest file
         final OutputTag<Tuple2<String, String>> manifestOutputTag =
@@ -203,36 +209,45 @@ public void endInput() throws IOException {
                         .map(Path::toUri)
                         .map(Object::toString)
                         .collect(Collectors.toList());
-        DataStream<String> candidates =
+        DataStream<Pair<String, Long>> candidates =
                 env.fromCollection(fileDirs)
                         .process(
-                                new ProcessFunction<String, String>() {
+                                new ProcessFunction<String, Pair<String, Long>>() {
                                     @Override
                                     public void processElement(
                                             String dir,
-                                            ProcessFunction<String, String>.Context ctx,
-                                            Collector<String> out) {
+                                            ProcessFunction<String, Pair<String, Long>>.Context ctx,
+                                            Collector<Pair<String, Long>> out) {
                                         for (FileStatus fileStatus :
                                                 tryBestListingDirs(new Path(dir))) {
                                             if (oldEnough(fileStatus)) {
                                                 out.collect(
-                                                        fileStatus.getPath().toUri().toString());
+                                                        Pair.of(
+                                                                fileStatus
+                                                                        .getPath()
+                                                                        .toUri()
+                                                                        .toString(),
+                                                                fileStatus.getLen()));
                                             }
                                         }
                                     }
                                 });
 
-        DataStream<Long> deleted =
+        DataStream<CleanOrphanFilesResult> deleted =
                 usedFiles
                         .keyBy(f -> f)
-                        .connect(candidates.keyBy(path -> new Path(path).getName()))
+                        .connect(
+                                candidates.keyBy(
+                                        pathAndSize -> new Path(pathAndSize.getKey()).getName()))
                         .transform(
                                 "files_join",
-                                LONG_TYPE_INFO,
-                                new BoundedTwoInputOperator<String, String, Long>() {
+                                TypeInformation.of(CleanOrphanFilesResult.class),
+                                new BoundedTwoInputOperator<
+                                        String, Pair<String, Long>, CleanOrphanFilesResult>() {
 
                                     private boolean buildEnd;
-                                    private long emitted;
+                                    private long emittedFilesCount;
+                                    private long emittedFilesLen;
 
                                     private final Set<String> used = new HashSet<>();
 
@@ -254,8 +269,15 @@ public void endInput(int inputId) {
                                             case 2:
                                                 checkState(buildEnd, "Should build ended.");
                                                 LOG.info("Finish probe phase.");
-                                                LOG.info("Clean files: {}", emitted);
-                                                output.collect(new StreamRecord<>(emitted));
+                                                LOG.info(
+                                                        "Clean files count : {}",
+                                                        emittedFilesCount);
+                                                LOG.info("Clean files size : {}", emittedFilesLen);
+                                                output.collect(
+                                                        new StreamRecord<>(
+                                                                new CleanOrphanFilesResult(
+                                                                        emittedFilesCount,
+                                                                        emittedFilesLen)));
                                                 break;
                                         }
                                     }
@@ -266,25 +288,34 @@ public void processElement1(StreamRecord<String> element) {
                                     }
 
                                     @Override
-                                    public void processElement2(StreamRecord<String> element) {
+                                    public void processElement2(
+                                            StreamRecord<Pair<String, Long>> element) {
                                         checkState(buildEnd, "Should build ended.");
-                                        String value = element.getValue();
+                                        Pair<String, Long> fileInfo = element.getValue();
+                                        String value = fileInfo.getLeft();
                                         Path path = new Path(value);
                                         if (!used.contains(path.getName())) {
+                                            emittedFilesCount++;
+                                            emittedFilesLen += fileInfo.getRight();
                                             fileCleaner.accept(path);
                                             LOG.info("Dry clean: {}", path);
-                                            emitted++;
                                         }
                                     }
                                 });
 
-        if (deletedInLocal.get() != 0) {
-            deleted = deleted.union(env.fromElements(deletedInLocal.get()));
+        if (deletedFilesCountInLocal.get() != 0 || deletedFilesLenInBytesInLocal.get() != 0) {
+            deleted =
+                    deleted.union(
+                            env.fromElements(
+                                    new CleanOrphanFilesResult(
+                                            deletedFilesCountInLocal.get(),
+                                            deletedFilesLenInBytesInLocal.get())));
         }
+
         return deleted;
     }
 
-    public static long executeDatabaseOrphanFiles(
+    public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             StreamExecutionEnvironment env,
             Catalog catalog,
             long olderThanMillis,
@@ -293,12 +324,13 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             String databaseName,
             @Nullable String tableName)
             throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
-        List<DataStream<Long>> orphanFilesCleans = new ArrayList<>();
         List<String> tableNames = Collections.singletonList(tableName);
         if (tableName == null || "*".equals(tableName)) {
             tableNames = catalog.listTables(databaseName);
         }
 
+        List<DataStream<CleanOrphanFilesResult>> orphanFilesCleans =
+                new ArrayList<>(tableNames.size());
         for (String t : tableNames) {
             Identifier identifier = new Identifier(databaseName, t);
             Table table = catalog.getTable(identifier);
@@ -307,7 +339,7 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
                     "Only FileStoreTable supports remove-orphan-files action. The table type is '%s'.",
                     table.getClass().getName());
 
-            DataStream<Long> clean =
+            DataStream<CleanOrphanFilesResult> clean =
                     new FlinkOrphanFilesClean(
                                     (FileStoreTable) table,
                                     olderThanMillis,
@@ -319,8 +351,8 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             }
         }
 
-        DataStream<Long> result = null;
-        for (DataStream<Long> clean : orphanFilesCleans) {
+        DataStream<CleanOrphanFilesResult> result = null;
+        for (DataStream<CleanOrphanFilesResult> clean : orphanFilesCleans) {
             if (result == null) {
                 result = clean;
             } else {
@@ -331,20 +363,24 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
         return sum(result);
     }
 
-    private static long sum(DataStream<Long> deleted) {
-        long deleteCount = 0;
+    private static CleanOrphanFilesResult sum(DataStream<CleanOrphanFilesResult> deleted) {
+        long deletedFilesCount = 0;
+        long deletedFilesLenInBytes = 0;
         if (deleted != null) {
             try {
-                CloseableIterator<Long> iterator =
+                CloseableIterator<CleanOrphanFilesResult> iterator =
                         deleted.global().executeAndCollect("OrphanFilesClean");
                 while (iterator.hasNext()) {
-                    deleteCount += iterator.next();
+                    CleanOrphanFilesResult cleanOrphanFilesResult = iterator.next();
+                    deletedFilesCount += cleanOrphanFilesResult.getDeletedFileCount();
+                    deletedFilesLenInBytes +=
+                            cleanOrphanFilesResult.getDeletedFileTotalLenInBytes();
                 }
                 iterator.close();
             } catch (Exception e) {
                 throw new RuntimeException(e);
            }
         }
-        return deleteCount;
+        return new CleanOrphanFilesResult(deletedFilesCount, deletedFilesLenInBytes);
     }
 }
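The files_join operator above is a hash-style build/probe join: input 1 builds the set of used file names, input 2 probes candidate (path, length) pairs against it, and only misses are counted, sized, and handed to the cleaner. A single-threaded, JDK-only sketch of that logic (no Flink dependencies; names and sizes are illustrative):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class BuildProbeSketch {
        public static void main(String[] args) {
            // phase 1 (build): file names referenced by live metadata
            Set<String> used = new HashSet<>(Arrays.asList("data-1.orc", "manifest-0"));

            // phase 2 (probe): candidate (path, length) pairs listed from disk
            List<SimpleEntry<String, Long>> candidates =
                    Arrays.asList(
                            new SimpleEntry<>("/warehouse/T/data-1.orc", 512L),
                            new SimpleEntry<>("/warehouse/T/data-9.orc", 128L));

            long count = 0;
            long bytes = 0;
            for (SimpleEntry<String, Long> candidate : candidates) {
                String path = candidate.getKey();
                String name = path.substring(path.lastIndexOf('/') + 1);
                if (!used.contains(name)) { // a miss is an orphan
                    count++;
                    bytes += candidate.getValue();
                }
            }
            System.out.println(count + " orphans, " + bytes + " bytes"); // 1 orphans, 128 bytes
        }
    }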
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 10ad878e0ccb..4cd1b3e00303 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -20,6 +20,7 @@
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.orphan.FlinkOrphanFilesClean;
+import org.apache.paimon.operation.CleanOrphanFilesResult;
 import org.apache.paimon.operation.LocalOrphanFilesClean;
 
 import org.apache.flink.table.annotation.ArgumentHint;
@@ -75,11 +76,11 @@ public String[] call(
         if (mode == null) {
             mode = "DISTRIBUTED";
         }
-        long deletedFiles;
+        CleanOrphanFilesResult cleanOrphanFilesResult;
         try {
             switch (mode.toUpperCase(Locale.ROOT)) {
                 case "DISTRIBUTED":
-                    deletedFiles =
+                    cleanOrphanFilesResult =
                             FlinkOrphanFilesClean.executeDatabaseOrphanFiles(
                                     procedureContext.getExecutionEnvironment(),
                                     catalog,
@@ -90,7 +91,7 @@ public String[] call(
                                     tableName);
                     break;
                 case "LOCAL":
-                    deletedFiles =
+                    cleanOrphanFilesResult =
                             LocalOrphanFilesClean.executeDatabaseOrphanFiles(
                                     catalog,
                                     databaseName,
@@ -105,7 +106,10 @@ public String[] call(
                                     + mode
                                     + ". Only 'DISTRIBUTED' and 'LOCAL' are supported.");
             }
-            return new String[] {String.valueOf(deletedFiles)};
+            return new String[] {
+                String.valueOf(cleanOrphanFilesResult.getDeletedFileCount()),
+                String.valueOf(cleanOrphanFilesResult.getDeletedFileTotalLenInBytes())
+            };
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
index 5f874a5a7f9b..77f3be2f0c76 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
@@ -148,7 +148,7 @@ public void testRunWithoutException(boolean isNamedArgument) throws Exception {
                         tableName);
         ImmutableList<Row> actualDeleteFile = ImmutableList.copyOf(executeSQL(withOlderThan));
 
-        assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"));
+        assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"), Row.of("2"));
     }
 
     @ParameterizedTest
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
index 293e84ca14bd..a929641106c6 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark.procedure;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.operation.CleanOrphanFilesResult;
 import org.apache.paimon.operation.LocalOrphanFilesClean;
 import org.apache.paimon.operation.OrphanFilesClean;
 import org.apache.paimon.spark.catalog.WithPaimonCatalog;
@@ -66,7 +67,9 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
     private static final StructType OUTPUT_TYPE =
             new StructType(
                     new StructField[] {
-                        new StructField("result", LongType, true, Metadata.empty())
+                        new StructField("deletedFileCount", LongType, true, Metadata.empty()),
+                        new StructField(
+                                "deletedFileTotalLenInBytes", LongType, true, Metadata.empty())
                     });
 
     private RemoveOrphanFilesProcedure(TableCatalog tableCatalog) {
@@ -104,11 +107,11 @@ public InternalRow[] call(InternalRow args) {
         Catalog catalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
         String mode = args.isNullAt(4) ? "DISTRIBUTED" : args.getString(4);
 
-        long deletedFiles;
+        CleanOrphanFilesResult cleanOrphanFilesResult;
         try {
             switch (mode.toUpperCase(Locale.ROOT)) {
                 case "LOCAL":
-                    deletedFiles =
+                    cleanOrphanFilesResult =
                             LocalOrphanFilesClean.executeDatabaseOrphanFiles(
                                     catalog,
                                     identifier.getDatabaseName(),
@@ -120,7 +123,7 @@ public InternalRow[] call(InternalRow args) {
                                     args.isNullAt(3) ? null : args.getInt(3));
                     break;
                 case "DISTRIBUTED":
-                    deletedFiles =
+                    cleanOrphanFilesResult =
                             SparkOrphanFilesClean.executeDatabaseOrphanFiles(
                                     catalog,
                                     identifier.getDatabaseName(),
@@ -137,7 +140,12 @@ public InternalRow[] call(InternalRow args) {
                                     + mode
                                     + ". Only 'DISTRIBUTED' and 'LOCAL' are supported.");
             }
-            return new InternalRow[] {newInternalRow(deletedFiles)};
+
+            return new InternalRow[] {
+                newInternalRow(
+                        cleanOrphanFilesResult.getDeletedFileCount(),
+                        cleanOrphanFilesResult.getDeletedFileTotalLenInBytes())
+            };
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
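With the OUTPUT_TYPE change above, the Spark procedure reports two BIGINT columns instead of one. A sketch of the row shape, mirroring the two values handed to newInternalRow (plain Java, no Spark dependency):

    import org.apache.paimon.operation.CleanOrphanFilesResult;

    public class ProcedureRowSketch {
        // Mirrors OUTPUT_TYPE: (deletedFileCount BIGINT, deletedFileTotalLenInBytes BIGINT)
        static Object[] toRow(CleanOrphanFilesResult result) {
            return new Object[] {
                result.getDeletedFileCount(), result.getDeletedFileTotalLenInBytes()
            };
        }

        public static void main(String[] args) {
            // Two orphan files of one byte each ("a" and "b" in the tests) => Row(2, 2)
            Object[] row = toRow(new CleanOrphanFilesResult(2, 2));
            System.out.println(row[0] + ", " + row[1]); // 2, 2
        }
    }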
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
index 488d70e34935..fca0493ede28 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
@@ -22,15 +22,14 @@ import org.apache.paimon.{utils, Snapshot}
 import org.apache.paimon.catalog.{Catalog, Identifier}
 import org.apache.paimon.fs.Path
 import org.apache.paimon.manifest.{ManifestEntry, ManifestFile}
-import org.apache.paimon.operation.OrphanFilesClean
+import org.apache.paimon.operation.{CleanOrphanFilesResult, OrphanFilesClean}
 import org.apache.paimon.operation.OrphanFilesClean.retryReadingFiles
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.utils.SerializableConsumer
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.{functions, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.functions.sum
 
 import java.util
 import java.util.Collections
@@ -50,14 +49,18 @@ case class SparkOrphanFilesClean(
   with SQLConfHelper
   with Logging {
 
-  def doOrphanClean(): (Dataset[Long], Dataset[BranchAndManifestFile]) = {
+  def doOrphanClean(): (Dataset[(Long, Long)], Dataset[BranchAndManifestFile]) = {
     import spark.implicits._
 
     val branches = validBranches()
-    val deletedInLocal = new AtomicLong(0)
+    val deletedFilesCountInLocal = new AtomicLong(0)
+    val deletedFilesLenInBytesInLocal = new AtomicLong(0)
     // snapshot and changelog files are the root of everything, so they are handled specially
     // here, and subsequently, we will not count their orphan files.
-    cleanSnapshotDir(branches, (_: Path) => deletedInLocal.incrementAndGet)
+    cleanSnapshotDir(
+      branches,
+      (_: Path) => deletedFilesCountInLocal.incrementAndGet,
+      size => deletedFilesLenInBytesInLocal.addAndGet(size))
 
     val maxBranchParallelism = Math.min(branches.size(), parallelism)
     // find snapshots using branch and find manifests(manifest, index, statistics) using snapshot
@@ -121,10 +124,10 @@ case class SparkOrphanFilesClean(
       .flatMap {
         dir =>
           tryBestListingDirs(new Path(dir)).asScala.filter(oldEnough).map {
-            file => (file.getPath.getName, file.getPath.toUri.toString)
+            file => (file.getPath.getName, file.getPath.toUri.toString, file.getLen)
           }
       }
-      .toDF("name", "path")
+      .toDF("name", "path", "len")
      .repartition(parallelism)
 
     // use left anti to filter files which is not used
@@ -132,21 +135,30 @@ case class SparkOrphanFilesClean(
       .join(usedFiles, $"name" === $"used_name", "left_anti")
       .mapPartitions {
         it =>
-          var deleted = 0L
+          var deletedFilesCount = 0L
+          var deletedFilesLenInBytes = 0L
+
           while (it.hasNext) {
-            val pathToClean = it.next().getString(1)
-            specifiedFileCleaner.accept(new Path(pathToClean))
+            val fileInfo = it.next();
+            val pathToClean = fileInfo.getString(1)
+            val deletedPath = new Path(pathToClean)
+            deletedFilesLenInBytes += fileInfo.getLong(2)
+            specifiedFileCleaner.accept(deletedPath)
             logInfo(s"Cleaned file: $pathToClean")
-            deleted += 1
+            deletedFilesCount += 1
           }
-          logInfo(s"Total cleaned files: $deleted");
-          Iterator.single(deleted)
+          logInfo(
+            s"Total cleaned files: $deletedFilesCount, Total cleaned files len : $deletedFilesLenInBytes")
+          Iterator.single((deletedFilesCount, deletedFilesLenInBytes))
+      }
+    val finalDeletedDataset =
+      if (deletedFilesCountInLocal.get() != 0 || deletedFilesLenInBytesInLocal.get() != 0) {
+        deleted.union(
+          spark.createDataset(
+            Seq((deletedFilesCountInLocal.get(), deletedFilesLenInBytesInLocal.get()))))
+      } else {
+        deleted
       }
-    val finalDeletedDataset = if (deletedInLocal.get() != 0) {
-      deleted.union(spark.createDataset(Seq(deletedInLocal.get())))
-    } else {
-      deleted
-    }
 
     (finalDeletedDataset, usedManifestFiles)
   }
@@ -169,7 +181,7 @@ object SparkOrphanFilesClean extends SQLConfHelper {
       tableName: String,
       olderThanMillis: Long,
       fileCleaner: SerializableConsumer[Path],
-      parallelismOpt: Integer): Long = {
+      parallelismOpt: Integer): CleanOrphanFilesResult = {
     val spark = SparkSession.active
     val parallelism = if (parallelismOpt == null) {
       Math.max(spark.sparkContext.defaultParallelism, conf.numShufflePartitions)
@@ -192,7 +204,7 @@ object SparkOrphanFilesClean extends SQLConfHelper {
         table.asInstanceOf[FileStoreTable]
     }
     if (tables.isEmpty) {
-      return 0
+      return new CleanOrphanFilesResult(0, 0)
     }
     val (deleted, waitToRelease) = tables.map {
       table =>
@@ -207,15 +219,15 @@ object SparkOrphanFilesClean extends SQLConfHelper {
     try {
       val result = deleted
         .reduce((l, r) => l.union(r))
-        .toDF("deleted")
-        .agg(sum("deleted"))
+        .toDF("deletedFilesCount", "deletedFilesLenInBytes")
+        .agg(functions.sum("deletedFilesCount"), functions.sum("deletedFilesLenInBytes"))
         .head()
-      assert(result.schema.size == 1, result.schema)
+      assert(result.schema.size == 2, result.schema)
       if (result.isNullAt(0)) {
         // no files can be deleted
-        0
+        new CleanOrphanFilesResult(0, 0)
       } else {
-        result.getLong(0)
+        new CleanOrphanFilesResult(result.getLong(0), result.getLong(1))
       }
     } finally {
       waitToRelease.foreach(_.unpersist())
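On the Spark side, each partition now emits a (count, bytes) tuple, the per-table datasets are union'd, and the driver sums the two columns. That aggregation, reduced to a JDK-only sketch (values are illustrative):

    import java.util.Arrays;
    import java.util.List;

    public class PartitionSumSketch {
        public static void main(String[] args) {
            // per-partition (deletedFilesCount, deletedFilesLenInBytes) results
            List<long[]> partitions =
                    Arrays.asList(new long[] {3, 300}, new long[] {1, 64}, new long[] {0, 0});
            long files = partitions.stream().mapToLong(p -> p[0]).sum();
            long bytes = partitions.stream().mapToLong(p -> p[1]).sum();
            System.out.println(files + " files, " + bytes + " bytes"); // 4 files, 364 bytes
        }
    }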
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
index d9d73811266d..3ffe7fba264f 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
@@ -52,7 +52,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     fileIO.tryToWriteAtomic(orphanFile2, "b")
 
     // by default, no file deleted
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil)
 
     val orphanFile2ModTime = fileIO.getFileStatus(orphanFile2).getModificationTime
     val older_than1 = DateTimeUtils.formatLocalDateTime(
@@ -63,7 +63,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
 
     checkAnswer(
       spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than1')"),
-      Row(1) :: Nil)
+      Row(1, 1) :: Nil)
 
     val older_than2 = DateTimeUtils.formatLocalDateTime(
       DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
@@ -71,9 +71,9 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
 
     checkAnswer(
       spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than2')"),
-      Row(1) :: Nil)
+      Row(1, 1) :: Nil)
 
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil)
   }
 
   test("Paimon procedure: dry run remove orphan files") {
@@ -97,7 +97,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     fileIO.writeFile(orphanFile2, "b", true)
 
     // by default, no file deleted
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil)
 
     val older_than = DateTimeUtils.formatLocalDateTime(
       DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
@@ -106,10 +106,10 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     checkAnswer(
       spark.sql(
         s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than', dry_run => true)"),
-      Row(2) :: Nil
+      Row(2, 2) :: Nil
     )
 
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil)
   }
 
   test("Paimon procedure: remove database orphan files") {
@@ -146,7 +146,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     fileIO2.tryToWriteAtomic(orphanFile22, "b")
 
     // by default, no file deleted
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), Row(0, 0) :: Nil)
 
     val orphanFile12ModTime = fileIO1.getFileStatus(orphanFile12).getModificationTime
     val older_than1 = DateTimeUtils.formatLocalDateTime(
@@ -157,7 +157,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
 
     checkAnswer(
       spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*', older_than => '$older_than1')"),
-      Row(2) :: Nil
+      Row(2, 2) :: Nil
     )
 
     val older_than2 = DateTimeUtils.formatLocalDateTime(
@@ -166,10 +166,10 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     checkAnswer(
       spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*', older_than => '$older_than2')"),
-      Row(2) :: Nil
+      Row(2, 2) :: Nil
     )
 
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), Row(0, 0) :: Nil)
   }
 
   test("Paimon procedure: remove orphan files with mode") {
@@ -193,7 +193,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     fileIO.tryToWriteAtomic(orphanFile2, "b")
 
     // by default, no file deleted
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil)
 
     val orphanFile2ModTime = fileIO.getFileStatus(orphanFile2).getModificationTime
     val older_than1 = DateTimeUtils.formatLocalDateTime(
@@ -205,7 +205,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     checkAnswer(
       spark.sql(
         s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than1', mode => 'diSTributed')"),
-      Row(1) :: Nil)
+      Row(1, 1) :: Nil)
 
     val older_than2 = DateTimeUtils.formatLocalDateTime(
       DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
@@ -214,9 +214,9 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     checkAnswer(
       spark.sql(
         s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than2', mode => 'local')"),
-      Row(1) :: Nil)
+      Row(1, 1) :: Nil)
 
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil)
   }
 }
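User-facing effect, as the updated tests document: every remove_orphan_files call now returns a (deletedFileCount, deletedFileTotalLenInBytes) pair instead of a single count. The orphan files in these tests are written with one-byte payloads ("a"/"b"), so deleting one of them yields Row(1, 1), a dry run over both yields Row(2, 2) without removing anything, and a no-op call yields Row(0, 0). The Flink procedures return the same two values as strings, which the Flink ITCases observe as two result rows, Row.of("2") and Row.of("2").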