diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java
index e1807191defe..3e52322a6f58 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java
@@ -32,7 +32,7 @@
 /** Compact manifest file to reduce deleted manifest entries. */
 public class CompactManifestProcedure extends ProcedureBase {
 
-    private static final String commitUser = "Compact-Manifest-Procedure-Committer";
+    private static final String COMMIT_USER = "Compact-Manifest-Procedure-Committer";
 
     @Override
     public String identifier() {
@@ -47,7 +47,7 @@ public String[] call(ProcedureContext procedureContext, String tableId) throws E
                 table(tableId)
                         .copy(
                                 Collections.singletonMap(
-                                        CoreOptions.COMMIT_USER_PREFIX.key(), commitUser));
+                                        CoreOptions.COMMIT_USER_PREFIX.key(), COMMIT_USER));
 
         try (FileStoreCommit commit =
                 table.store()
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index d2a7180413b7..dee0c38d46fb 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark;
 
+import org.apache.paimon.spark.procedure.CompactManifestProcedure;
 import org.apache.paimon.spark.procedure.CompactProcedure;
 import org.apache.paimon.spark.procedure.CreateBranchProcedure;
 import org.apache.paimon.spark.procedure.CreateTagFromTimestampProcedure;
@@ -81,6 +82,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
         procedureBuilders.put("fast_forward", FastForwardProcedure::builder);
         procedureBuilders.put("reset_consumer", ResetConsumerProcedure::builder);
         procedureBuilders.put("mark_partition_done", MarkPartitionDoneProcedure::builder);
+        procedureBuilders.put("compact_manifest", CompactManifestProcedure::builder);
         return procedureBuilders.build();
     }
 }
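
For orientation: once the builder above is registered, `compact_manifest` becomes callable through Spark SQL's `CALL` syntax like every other entry in this map. A minimal end-to-end sketch, assuming a local `SparkSession`; the catalog name `paimon`, the warehouse path, and the table name are illustrative, not part of this change:

```java
import org.apache.spark.sql.SparkSession;

public class CompactManifestCallExample {
    public static void main(String[] args) {
        // Catalog name, warehouse path, and table name below are assumptions for the demo.
        SparkSession spark =
                SparkSession.builder()
                        .master("local[2]")
                        .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
                        .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/paimon-warehouse")
                        .getOrCreate();

        spark.sql("USE paimon.default");
        // Single required named argument, as declared in the procedure's PARAMETERS.
        spark.sql("CALL sys.compact_manifest(table => 'my_table')");
        spark.stop();
    }
}
```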
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java
new file mode 100644
index 000000000000..dd064d892c3d
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.operation.FileStoreCommit;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Compact manifest procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.compact_manifest(table => 'tableId')
+ * </code></pre>
+ */
+public class CompactManifestProcedure extends BaseProcedure {
+
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {ProcedureParameter.required("table", StringType)};
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
+                    });
+
+    protected CompactManifestProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    @Override
+    public InternalRow[] call(InternalRow args) {
+
+        Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+        FileStoreTable table = (FileStoreTable) loadSparkTable(tableIdent).getTable();
+
+        try (FileStoreCommit commit =
+                table.store()
+                        .newCommit(table.coreOptions().createCommitUser())
+                        .ignoreEmptyCommit(false)) {
+            commit.compactManifest();
+        }
+
+        return new InternalRow[] {newInternalRow(true)};
+    }
+
+    @Override
+    public String description() {
+        return "This procedure executes manifest compaction on a Paimon table.";
+    }
+
+    public static ProcedureBuilder builder() {
+        return new Builder<CompactManifestProcedure>() {
+            @Override
+            public CompactManifestProcedure doBuild() {
+                return new CompactManifestProcedure(tableCatalog());
+            }
+        };
+    }
+}
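
Both the Flink procedure touched above and this new Spark procedure reduce to the same two core calls: open a `FileStoreCommit` that does not skip empty commits, then `compactManifest()`. A sketch of that shared core, assuming a `FileStoreTable` has already been loaded from a catalog (the wrapper class and helper method are hypothetical):

```java
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.table.FileStoreTable;

final class ManifestCompactionSketch {
    // Hypothetical helper; mirrors the body of call(InternalRow) above.
    static void compactManifests(FileStoreTable table) {
        // ignoreEmptyCommit(false): the commit must survive even though manifest
        // compaction rewrites metadata only and adds no data files.
        try (FileStoreCommit commit =
                table.store()
                        .newCommit(table.coreOptions().createCommitUser())
                        .ignoreEmptyCommit(false)) {
            commit.compactManifest();
        }
    }
}
```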
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala
new file mode 100644
index 000000000000..c1c90251338f
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactManifestProcedureTest.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.streaming.StreamTest
+import org.assertj.core.api.Assertions
+
+/** Test compact manifest procedure. See [[CompactManifestProcedure]]. */
+class CompactManifestProcedureTest extends PaimonSparkTestBase with StreamTest {
+
+  test("Paimon Procedure: compact manifest") {
+    spark.sql(
+      s"""
+         |CREATE TABLE T (id INT, value STRING, dt STRING, hh INT)
+         |TBLPROPERTIES ('bucket'='-1', 'write-only'='true', 'compaction.min.file-num'='2', 'compaction.max.file-num'='2')
+         |PARTITIONED BY (dt, hh)
+         |""".stripMargin)
+
+    spark.sql(s"INSERT INTO T VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)")
+    spark.sql(s"INSERT OVERWRITE T VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)")
+    spark.sql(s"INSERT OVERWRITE T VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)")
+    spark.sql(s"INSERT OVERWRITE T VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)")
+
+    Thread.sleep(10000)
+
+    var rows = spark.sql("SELECT sum(num_deleted_files) FROM `T$manifests`").collectAsList()
+    Assertions.assertThat(rows.get(0).getLong(0)).isEqualTo(6L)
+    spark.sql("CALL sys.compact_manifest(table => 'T')")
+    rows = spark.sql("SELECT sum(num_deleted_files) FROM `T$manifests`").collectAsList()
+    Assertions.assertThat(rows.get(0).getLong(0)).isEqualTo(0L)
+  }
+}
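
The before/after assertion in the test translates directly to an ad-hoc check against any warehouse. A sketch in Java, reusing the session from the first example; the table name `T` and the expectation of zero deleted entries after compaction mirror the test above:

```java
import org.apache.spark.sql.SparkSession;

final class VerifyManifestCompaction {
    // Assumes `spark` is configured as in the earlier sketch and that table T
    // has accumulated several INSERT OVERWRITE runs, as in the test.
    static void check(SparkSession spark) {
        long before =
                spark.sql("SELECT sum(num_deleted_files) FROM `T$manifests`")
                        .collectAsList().get(0).getLong(0);
        spark.sql("CALL sys.compact_manifest(table => 'T')");
        long after =
                spark.sql("SELECT sum(num_deleted_files) FROM `T$manifests`")
                        .collectAsList().get(0).getLong(0);
        // The test above expects before == 6 and after == 0.
        System.out.printf("num_deleted_files: before=%d after=%d%n", before, after);
    }
}
```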