Skip to content

Commit

Permalink
[core] add deletedFileTotalSizeInBytes in result of OrphanFilesClean
Browse files Browse the repository at this point in the history
  • Loading branch information
hongli.wwj committed Nov 21, 2024
1 parent d381e12 commit 0361a11
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 100 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.operation;

import org.apache.paimon.fs.Path;

import java.util.List;

/** The result of OrphanFilesClean. */
public class CleanOrphanFilesResult {

private List<Path> deletedFilesPath;
private final long deletedFileCount;
private final long deletedFileTotalSizeInBytes;

public CleanOrphanFilesResult(long deletedFileCount, long deletedFileTotalSizeInBytes) {
this.deletedFileCount = deletedFileCount;
this.deletedFileTotalSizeInBytes = deletedFileTotalSizeInBytes;
}

public CleanOrphanFilesResult(
List<Path> deletedFilesPath, long deletedFileCount, long deletedFileTotalSizeInBytes) {
this(deletedFileCount, deletedFileTotalSizeInBytes);
this.deletedFilesPath = deletedFilesPath;
}

public long getDeletedFileCount() {
return deletedFileCount;
}

public long getDeletedFileTotalSizeInBytes() {
return deletedFileTotalSizeInBytes;
}

public List<Path> getDeletedFilesPath() {
return deletedFilesPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -68,6 +69,8 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {

private final List<Path> deleteFiles;

private final AtomicLong deletedFilesSizeInBytes = new AtomicLong(0);

private Set<String> candidateDeletes;

public LocalOrphanFilesClean(FileStoreTable table) {
Expand All @@ -87,16 +90,27 @@ public LocalOrphanFilesClean(
table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN");
}

public List<Path> clean() throws IOException, ExecutionException, InterruptedException {
public CleanOrphanFilesResult clean()
throws IOException, ExecutionException, InterruptedException {
List<String> branches = validBranches();

// specially handle to clear snapshot dir
cleanSnapshotDir(branches, deleteFiles::add);
cleanSnapshotDir(
branches,
deleteFiles::add,
p -> {
try {
deletedFilesSizeInBytes.addAndGet(fileIO.getFileSize(p));
} catch (IOException e) {
throw new RuntimeException(e);
}
});

// delete candidate files
Map<String, Path> candidates = getCandidateDeletingFiles();
if (candidates.isEmpty()) {
return deleteFiles;
return new CleanOrphanFilesResult(
deleteFiles, deleteFiles.size(), deletedFilesSizeInBytes.get());
}
candidateDeletes = new HashSet<>(candidates.keySet());

Expand All @@ -108,12 +122,24 @@ public List<Path> clean() throws IOException, ExecutionException, InterruptedExc

// delete unused files
candidateDeletes.removeAll(usedFiles);
candidateDeletes.stream().map(candidates::get).forEach(fileCleaner);
candidateDeletes.stream()
.map(candidates::get)
.forEach(
deleteFilePath -> {
try {
deletedFilesSizeInBytes.addAndGet(
fileIO.getFileSize(deleteFilePath));
} catch (IOException e) {
throw new RuntimeException(e);
}
fileCleaner.accept(deleteFilePath);
});
deleteFiles.addAll(
candidateDeletes.stream().map(candidates::get).collect(Collectors.toList()));
candidateDeletes.clear();

return deleteFiles;
return new CleanOrphanFilesResult(
deleteFiles, deleteFiles.size(), deletedFilesSizeInBytes.get());
}

private void collectWithoutDataFile(
Expand Down Expand Up @@ -230,7 +256,7 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
return orphanFilesCleans;
}

public static long executeDatabaseOrphanFiles(
public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
Catalog catalog,
String databaseName,
@Nullable String tableName,
Expand All @@ -249,15 +275,17 @@ public static long executeDatabaseOrphanFiles(

ExecutorService executorService =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<Future<List<Path>>> tasks = new ArrayList<>();
List<Future<CleanOrphanFilesResult>> tasks = new ArrayList<>();
for (LocalOrphanFilesClean clean : tableCleans) {
tasks.add(executorService.submit(clean::clean));
}

List<Path> cleanOrphanFiles = new ArrayList<>();
for (Future<List<Path>> task : tasks) {
long deletedFileCount = 0;
long deletedFileTotalSizeInBytes = 0;
for (Future<CleanOrphanFilesResult> task : tasks) {
try {
cleanOrphanFiles.addAll(task.get());
deletedFileCount += task.get().getDeletedFileCount();
deletedFileTotalSizeInBytes += task.get().getDeletedFileTotalSizeInBytes();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
Expand All @@ -267,6 +295,6 @@ public static long executeDatabaseOrphanFiles(
}

executorService.shutdownNow();
return cleanOrphanFiles.size();
return new CleanOrphanFilesResult(deletedFileCount, deletedFileTotalSizeInBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,20 +119,25 @@ protected List<String> validBranches() {
return branches;
}

protected void cleanSnapshotDir(List<String> branches, Consumer<Path> deletedFileConsumer) {
protected void cleanSnapshotDir(
List<String> branches,
Consumer<Path> deletedFilesConsumer,
Consumer<Path> deletedFilesSizeInBytesConsumer) {
for (String branch : branches) {
FileStoreTable branchTable = table.switchToBranch(branch);
SnapshotManager snapshotManager = branchTable.snapshotManager();

// specially handle the snapshot directory
List<Path> nonSnapshotFiles = snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
nonSnapshotFiles.forEach(deletedFilesConsumer);
nonSnapshotFiles.forEach(deletedFilesSizeInBytesConsumer);
nonSnapshotFiles.forEach(fileCleaner);
nonSnapshotFiles.forEach(deletedFileConsumer);

// specially handle the changelog directory
List<Path> nonChangelogFiles = snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
nonChangelogFiles.forEach(deletedFilesConsumer);
nonChangelogFiles.forEach(deletedFilesSizeInBytesConsumer);
nonChangelogFiles.forEach(fileCleaner);
nonChangelogFiles.forEach(deletedFileConsumer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,22 +165,20 @@ public void testNormallyRemoving() throws Throwable {

// randomly delete tags
List<String> deleteTags = Collections.emptyList();
if (!allTags.isEmpty()) {
deleteTags = randomlyPick(allTags);
for (String tagName : deleteTags) {
table.deleteTag(tagName);
}
deleteTags = randomlyPick(allTags);
for (String tagName : deleteTags) {
table.deleteTag(tagName);
}

// first check, nothing will be deleted because the default olderThan interval is 1 day
LocalOrphanFilesClean orphanFilesClean = new LocalOrphanFilesClean(table);
assertThat(orphanFilesClean.clean().size()).isEqualTo(0);
assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isEqualTo(0);

// second check
orphanFilesClean =
new LocalOrphanFilesClean(
table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2));
List<Path> deleted = orphanFilesClean.clean();
List<Path> deleted = orphanFilesClean.clean().getDeletedFilesPath();
try {
validate(deleted, snapshotData, new HashMap<>());
} catch (Throwable t) {
Expand Down Expand Up @@ -363,13 +361,13 @@ public void testCleanOrphanFilesWithChangelogDecoupled(String changelogProducer)

// first check, nothing will be deleted because the default olderThan interval is 1 day
LocalOrphanFilesClean orphanFilesClean = new LocalOrphanFilesClean(table);
assertThat(orphanFilesClean.clean().size()).isEqualTo(0);
assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isEqualTo(0);

// second check
orphanFilesClean =
new LocalOrphanFilesClean(
table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2));
List<Path> deleted = orphanFilesClean.clean();
List<Path> deleted = orphanFilesClean.clean().getDeletedFilesPath();
validate(deleted, snapshotData, changelogData);
}

Expand Down Expand Up @@ -399,7 +397,7 @@ public void testAbnormallyRemoving() throws Exception {
LocalOrphanFilesClean orphanFilesClean =
new LocalOrphanFilesClean(
table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2));
assertThat(orphanFilesClean.clean().size()).isGreaterThan(0);
assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isGreaterThan(0);
}

private void writeData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.orphan.FlinkOrphanFilesClean;
import org.apache.paimon.operation.CleanOrphanFilesResult;
import org.apache.paimon.operation.LocalOrphanFilesClean;

import org.apache.flink.table.procedure.ProcedureContext;
Expand Down Expand Up @@ -86,11 +87,12 @@ public String[] call(
if (mode == null) {
mode = "DISTRIBUTED";
}
long deletedFiles;

CleanOrphanFilesResult cleanOrphanFilesResult;
try {
switch (mode.toUpperCase(Locale.ROOT)) {
case "DISTRIBUTED":
deletedFiles =
cleanOrphanFilesResult =
FlinkOrphanFilesClean.executeDatabaseOrphanFiles(
procedureContext.getExecutionEnvironment(),
catalog,
Expand All @@ -101,7 +103,7 @@ public String[] call(
tableName);
break;
case "LOCAL":
deletedFiles =
cleanOrphanFilesResult =
LocalOrphanFilesClean.executeDatabaseOrphanFiles(
catalog,
databaseName,
Expand All @@ -116,7 +118,10 @@ public String[] call(
+ mode
+ ". Only 'DISTRIBUTED' and 'LOCAL' are supported.");
}
return new String[] {String.valueOf(deletedFiles)};
return new String[] {
String.valueOf(cleanOrphanFilesResult.getDeletedFileCount()),
String.valueOf(cleanOrphanFilesResult.getDeletedFileTotalSizeInBytes())
};
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Loading

0 comments on commit 0361a11

Please sign in to comment.