Skip to content

Commit

Permalink
[core] Remove_orphan_files support dry run and optimize output (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
MOBIN-F authored Jul 1, 2024
1 parent 4101f71 commit fa7add8
Show file tree
Hide file tree
Showing 12 changed files with 163 additions and 32 deletions.
8 changes: 6 additions & 2 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,20 @@ All available procedures are listed below.
<td>remove_orphan_files</td>
<td>
CALL [catalog.]sys.remove_orphan_files('identifier')<br/><br/>
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan')
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan')<br/><br/>
CALL [catalog.]sys.remove_orphan_files('identifier', 'olderThan', 'dryRun')
</td>
<td>
To remove the orphan data files and metadata files. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>olderThan: to avoid deleting newly written files, this procedure only
deletes orphan files older than 1 day by default. This argument can modify the interval.
</li>
<li>dryRun: when true, view only orphan files, don't actually remove files. Default is false.</li>
</td>
<td>CALL remove_orphan_files('default.T', '2023-10-31 12:00:00')<br/><br/>
CALL remove_orphan_files('default.T', '2023-10-31 12:00:00', true)
</td>
<td>CALL remove_orphan_files('default.T', '2023-10-31 12:00:00')</td>
</tr>
<tr>
<td>reset_consumer</td>
Expand Down
3 changes: 2 additions & 1 deletion docs/content/maintenance/manage-snapshots.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ submit a `remove_orphan_files` job to clean them:
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
[--older_than <timestamp>]
[--older_than <timestamp>] \
[--dry_run <false/true>]
```
To avoid deleting files that are newly added by other writing jobs, this action only deletes orphan files older than
Expand Down
4 changes: 3 additions & 1 deletion docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,11 @@ This section introduce all available spark procedures about paimon.
To remove the orphan data files and metadata files. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>older_than: to avoid deleting newly written files, this procedure only deletes orphan files older than 1 day by default. This argument can modify the interval.</li>
<li>dry_run: when true, view only orphan files, don't actually remove files. Default is false.</li>
</td>
<td>
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00')
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00')<br/><br/>
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true)
</td>
</tr>
<tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class OrphanFilesClean {
private int deletedFilesNum = 0;
private final List<Path> deleteFiles;
private long olderThanMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
private boolean isDryRun = false;

public OrphanFilesClean(FileStoreTable table) {
this.snapshotManager = table.snapshotManager();
Expand All @@ -119,10 +120,15 @@ public OrphanFilesClean olderThan(String timestamp) {
return this;
}

public int clean() throws IOException, ExecutionException, InterruptedException {
public OrphanFilesClean dryRun(boolean dryRun) {
this.isDryRun = dryRun;
return this;
}

public List<Path> clean() throws IOException, ExecutionException, InterruptedException {
if (snapshotManager.earliestSnapshotId() == null) {
LOG.info("No snapshot found, skip removing.");
return 0;
return Collections.emptyList();
}

// specially handle the snapshot directory
Expand All @@ -143,14 +149,17 @@ public int clean() throws IOException, ExecutionException, InterruptedException
Set<String> deleted = new HashSet<>(candidates.keySet());
deleted.removeAll(usedFiles);

for (String file : deleted) {
Path path = candidates.get(file);
deleteFileOrDirQuietly(path);
if (!isDryRun) {
for (String file : deleted) {
Path path = candidates.get(file);
deleteFileOrDirQuietly(path);
}
}

deletedFilesNum += deleted.size();
deleteFiles.addAll(deleted.stream().map(candidates::get).collect(Collectors.toList()));

return deletedFilesNum;
return deleteFiles;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void testNormallyRemoving() throws Throwable {

// first check, nothing will be deleted because the default olderThan interval is 1 day
OrphanFilesClean orphanFilesClean = new OrphanFilesClean(table);
assertThat(orphanFilesClean.clean()).isEqualTo(0);
assertThat(orphanFilesClean.clean().size()).isEqualTo(0);

// second check
orphanFilesClean = new OrphanFilesClean(table);
Expand Down Expand Up @@ -362,7 +362,7 @@ public void testCleanOrphanFilesWithChangelogDecoupled(String changelogProducer)

// first check, nothing will be deleted because the default olderThan interval is 1 day
OrphanFilesClean orphanFilesClean = new OrphanFilesClean(table);
assertThat(orphanFilesClean.clean()).isEqualTo(0);
assertThat(orphanFilesClean.clean().size()).isEqualTo(0);

// second check
orphanFilesClean = new OrphanFilesClean(table);
Expand Down Expand Up @@ -397,7 +397,7 @@ public void testAbnormallyRemoving() throws Exception {

OrphanFilesClean orphanFilesClean = new OrphanFilesClean(table);
setOlderThan(orphanFilesClean);
assertThat(orphanFilesClean.clean()).isGreaterThan(0);
assertThat(orphanFilesClean.clean().size()).isGreaterThan(0);
}

private void writeData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,22 @@

package org.apache.paimon.flink.action;

import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.table.FileStoreTable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Action to remove the orphan data files and metadata files. */
public class RemoveOrphanFilesAction extends TableActionBase {
private static final Logger LOG = LoggerFactory.getLogger(RemoveOrphanFilesAction.class);

private final OrphanFilesClean orphanFilesClean;

Expand All @@ -49,8 +56,18 @@ public RemoveOrphanFilesAction olderThan(String timestamp) {
return this;
}

public RemoveOrphanFilesAction dryRun(Boolean dryRun) {
this.orphanFilesClean.dryRun(dryRun);
return this;
}

@Override
public void run() throws Exception {
orphanFilesClean.clean();
List<Path> orphanFiles = orphanFilesClean.clean();
String files =
orphanFiles.stream()
.map(filePath -> filePath.toUri().getPath())
.collect(Collectors.joining(", "));
LOG.info("orphan files: [{}]", files);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class RemoveOrphanFilesActionFactory implements ActionFactory {
public static final String IDENTIFIER = "remove_orphan_files";

private static final String OLDER_THAN = "older_than";
private static final String DRY_RUN = "dry_run";

@Override
public String identifier() {
Expand All @@ -48,6 +49,10 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
action.olderThan(params.get(OLDER_THAN));
}

if (params.has(DRY_RUN)) {
action.dryRun(Boolean.parseBoolean(params.get(DRY_RUN)));
}

return Optional.of(action);
}

Expand All @@ -60,12 +65,16 @@ public void printHelp() {
System.out.println("Syntax:");
System.out.println(
" remove_orphan_files --warehouse <warehouse_path> --database <database_name> "
+ "--table <table_name> [--older_than <timestamp>]");
+ "--table <table_name> [--older_than <timestamp>] [--dry_run <false/true>]");

System.out.println();
System.out.println(
"To avoid deleting newly written files, this action only deletes orphan files older than 1 day by default. "
+ "The interval can be modified by '--older_than'. <timestamp> format: yyyy-MM-dd HH:mm:ss");

System.out.println();
System.out.println(
"When '--dry_run true', view only orphan files, don't actually remove files. Default is false.");
System.out.println();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public String[] call(ProcedureContext procedureContext, String tableId) throws E

public String[] call(ProcedureContext procedureContext, String tableId, String olderThan)
throws Exception {
return call(procedureContext, tableId, olderThan, false);
}

public String[] call(
ProcedureContext procedureContext, String tableId, String olderThan, boolean dryRun)
throws Exception {
Identifier identifier = Identifier.fromString(tableId);
Table table = catalog.getTable(identifier);

Expand All @@ -62,9 +68,11 @@ public String[] call(ProcedureContext procedureContext, String tableId, String o
orphanFilesClean.olderThan(olderThan);
}

int deleted = orphanFilesClean.clean();
orphanFilesClean.dryRun(dryRun);

return new String[] {"Deleted=" + deleted};
return orphanFilesClean.clean().stream()
.map(filePath -> filePath.toUri().getPath())
.toArray(String[]::new);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

Expand Down Expand Up @@ -185,12 +187,13 @@ private void confuseArgs(String[] args, String regex, String replacement) {
}
}

protected void callProcedure(String procedureStatement) {
protected CloseableIterator<Row> callProcedure(String procedureStatement) {
// default execution mode
callProcedure(procedureStatement, true, false);
return callProcedure(procedureStatement, true, false);
}

protected void callProcedure(String procedureStatement, boolean isStreaming, boolean dmlSync) {
protected CloseableIterator<Row> callProcedure(
String procedureStatement, boolean isStreaming, boolean dmlSync) {
TableEnvironment tEnv;
if (isStreaming) {
tEnv = tableEnvironmentBuilder().streamingMode().checkpointIntervalMs(500).build();
Expand All @@ -206,6 +209,6 @@ protected void callProcedure(String procedureStatement, boolean isStreaming, boo
warehouse));
tEnv.useCatalog("PAIMON");

tEnv.executeSql(procedureStatement);
return tEnv.executeSql(procedureStatement).collect();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,26 @@
package org.apache.paimon.flink.action;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;

import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;

/** IT cases for {@link RemoveOrphanFilesAction}. */
Expand All @@ -58,6 +65,14 @@ public void testRunWithoutException() throws Exception {

writeData(rowData(1L, BinaryString.fromString("Hi")));

Path orphanFile1 = new Path(table.location(), "bucket-0/orphan_file1");
Path orphanFile2 = new Path(table.location(), "bucket-0/orphan_file2");

FileIO fileIO = table.fileIO();
fileIO.writeFileUtf8(orphanFile1, "a");
Thread.sleep(2000);
fileIO.writeFileUtf8(orphanFile2, "b");

List<String> args =
new ArrayList<>(
Arrays.asList(
Expand All @@ -78,12 +93,28 @@ public void testRunWithoutException() throws Exception {

String withoutOlderThan =
String.format("CALL sys.remove_orphan_files('%s.%s')", database, tableName);
assertThatCode(() -> callProcedure(withoutOlderThan)).doesNotThrowAnyException();
CloseableIterator<Row> withoutOlderThanCollect = callProcedure(withoutOlderThan);
assertThat(ImmutableList.copyOf(withoutOlderThanCollect).size()).isEqualTo(0);

String withDryRun =
String.format(
"CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true)",
database, tableName);
ImmutableList<Row> actualDryRunDeleteFile = ImmutableList.copyOf(callProcedure(withDryRun));
assertThat(actualDryRunDeleteFile)
.containsExactlyInAnyOrder(
Row.of(orphanFile1.toUri().getPath()),
Row.of(orphanFile2.toUri().getPath()));

String withOlderThan =
String.format(
"CALL sys.remove_orphan_files('%s.%s', '2023-12-31 23:59:59')",
"CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')",
database, tableName);
assertThatCode(() -> callProcedure(withOlderThan)).doesNotThrowAnyException();
ImmutableList<Row> actualDeleteFile = ImmutableList.copyOf(callProcedure(withOlderThan));

assertThat(actualDeleteFile)
.containsExactlyInAnyOrder(
Row.of(orphanFile1.toUri().getPath()),
Row.of(orphanFile2.toUri().getPath()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.spark.procedure;

import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.StringUtils;
Expand All @@ -30,7 +31,10 @@
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

import java.util.List;

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.StringType;

/**
Expand All @@ -45,7 +49,8 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
ProcedureParameter.optional("older_than", StringType)
ProcedureParameter.optional("older_than", StringType),
ProcedureParameter.optional("dry_run", BooleanType)
};

private static final StructType OUTPUT_TYPE =
Expand All @@ -72,6 +77,7 @@ public StructType outputType() {
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String olderThan = args.isNullAt(1) ? null : args.getString(1);
boolean dryRun = args.isNullAt(2) ? false : args.getBoolean(2);

return modifyPaimonTable(
tableIdent,
Expand All @@ -82,11 +88,18 @@ public InternalRow[] call(InternalRow args) {
if (!StringUtils.isBlank(olderThan)) {
orphanFilesClean.olderThan(olderThan);
}
orphanFilesClean.dryRun(dryRun);
try {
int deleted = orphanFilesClean.clean();
return new InternalRow[] {
newInternalRow(UTF8String.fromString("Deleted=" + deleted))
};
List<Path> orphanFiles = orphanFilesClean.clean();
InternalRow[] rows = new InternalRow[orphanFiles.size()];
int index = 0;
for (Path filePath : orphanFiles) {
rows[index] =
newInternalRow(
UTF8String.fromString(filePath.toUri().getPath()));
index++;
}
return rows;
} catch (Exception e) {
throw new RuntimeException("Call remove_orphan_files error", e);
}
Expand Down
Loading

0 comments on commit fa7add8

Please sign in to comment.