Skip to content

Commit

Permalink
[spark] Implement distributed orphan file clean for spark (#4207)
Browse files Browse the repository at this point in the history
  • Loading branch information
ulysses-you authored Sep 19, 2024
1 parent db321c2 commit 12d6db5
Show file tree
Hide file tree
Showing 6 changed files with 295 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;

/** Local {@link OrphanFilesClean}, it will use thread pool to execute deletion. */
/**
* Local {@link OrphanFilesClean}, it will use thread pool to execute deletion.
*
* <p>Note that this class is no longer used, since each engine should implement its own
* distributed version. See {@code FlinkOrphanFilesClean} and {@code SparkOrphanFilesClean}.
*/
public class LocalOrphanFilesClean extends OrphanFilesClean {

private final ThreadPoolExecutor executor;
Expand Down Expand Up @@ -109,8 +114,7 @@ private List<String> getUsedFiles(String branch) {
table.switchToBranch(branch).store().manifestFileFactory().create();
try {
List<String> manifests = new ArrayList<>();
collectWithoutDataFile(
branch, usedFiles::add, manifest -> manifests.add(manifest.fileName()));
collectWithoutDataFile(branch, usedFiles::add, manifests::add);
usedFiles.addAll(retryReadingDataFiles(manifestFile, manifests));
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SerializableConsumer;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
Expand Down Expand Up @@ -135,16 +136,6 @@ protected void cleanSnapshotDir(List<String> branches, Consumer<Path> deletedFil
}
}

/**
* Runs the per-snapshot {@code collectWithoutDataFile} overload once for every snapshot returned
* by {@link #safelyGetAllSnapshots(String)} for the given branch.
*
* @param branch branch whose snapshots are iterated
* @param usedFileConsumer callback handed unchanged to the per-snapshot overload
* @param manifestConsumer callback taking {@link ManifestFileMeta}, handed unchanged to the
*     per-snapshot overload
* @throws IOException declared because both the snapshot listing and the per-snapshot overload
*     declare it
*/
protected void collectWithoutDataFile(
String branch,
Consumer<String> usedFileConsumer,
Consumer<ManifestFileMeta> manifestConsumer)
throws IOException {
for (Snapshot snapshot : safelyGetAllSnapshots(branch)) {
collectWithoutDataFile(branch, snapshot, usedFileConsumer, manifestConsumer);
}
}

protected Set<Snapshot> safelyGetAllSnapshots(String branch) throws IOException {
FileStoreTable branchTable = table.switchToBranch(branch);
SnapshotManager snapshotManager = branchTable.snapshotManager();
Expand All @@ -155,19 +146,42 @@ protected Set<Snapshot> safelyGetAllSnapshots(String branch) throws IOException
return readSnapshots;
}

/**
* Runs the per-snapshot {@code collectWithoutDataFile} overload once for every snapshot returned
* by {@link #safelyGetAllSnapshots(String)} for the given branch.
*
* @param branch branch whose snapshots are iterated
* @param usedFileConsumer callback handed unchanged to the per-snapshot overload
* @param manifestConsumer callback taking a {@code String}, handed unchanged to the per-snapshot
*     overload
* @throws IOException declared because both the snapshot listing and the per-snapshot overload
*     declare it
*/
protected void collectWithoutDataFile(
String branch, Consumer<String> usedFileConsumer, Consumer<String> manifestConsumer)
throws IOException {
for (Snapshot snapshot : safelyGetAllSnapshots(branch)) {
collectWithoutDataFile(branch, snapshot, usedFileConsumer, manifestConsumer);
}
}

protected void collectWithoutDataFile(
String branch,
Snapshot snapshot,
Consumer<String> usedFileConsumer,
Consumer<ManifestFileMeta> manifestConsumer)
Consumer<String> manifestConsumer)
throws IOException {
Consumer<Pair<String, Boolean>> usedFileWithFlagConsumer =
fileAndFlag -> {
if (fileAndFlag.getRight()) {
manifestConsumer.accept(fileAndFlag.getLeft());
}
usedFileConsumer.accept(fileAndFlag.getLeft());
};
collectWithoutDataFileWithManifestFlag(branch, snapshot, usedFileWithFlagConsumer);
}

protected void collectWithoutDataFileWithManifestFlag(
String branch,
Snapshot snapshot,
Consumer<Pair<String, Boolean>> usedFileWithFlagConsumer)
throws IOException {
FileStoreTable branchTable = table.switchToBranch(branch);
ManifestList manifestList = branchTable.store().manifestListFactory().create();
IndexFileHandler indexFileHandler = branchTable.store().newIndexFileHandler();
List<ManifestFileMeta> manifestFileMetas = new ArrayList<>();
// changelog manifest
if (snapshot.changelogManifestList() != null) {
usedFileConsumer.accept(snapshot.changelogManifestList());
usedFileWithFlagConsumer.accept(Pair.of(snapshot.changelogManifestList(), false));
manifestFileMetas.addAll(
retryReadingFiles(
() ->
Expand All @@ -178,42 +192,41 @@ protected void collectWithoutDataFile(

// delta manifest
if (snapshot.deltaManifestList() != null) {
usedFileConsumer.accept(snapshot.deltaManifestList());
usedFileWithFlagConsumer.accept(Pair.of(snapshot.deltaManifestList(), false));
manifestFileMetas.addAll(
retryReadingFiles(
() -> manifestList.readWithIOException(snapshot.deltaManifestList()),
emptyList()));
}

// base manifest
usedFileConsumer.accept(snapshot.baseManifestList());
usedFileWithFlagConsumer.accept(Pair.of(snapshot.baseManifestList(), false));
manifestFileMetas.addAll(
retryReadingFiles(
() -> manifestList.readWithIOException(snapshot.baseManifestList()),
emptyList()));

// collect manifests
for (ManifestFileMeta manifest : manifestFileMetas) {
manifestConsumer.accept(manifest);
usedFileConsumer.accept(manifest.fileName());
usedFileWithFlagConsumer.accept(Pair.of(manifest.fileName(), true));
}

// index files
String indexManifest = snapshot.indexManifest();
if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) {
usedFileConsumer.accept(indexManifest);
usedFileWithFlagConsumer.accept(Pair.of(indexManifest, false));
retryReadingFiles(
() -> indexFileHandler.readManifestWithIOException(indexManifest),
Collections.<IndexManifestEntry>emptyList())
.stream()
.map(IndexManifestEntry::indexFile)
.map(IndexFileMeta::fileName)
.forEach(usedFileConsumer);
.forEach(name -> usedFileWithFlagConsumer.accept(Pair.of(name, false)));
}

// statistic file
if (snapshot.statistics() != null) {
usedFileConsumer.accept(snapshot.statistics());
usedFileWithFlagConsumer.accept(Pair.of(snapshot.statistics(), false));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
Expand Down Expand Up @@ -134,11 +133,10 @@ public void processElement(
throws Exception {
String branch = branchAndSnapshot.f0;
Snapshot snapshot = Snapshot.fromJson(branchAndSnapshot.f1);
Consumer<ManifestFileMeta> manifestConsumer =
Consumer<String> manifestConsumer =
manifest -> {
Tuple2<String, String> tuple2 =
new Tuple2<>(
branch, manifest.fileName());
new Tuple2<>(branch, manifest);
ctx.output(manifestOutputTag, tuple2);
};
collectWithoutDataFile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,22 @@
package org.apache.paimon.spark.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.operation.LocalOrphanFilesClean;
import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.spark.catalog.WithPaimonCatalog;
import org.apache.paimon.spark.orphan.SparkOrphanFilesClean;
import org.apache.paimon.utils.Preconditions;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.paimon.operation.LocalOrphanFilesClean.executeOrphanFilesClean;
import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;

/**
Expand Down Expand Up @@ -67,7 +62,7 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
new StructField("result", StringType, true, Metadata.empty())
new StructField("result", LongType, true, Metadata.empty())
});

private RemoveOrphanFilesProcedure(TableCatalog tableCatalog) {
Expand Down Expand Up @@ -102,29 +97,19 @@ public InternalRow[] call(InternalRow args) {
}
LOG.info("identifier is {}.", identifier);

List<LocalOrphanFilesClean> tableCleans;
try {
Catalog catalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
tableCleans =
LocalOrphanFilesClean.createOrphanFilesCleans(
catalog,
identifier.getDatabaseName(),
identifier.getObjectName(),
OrphanFilesClean.olderThanMillis(
args.isNullAt(1) ? null : args.getString(1)),
OrphanFilesClean.createFileCleaner(
catalog, !args.isNullAt(2) && args.getBoolean(2)),
args.isNullAt(3) ? null : args.getInt(3));
} catch (Exception e) {
throw new RuntimeException(e);
}

String[] result = executeOrphanFilesClean(tableCleans);
List<InternalRow> rows = new ArrayList<>();
Arrays.stream(result)
.forEach(line -> rows.add(newInternalRow(UTF8String.fromString(line))));

return rows.toArray(new InternalRow[0]);
Catalog catalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
long deletedFiles =
SparkOrphanFilesClean.executeDatabaseOrphanFiles(
catalog,
identifier.getDatabaseName(),
identifier.getTableName(),
OrphanFilesClean.olderThanMillis(
args.isNullAt(1) ? null : args.getString(1)),
OrphanFilesClean.createFileCleaner(
catalog, !args.isNullAt(2) && args.getBoolean(2)),
args.isNullAt(3) ? null : args.getInt(3));

return new InternalRow[] {newInternalRow(deletedFiles)};
}

public static ProcedureBuilder builder() {
Expand Down
Loading

0 comments on commit 12d6db5

Please sign in to comment.