From 8831d7f8daeded0a405fdeb102a229bec10e8d7e Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Mon, 16 Dec 2024 15:34:35 +0800 Subject: [PATCH] [core] Introduce PurgeFilesProcedure to purge table --- docs/content/flink/procedures.md | 21 ++++ docs/content/spark/procedures.md | 10 ++ .../flink/procedure/PurgeFilesProcedure.java | 66 +++++++++++ .../flink/procedure/PurgeFilesProcedure.java | 78 ++++++++++++ .../org.apache.paimon.factories.Factory | 1 + .../procedure/PurgeFilesProcedureITCase.java | 48 ++++++++ .../apache/paimon/spark/SparkProcedures.java | 2 + .../spark/procedure/PurgeFilesProcedure.java | 112 ++++++++++++++++++ .../procedure/PurgeFilesProcedureTest.scala | 44 +++++++ 9 files changed, 382 insertions(+) create mode 100644 paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java create mode 100644 paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md index 7a9b23807392..8eb1786a08b3 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -434,6 +434,27 @@ All available procedures are listed below. CALL sys.rollback_to_watermark(`table` => 'default.T', watermark => 1730292023000) + + purge_files + + -- for Flink 1.18
+ -- clear table with purge files directly.
+ CALL sys.purge_files('identifier')

+ -- for Flink 1.19 and later
+ -- clear table with purge files directly.
+ CALL sys.purge_files(`table` => 'default.T')

+ + + To clear table with purge files directly. Argument: +
  • identifier: the target table identifier. Cannot be empty.
  • + + + -- for Flink 1.18
    + CALL sys.purge_files('default.T') + -- for Flink 1.19 and later
    + CALL sys.purge_files(`table` => 'default.T') + + expire_snapshots diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index 5b0efd5f90a6..bf7b8ae2d572 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -190,6 +190,16 @@ This section introduce all available spark procedures about paimon. CALL sys.rollback_to_watermark(table => 'default.T', watermark => 1730292023000)

    + + purge_files + + To clear table with purge files directly. Argument: +
  • table: the target table identifier. Cannot be empty.
  • + + + CALL sys.purge_files(table => 'default.T')

    + + migrate_database diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java new file mode 100644 index 000000000000..3053eae3c7cf --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; + +import org.apache.flink.table.procedure.ProcedureContext; + +import java.io.IOException; +import java.util.Arrays; + +/** A procedure to purge files for a table. */ +public class PurgeFilesProcedure extends ProcedureBase { + public static final String IDENTIFIER = "purge_files"; + + public String[] call(ProcedureContext procedureContext, String tableId) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + FileStoreTable fileStoreTable = (FileStoreTable) table; + FileIO fileIO = fileStoreTable.fileIO(); + Path tablePath = fileStoreTable.snapshotManager().tablePath(); + try { + Arrays.stream(fileIO.listStatus(tablePath)) + .filter(f -> !f.getPath().getName().contains("schema")) + .forEach( + fileStatus -> { + try { + fileIO.delete(fileStatus.getPath(), true); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + return new String[] { + String.format("Success purge files with table: %s.", fileStoreTable.name()) + }; + } + + @Override + public String identifier() { + return IDENTIFIER; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java new file mode 100644 index 000000000000..7ee2a3610402 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +import java.io.IOException; +import java.util.Arrays; + +/** + * A procedure to purge files for a table. Usage: + * + *
    
    + *  -- rollback to the snapshot which earlier or equal than watermark.
    + *  CALL sys.purge_files(`table` => 'tableId')
    + * 
    + */ +public class PurgeFilesProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "purge_files"; + + @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))}) + public String[] call(ProcedureContext procedureContext, String tableId) + throws Catalog.TableNotExistException { + Table table = catalog.getTable(Identifier.fromString(tableId)); + FileStoreTable fileStoreTable = (FileStoreTable) table; + FileIO fileIO = fileStoreTable.fileIO(); + Path tablePath = fileStoreTable.snapshotManager().tablePath(); + try { + Arrays.stream(fileIO.listStatus(tablePath)) + .filter(f -> !f.getPath().getName().contains("schema")) + .forEach( + fileStatus -> { + try { + fileIO.delete(fileStatus.getPath(), true); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + return new String[] { + String.format("Success purge files with table: %s.", fileStoreTable.name()) + }; + } + + @Override + public String identifier() { + return IDENTIFIER; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 6c3b0e7664c0..6251189560f6 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -69,6 +69,7 @@ org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure org.apache.paimon.flink.procedure.QueryServiceProcedure org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure org.apache.paimon.flink.procedure.ExpirePartitionsProcedure +org.apache.paimon.flink.procedure.PurgeFilesProcedure org.apache.paimon.flink.procedure.privilege.InitFileBasedPrivilegeProcedure org.apache.paimon.flink.procedure.privilege.CreatePrivilegedUserProcedure org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java new file mode 100644 index 000000000000..9eb9aad2969e --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.flink.CatalogITCaseBase; + +import org.apache.flink.types.Row; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** IT Case for {@link PurgeFilesProcedure}. */ +public class PurgeFilesProcedureITCase extends CatalogITCaseBase { + + @Test + public void testPurgeFiles() throws Exception { + sql( + "CREATE TABLE T (id INT, name STRING," + + " PRIMARY KEY (id) NOT ENFORCED)" + + " WITH ('bucket'='1')"); + + sql("INSERT INTO T VALUES (1, 'a')"); + assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a")); + + sql("INSERT INTO T VALUES (1, 'a')"); + sql("CALL sys.purge_files(`table` => 'default.T')"); + assertThat(sql("select * from `T`")).containsExactly(); + + sql("INSERT INTO T VALUES (2, 'a')"); + assertThat(sql("select * from `T`")).containsExactly(Row.of(2, "a")); + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java index b2fa66a15090..f5052ea25f95 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java @@ -35,6 +35,7 @@ import org.apache.paimon.spark.procedure.MigrateTableProcedure; import org.apache.paimon.spark.procedure.Procedure; import org.apache.paimon.spark.procedure.ProcedureBuilder; +import org.apache.paimon.spark.procedure.PurgeFilesProcedure; import org.apache.paimon.spark.procedure.RefreshObjectTableProcedure; import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure; import org.apache.paimon.spark.procedure.RenameTagProcedure; @@ -74,6 +75,7 @@ private static Map> initProcedureBuilders() { procedureBuilders.put("rollback", RollbackProcedure::builder); procedureBuilders.put("rollback_to_timestamp", RollbackToTimestampProcedure::builder); procedureBuilders.put("rollback_to_watermark", RollbackToWatermarkProcedure::builder); + procedureBuilders.put("purge_files", PurgeFilesProcedure::builder); procedureBuilders.put("create_tag", CreateTagProcedure::builder); procedureBuilders.put("replace_tag", ReplaceTagProcedure::builder); procedureBuilders.put("rename_tag", RenameTagProcedure::builder); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java new file mode 100644 index 000000000000..8a7aec6e1410 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; + +import java.io.IOException; +import java.util.Arrays; + +import static org.apache.spark.sql.types.DataTypes.StringType; + +/** A procedure to purge files for a table. */ +public class PurgeFilesProcedure extends BaseProcedure { + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] {ProcedureParameter.required("table", StringType)}; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { + new StructField("result", StringType, true, Metadata.empty()) + }); + + private PurgeFilesProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + + return modifyPaimonTable( + tableIdent, + table -> { + FileStoreTable fileStoreTable = (FileStoreTable) table; + FileIO fileIO = fileStoreTable.fileIO(); + Path tablePath = fileStoreTable.snapshotManager().tablePath(); + try { + Arrays.stream(fileIO.listStatus(tablePath)) + .filter(f -> !f.getPath().getName().contains("schema")) + .forEach( + fileStatus -> { + try { + fileIO.delete(fileStatus.getPath(), true); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + + InternalRow outputRow = + newInternalRow( + UTF8String.fromString( + String.format( + "Success purge files with table: %s.", + fileStoreTable.name()))); + return new InternalRow[] {outputRow}; + }); + } + + public static ProcedureBuilder builder() { + return new BaseProcedure.Builder() { + @Override + public PurgeFilesProcedure doBuild() { + return new PurgeFilesProcedure(tableCatalog()); + } + }; + } + + @Override + public String description() { + return "PurgeFilesProcedure"; + } +} diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala new file mode 100644 index 000000000000..27eafe1c3d03 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure + +import org.apache.paimon.spark.PaimonSparkTestBase + +import org.apache.spark.sql.Row + +class PurgeFilesProcedureTest extends PaimonSparkTestBase { + + test("Paimon procedure: purge files test") { + spark.sql(s""" + |CREATE TABLE T (id STRING, name STRING) + |USING PAIMON + |""".stripMargin) + + spark.sql("insert into T select '1', 'aa'"); + checkAnswer(spark.sql("select * from test.T"), Row("1", "aa") :: Nil) + + spark.sql("CALL paimon.sys.purge_files(table => 'test.T')") + checkAnswer(spark.sql("select * from test.T"), Nil) + + spark.sql("refresh table test.T"); + spark.sql("insert into T select '2', 'aa'"); + checkAnswer(spark.sql("select * from test.T"), Row("2", "aa") :: Nil) + } + +}