Skip to content

Commit

Permalink
[spark][flink] Introduce orphan file cleaning local and distributed m…
Browse files Browse the repository at this point in the history
…ode (#4285)
  • Loading branch information
bknbkn authored Oct 8, 2024
1 parent 64f8762 commit 4f0a475
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 36 deletions.
9 changes: 6 additions & 3 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,13 @@ All available procedures are listed below.
<td>remove_orphan_files</td>
<td>
-- Use named argument<br/>
CALL [catalog.]sys.remove_orphan_files(`table` => 'identifier', older_than => 'olderThan', dry_run => 'dryRun') <br/><br/>
CALL [catalog.]sys.remove_orphan_files(`table` => 'identifier', older_than => 'olderThan', dry_run => 'dryRun', mode => 'mode') <br/><br/>
-- Use indexed argument<br/>
CALL [catalog.]sys.remove_orphan_files('identifier')<br/><br/>
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan')<br/><br/>
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun')<br/><br/>
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun','parallelism')
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun','parallelism')<br/><br/>
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun','parallelism','mode')
</td>
<td>
To remove the orphan data files and metadata files. Arguments:
Expand All @@ -283,11 +284,13 @@ All available procedures are listed below.
</li>
<li>dryRun: when true, view only orphan files, don't actually remove files. Default is false.</li>
<li>parallelism: The maximum number of concurrent deleting files. By default is the number of processors available to the Java virtual machine.</li>
<li>mode: The mode of remove orphan clean procedure (local or distributed) . By default is distributed.</li>
</td>
<td>CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00')<br/><br/>
CALL remove_orphan_files(`table` => 'default.*', older_than => '2023-10-31 12:00:00')<br/><br/>
CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true)<br/><br/>
CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => false, parallelism => '5')
CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => false, parallelism => '5')<br/><br/>
CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => false, parallelism => '5', mode => 'local')
</td>
</tr>
<tr>
Expand Down
4 changes: 3 additions & 1 deletion docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,14 @@ This section introduce all available spark procedures about paimon.
<li>older_than: to avoid deleting newly written files, this procedure only deletes orphan files older than 1 day by default. This argument can modify the interval.</li>
<li>dry_run: when true, view only orphan files, don't actually remove files. Default is false.</li>
<li>parallelism: The maximum number of concurrent deleting files. By default is the number of processors available to the Java virtual machine.</li>
<li>mode: The mode of remove orphan clean procedure (local or distributed) . By default is distributed.</li>
</td>
<td>
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00')<br/><br/>
CALL sys.remove_orphan_files(table => 'default.*', older_than => '2023-10-31 12:00:00')<br/><br/>
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true)<br/><br/>
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5')
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5')<br/><br/>
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5', mode => 'local')
</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,13 @@
/**
* Local {@link OrphanFilesClean}, it will use thread pool to execute deletion.
*
* <p>Note that, this class is not used any more since each engine should implement its own
* distributed one. See `FlinkOrphanFilesClean` and `SparkOrphanFilesClean`.
* <p>Note that, this class will be used when the orphan clean mode is local, else orphan clean will
* use distributed one. See `FlinkOrphanFilesClean` and `SparkOrphanFilesClean`.
*/
public class LocalOrphanFilesClean extends OrphanFilesClean {

private final ThreadPoolExecutor executor;

private static final int SHOW_LIMIT = 200;

private final List<Path> deleteFiles;

public LocalOrphanFilesClean(FileStoreTable table) {
Expand Down Expand Up @@ -220,7 +218,23 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
return orphanFilesCleans;
}

public static String[] executeOrphanFilesClean(List<LocalOrphanFilesClean> tableCleans) {
public static long executeDatabaseOrphanFiles(
Catalog catalog,
String databaseName,
@Nullable String tableName,
long olderThanMillis,
SerializableConsumer<Path> fileCleaner,
@Nullable Integer parallelism)
throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
List<LocalOrphanFilesClean> tableCleans =
createOrphanFilesCleans(
catalog,
databaseName,
tableName,
olderThanMillis,
fileCleaner,
parallelism);

ExecutorService executorService =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<Future<List<Path>>> tasks = new ArrayList<>();
Expand All @@ -241,6 +255,6 @@ public static String[] executeOrphanFilesClean(List<LocalOrphanFilesClean> table
}

executorService.shutdownNow();
return showDeletedFiles(cleanOrphanFiles, SHOW_LIMIT).toArray(new String[0]);
return cleanOrphanFiles.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.orphan.FlinkOrphanFilesClean;
import org.apache.paimon.operation.LocalOrphanFilesClean;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;

import java.util.Locale;

import static org.apache.paimon.operation.OrphanFilesClean.createFileCleaner;
import static org.apache.paimon.operation.OrphanFilesClean.olderThanMillis;

Expand Down Expand Up @@ -55,29 +58,57 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase {
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(name = "dry_run", type = @DataTypeHint("BOOLEAN"), isOptional = true),
@ArgumentHint(name = "parallelism", type = @DataTypeHint("INT"), isOptional = true)
@ArgumentHint(name = "parallelism", type = @DataTypeHint("INT"), isOptional = true),
@ArgumentHint(name = "mode", type = @DataTypeHint("STRING"), isOptional = true)
})
public String[] call(
ProcedureContext procedureContext,
String tableId,
String olderThan,
Boolean dryRun,
Integer parallelism)
Integer parallelism,
String mode)
throws Exception {
Identifier identifier = Identifier.fromString(tableId);
String databaseName = identifier.getDatabaseName();
String tableName = identifier.getObjectName();

long deleted =
FlinkOrphanFilesClean.executeDatabaseOrphanFiles(
procedureContext.getExecutionEnvironment(),
catalog,
olderThanMillis(olderThan),
createFileCleaner(catalog, dryRun),
parallelism,
databaseName,
tableName);
return new String[] {String.valueOf(deleted)};
if (mode == null) {
mode = "DISTRIBUTED";
}
long deletedFiles;
try {
switch (mode.toUpperCase(Locale.ROOT)) {
case "DISTRIBUTED":
deletedFiles =
FlinkOrphanFilesClean.executeDatabaseOrphanFiles(
procedureContext.getExecutionEnvironment(),
catalog,
olderThanMillis(olderThan),
createFileCleaner(catalog, dryRun),
parallelism,
databaseName,
tableName);
break;
case "LOCAL":
deletedFiles =
LocalOrphanFilesClean.executeDatabaseOrphanFiles(
catalog,
databaseName,
tableName,
olderThanMillis(olderThan),
createFileCleaner(catalog, dryRun),
parallelism);
break;
default:
throw new IllegalArgumentException(
"Unknown mode: "
+ mode
+ ". Only 'DISTRIBUTED' and 'LOCAL' are supported.");
}
return new String[] {String.valueOf(deletedFiles)};
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,4 +271,71 @@ public void testCleanWithBranch(boolean isNamedArgument) throws Exception {
ImmutableList<Row> actualDeleteFile = ImmutableList.copyOf(executeSQL(procedure));
assertThat(actualDeleteFile).containsOnly(Row.of("4"));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testRunWithMode(boolean isNamedArgument) throws Exception {
createTableAndWriteData(tableName);

List<String> args =
new ArrayList<>(
Arrays.asList(
"remove_orphan_files",
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName));
RemoveOrphanFilesAction action1 = createAction(RemoveOrphanFilesAction.class, args);
assertThatCode(action1::run).doesNotThrowAnyException();

args.add("--older_than");
args.add("2023-12-31 23:59:59");
RemoveOrphanFilesAction action2 = createAction(RemoveOrphanFilesAction.class, args);
assertThatCode(action2::run).doesNotThrowAnyException();

String withoutOlderThan =
String.format(
isNamedArgument
? "CALL sys.remove_orphan_files(`table` => '%s.%s')"
: "CALL sys.remove_orphan_files('%s.%s')",
database,
tableName);
CloseableIterator<Row> withoutOlderThanCollect = executeSQL(withoutOlderThan);
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));

String withLocalMode =
String.format(
isNamedArgument
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism => 5, mode => 'local')"
: "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true, 5, 'local')",
database,
tableName);
ImmutableList<Row> actualLocalRunDeleteFile =
ImmutableList.copyOf(executeSQL(withLocalMode));
assertThat(actualLocalRunDeleteFile).containsOnly(Row.of("2"));

String withDistributedMode =
String.format(
isNamedArgument
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism => 5, mode => 'distributed')"
: "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true, 5, 'distributed')",
database,
tableName);
ImmutableList<Row> actualDistributedRunDeleteFile =
ImmutableList.copyOf(executeSQL(withDistributedMode));
assertThat(actualDistributedRunDeleteFile).containsOnly(Row.of("2"));

String withInvalidMode =
String.format(
isNamedArgument
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism => 5, mode => 'unknown')"
: "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true, 5, 'unknown')",
database,
tableName);
assertThatCode(() -> executeSQL(withInvalidMode))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Unknown mode");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.spark.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.operation.LocalOrphanFilesClean;
import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.spark.catalog.WithPaimonCatalog;
import org.apache.paimon.spark.orphan.SparkOrphanFilesClean;
Expand All @@ -32,6 +33,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Locale;

import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.LongType;
Expand All @@ -56,7 +59,8 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
ProcedureParameter.required("table", StringType),
ProcedureParameter.optional("older_than", StringType),
ProcedureParameter.optional("dry_run", BooleanType),
ProcedureParameter.optional("parallelism", IntegerType)
ProcedureParameter.optional("parallelism", IntegerType),
ProcedureParameter.optional("mode", StringType)
};

private static final StructType OUTPUT_TYPE =
Expand Down Expand Up @@ -98,18 +102,45 @@ public InternalRow[] call(InternalRow args) {
LOG.info("identifier is {}.", identifier);

Catalog catalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
long deletedFiles =
SparkOrphanFilesClean.executeDatabaseOrphanFiles(
catalog,
identifier.getDatabaseName(),
identifier.getTableName(),
OrphanFilesClean.olderThanMillis(
args.isNullAt(1) ? null : args.getString(1)),
OrphanFilesClean.createFileCleaner(
catalog, !args.isNullAt(2) && args.getBoolean(2)),
args.isNullAt(3) ? null : args.getInt(3));

return new InternalRow[] {newInternalRow(deletedFiles)};
String mode = args.isNullAt(4) ? "DISTRIBUTED" : args.getString(4);

long deletedFiles;
try {
switch (mode.toUpperCase(Locale.ROOT)) {
case "LOCAL":
deletedFiles =
LocalOrphanFilesClean.executeDatabaseOrphanFiles(
catalog,
identifier.getDatabaseName(),
identifier.getTableName(),
OrphanFilesClean.olderThanMillis(
args.isNullAt(1) ? null : args.getString(1)),
OrphanFilesClean.createFileCleaner(
catalog, !args.isNullAt(2) && args.getBoolean(2)),
args.isNullAt(3) ? null : args.getInt(3));
break;
case "DISTRIBUTED":
deletedFiles =
SparkOrphanFilesClean.executeDatabaseOrphanFiles(
catalog,
identifier.getDatabaseName(),
identifier.getTableName(),
OrphanFilesClean.olderThanMillis(
args.isNullAt(1) ? null : args.getString(1)),
OrphanFilesClean.createFileCleaner(
catalog, !args.isNullAt(2) && args.getBoolean(2)),
args.isNullAt(3) ? null : args.getInt(3));
break;
default:
throw new IllegalArgumentException(
"Unknown mode: "
+ mode
+ ". Only 'DISTRIBUTED' and 'LOCAL' are supported.");
}
return new InternalRow[] {newInternalRow(deletedFiles)};
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static ProcedureBuilder builder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,51 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), Row(0) :: Nil)
}

test("Paimon procedure: remove orphan files with mode") {
spark.sql(s"""
|CREATE TABLE T (id STRING, name STRING)
|USING PAIMON
|TBLPROPERTIES ('primary-key'='id')
|""".stripMargin)

spark.sql(s"INSERT INTO T VALUES ('1', 'a'), ('2', 'b')")

val table = loadTable("T")
val fileIO = table.fileIO()
val tablePath = table.location()

val orphanFile1 = new Path(tablePath, ORPHAN_FILE_1)
val orphanFile2 = new Path(tablePath, ORPHAN_FILE_2)

fileIO.tryToWriteAtomic(orphanFile1, "a")
Thread.sleep(2000)
fileIO.tryToWriteAtomic(orphanFile2, "b")

// by default, no file deleted
checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)

val orphanFile2ModTime = fileIO.getFileStatus(orphanFile2).getModificationTime
val older_than1 = DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(
orphanFile2ModTime -
TimeUnit.SECONDS.toMillis(1)),
3)

checkAnswer(
spark.sql(
s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than1', mode => 'diSTributed')"),
Row(1) :: Nil)

val older_than2 = DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
3)

checkAnswer(
spark.sql(
s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than2', mode => 'local')"),
Row(1) :: Nil)

checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)
}

}

0 comments on commit 4f0a475

Please sign in to comment.