Skip to content

Commit

Permalink
[spark] Add remove orphan files procedure (#2431)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Dec 1, 2023
1 parent e93099d commit cef389f
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 0 deletions.
6 changes: 6 additions & 0 deletions docs/content/maintenance/understand-files.md
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,12 @@ To avoid deleting files that are newly added by other writing jobs, this action
{{< /tab >}}
{{< tab "Spark" >}}
```sql
CALL sys.remove_orphan_files(table => "tableId", [older_then => "2023-10-31 12:00:00"])
```
{{< /tab >}}
{{< /tabs >}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.paimon.spark.procedure.MigrateTableProcedure;
import org.apache.paimon.spark.procedure.Procedure;
import org.apache.paimon.spark.procedure.ProcedureBuilder;
import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure;
import org.apache.paimon.spark.procedure.RollbackProcedure;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -54,6 +55,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
procedureBuilders.put("compact", CompactProcedure::builder);
procedureBuilders.put("migrate_table", MigrateTableProcedure::builder);
procedureBuilders.put("migrate_file", MigrateFileProcedure::builder);
procedureBuilders.put("remove_orphan_files", RemoveOrphanFilesProcedure::builder);
return procedureBuilders.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.procedure;

import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.StringUtils;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.spark.sql.types.DataTypes.StringType;

/**
 * Spark procedure that removes orphan files of a Paimon table by delegating to {@link
 * OrphanFilesClean}. Usage:
 *
 * <pre><code>
 *  CALL sys.remove_orphan_files(table => 'tableId', [older_then => '2023-10-31 12:00:00'])
 * </code></pre>
 *
 * <p>Parameters:
 *
 * <ul>
 *   <li>{@code table} (required): identifier of the table to clean.
 *   <li>{@code older_then} (optional): timestamp string forwarded unchanged to {@link
 *       OrphanFilesClean#olderThan}; it is ignored when null or blank. The parameter name is
 *       spelled {@code older_then} (not {@code older_than}) and is kept as-is because existing
 *       callers pass it by name.
 * </ul>
 *
 * <p>The procedure returns a single row with one string column named {@code result} whose value
 * is {@code "Deleted=" + n}, where {@code n} is the value returned by {@link
 * OrphanFilesClean#clean()}.
 */
public class RemoveOrphanFilesProcedure extends BaseProcedure {

    /** Declared arguments, in positional order: {@code table}, then {@code older_then}. */
    private static final ProcedureParameter[] PARAMETERS =
            new ProcedureParameter[] {
                ProcedureParameter.required("table", StringType),
                ProcedureParameter.optional("older_then", StringType)
            };

    /** Output schema: a single nullable string column named {@code result}. */
    private static final StructType OUTPUT_TYPE =
            new StructType(
                    new StructField[] {
                        new StructField("result", StringType, true, Metadata.empty())
                    });

    /** Instances are created through {@link #builder()} only. */
    private RemoveOrphanFilesProcedure(TableCatalog tableCatalog) {
        super(tableCatalog);
    }

    @Override
    public ProcedureParameter[] parameters() {
        return PARAMETERS;
    }

    @Override
    public StructType outputType() {
        return OUTPUT_TYPE;
    }

    /**
     * Runs the orphan file cleanup on the requested table.
     *
     * @param args row holding the table identifier at position 0 and the optional {@code
     *     older_then} string at position 1
     * @return a one-element array whose row contains {@code "Deleted=" + n}
     * @throws IllegalArgumentException if the resolved table is not a {@link FileStoreTable}
     * @throws RuntimeException wrapping any exception raised while cleaning
     */
    @Override
    public InternalRow[] call(InternalRow args) {
        Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
        // The second argument is optional, so it may be absent (null) in the args row.
        String olderThan = args.isNullAt(1) ? null : args.getString(1);

        return modifyPaimonTable(
                tableIdent,
                table -> {
                    // Fail with an actionable message instead of a bare
                    // IllegalArgumentException when the table kind is not supported.
                    checkArgument(
                            table instanceof FileStoreTable,
                            "Table "
                                    + tableIdent
                                    + " is not a FileStoreTable; remove_orphan_files only"
                                    + " supports file store tables.");
                    OrphanFilesClean orphanFilesClean =
                            new OrphanFilesClean((FileStoreTable) table);
                    // Only override the cleaner's own threshold when the caller supplied one.
                    if (!StringUtils.isBlank(olderThan)) {
                        orphanFilesClean.olderThan(olderThan);
                    }
                    try {
                        int deleted = orphanFilesClean.clean();
                        return new InternalRow[] {
                            newInternalRow(UTF8String.fromString("Deleted=" + deleted))
                        };
                    } catch (Exception e) {
                        // Rethrow unchecked, keeping the original exception as the cause.
                        throw new RuntimeException("Call remove_orphan_files error", e);
                    }
                });
    }

    /** Returns a builder that creates this procedure for the builder's table catalog. */
    public static ProcedureBuilder builder() {
        return new BaseProcedure.Builder<RemoveOrphanFilesProcedure>() {
            @Override
            public RemoveOrphanFilesProcedure doBuild() {
                return new RemoveOrphanFilesProcedure(tableCatalog());
            }
        };
    }

    @Override
    public String description() {
        return "RemoveOrphanFilesProcedure";
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.paimon.spark.procedure

import org.apache.paimon.fs.Path
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.utils.DateTimeUtils

import org.apache.spark.sql.Row

import java.util.concurrent.TimeUnit

/** Exercises the `sys.remove_orphan_files` Spark procedure with and without `older_then`. */
class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {

  test("Paimon procedure: remove orphan files") {
    // Formats epoch milliseconds as the timestamp string handed to `older_then`
    // (the trailing 3 is the precision argument of formatLocalDateTime).
    def asTimestamp(epochMillis: Long): String =
      DateTimeUtils.formatLocalDateTime(DateTimeUtils.toLocalDateTime(epochMillis), 3)

    val createTableSql =
      s"""
         |CREATE TABLE T (id STRING, name STRING)
         |USING PAIMON
         |TBLPROPERTIES ('primary-key'='id')
         |""".stripMargin
    spark.sql(createTableSql)
    spark.sql(s"INSERT INTO T VALUES ('1', 'a'), ('2', 'b')")

    val paimonTable = loadTable("T")
    val io = paimonTable.fileIO()
    val root = paimonTable.location()

    // Two files that no snapshot references, written two seconds apart so that their
    // modification times can be told apart by the thresholds used below.
    val firstOrphan = new Path(root, "bucket-0/orphan_file1")
    val secondOrphan = new Path(root, "bucket-0/orphan_file2")
    io.writeFileUtf8(firstOrphan, "a")
    Thread.sleep(2000)
    io.writeFileUtf8(secondOrphan, "b")

    // by default, no file deleted
    checkAnswer(
      spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"),
      Seq(Row("Deleted=0")))

    // Threshold one second before the second orphan's modification time:
    // exactly one deletion is expected.
    val secondOrphanModTime = io.getFileStatus(secondOrphan).getModificationTime
    val beforeSecondOrphan = asTimestamp(secondOrphanModTime - TimeUnit.SECONDS.toMillis(1))
    checkAnswer(
      spark.sql(
        s"CALL sys.remove_orphan_files(table => 'T', older_then => '$beforeSecondOrphan')"),
      Seq(Row("Deleted=1")))

    // Threshold at the current time: one more deletion is expected.
    val rightNow = asTimestamp(System.currentTimeMillis())
    checkAnswer(
      spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_then => '$rightNow')"),
      Seq(Row("Deleted=1")))
  }
}

0 comments on commit cef389f

Please sign in to comment.