From d03835e21bacc2e779c949688b652f58e2240c38 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Thu, 30 Nov 2023 20:59:28 +0800 Subject: [PATCH] update --- docs/content/maintenance/understand-files.md | 6 ++ .../apache/paimon/spark/SparkProcedures.java | 2 + .../procedure/RemoveOrphanFilesProcedure.java | 91 +++++++++++++++++++ .../RemoveOrphanFilesProcedureTest.scala | 72 +++++++++++++++ 4 files changed, 171 insertions(+) create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java create mode 100644 paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala diff --git a/docs/content/maintenance/understand-files.md b/docs/content/maintenance/understand-files.md index 03e8705c8738..adfe7a921c40 100644 --- a/docs/content/maintenance/understand-files.md +++ b/docs/content/maintenance/understand-files.md @@ -372,6 +372,12 @@ To avoid deleting files that are newly added by other writing jobs, this action {{< /tab >}} +{{< tab "Spark" >}} +```sql +CALL sys.remove_orphan_files(table => "tableId", [older_then => "2023-10-31 12:00:00"]) +``` +{{< /tab >}} + {{< /tabs >}} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java index 632bd416f2ff..e0921ed86fe5 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java @@ -25,6 +25,7 @@ import org.apache.paimon.spark.procedure.MigrateTableProcedure; import org.apache.paimon.spark.procedure.Procedure; import org.apache.paimon.spark.procedure.ProcedureBuilder; +import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure; import org.apache.paimon.spark.procedure.RollbackProcedure; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; @@ -54,6 +55,7 @@ private static Map> initProcedureBuilders() { procedureBuilders.put("compact", CompactProcedure::builder); procedureBuilders.put("migrate_table", MigrateTableProcedure::builder); procedureBuilders.put("migrate_file", MigrateFileProcedure::builder); + procedureBuilders.put("remove_orphan_files", RemoveOrphanFilesProcedure::builder); return procedureBuilders.build(); } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java new file mode 100644 index 000000000000..b339de08f27f --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java @@ -0,0 +1,91 @@ +package org.apache.paimon.spark.procedure; + +import org.apache.paimon.operation.OrphanFilesClean; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.StringUtils; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; + +import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.spark.sql.types.DataTypes.StringType; + +/** + * Remove orphan files procedure. Usage: + * + *

+ *  CALL sys.remove_orphan_files(table => 'tableId', [older_then => '2023-10-31 12:00:00'])
+ * 
+ */ +public class RemoveOrphanFilesProcedure extends BaseProcedure { + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] { + ProcedureParameter.required("table", StringType), + ProcedureParameter.optional("older_then", StringType) + }; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { + new StructField("result", StringType, true, Metadata.empty()) + }); + + private RemoveOrphanFilesProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + String olderThan = args.isNullAt(1) ? null : args.getString(1); + + return modifyPaimonTable( + tableIdent, + table -> { + checkArgument(table instanceof FileStoreTable); + OrphanFilesClean orphanFilesClean = + new OrphanFilesClean((FileStoreTable) table); + if (!StringUtils.isBlank(olderThan)) { + orphanFilesClean.olderThan(olderThan); + } + try { + int deleted = orphanFilesClean.clean(); + return new InternalRow[] { + newInternalRow(UTF8String.fromString("Deleted=" + deleted)) + }; + } catch (Exception e) { + throw new RuntimeException("Call remove_orphan_files error", e); + } + }); + } + + public static ProcedureBuilder builder() { + return new BaseProcedure.Builder() { + @Override + public RemoveOrphanFilesProcedure doBuild() { + return new RemoveOrphanFilesProcedure(tableCatalog()); + } + }; + } + + @Override + public String description() { + return "RemoveOrphanFilesProcedure"; + } +} diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala new file mode 100644 index 000000000000..5f8a58e785c1 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.paimon.spark.procedure + +import org.apache.paimon.fs.Path +import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.paimon.utils.DateTimeUtils + +import org.apache.spark.sql.Row + +import java.util.concurrent.TimeUnit + +class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase { + + test("Paimon procedure: remove orphan files") { + spark.sql(s""" + |CREATE TABLE T (id STRING, name STRING) + |USING PAIMON + |TBLPROPERTIES ('primary-key'='id') + |""".stripMargin) + + spark.sql(s"INSERT INTO T VALUES ('1', 'a'), ('2', 'b')") + + val table = loadTable("T") + val fileIO = table.fileIO() + val tablePath = table.location() + + val orphanFile1 = new Path(tablePath, "bucket-0/orphan_file1") + val orphanFile2 = new Path(tablePath, "bucket-0/orphan_file2") + + fileIO.writeFileUtf8(orphanFile1, "a") + Thread.sleep(2000) + fileIO.writeFileUtf8(orphanFile2, "b") + + // by default, no file deleted + checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row("Deleted=0") :: Nil) + + val orphanFile2ModTime = fileIO.getFileStatus(orphanFile2).getModificationTime + val older_then1 = DateTimeUtils.formatLocalDateTime( + DateTimeUtils.toLocalDateTime( + orphanFile2ModTime - + TimeUnit.SECONDS.toMillis(1)), + 3) + + checkAnswer( + spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_then => '$older_then1')"), + Row("Deleted=1") :: Nil) + + val older_then2 = DateTimeUtils.formatLocalDateTime( + DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), + 3) + + checkAnswer( + spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_then => '$older_then2')"), + Row("Deleted=1") :: Nil) + } +}