Skip to content

Commit

Permalink
fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 committed Oct 16, 2024
1 parent 37dad0c commit 1d996fd
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
/** Compact manifest file to reduce deleted manifest entries. */
public class CompactManifestProcedure extends ProcedureBase {

private static final String commitUser = "Compact-Manifest-Procedure-Committer";
private static final String COMMIT_USER = "Compact-Manifest-Procedure-Committer";

@Override
public String identifier() {
Expand All @@ -47,7 +47,7 @@ public String[] call(ProcedureContext procedureContext, String tableId) throws E
table(tableId)
.copy(
Collections.singletonMap(
CoreOptions.COMMIT_USER_PREFIX.key(), commitUser));
CoreOptions.COMMIT_USER_PREFIX.key(), COMMIT_USER));

try (FileStoreCommit commit =
table.store()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.spark;

import org.apache.paimon.spark.procedure.CompactManifestProcedure;
import org.apache.paimon.spark.procedure.CompactProcedure;
import org.apache.paimon.spark.procedure.CreateBranchProcedure;
import org.apache.paimon.spark.procedure.CreateTagFromTimestampProcedure;
Expand Down Expand Up @@ -81,6 +82,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
procedureBuilders.put("fast_forward", FastForwardProcedure::builder);
procedureBuilders.put("reset_consumer", ResetConsumerProcedure::builder);
procedureBuilders.put("mark_partition_done", MarkPartitionDoneProcedure::builder);
procedureBuilders.put("compact_manifest", CompactManifestProcedure::builder);
return procedureBuilders.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.procedure;

import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.table.FileStoreTable;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.StringType;

/**
* Compact manifest procedure. Usage:
*
* <pre><code>
* CALL sys.compact_manifest(table => 'tableId')
* </code></pre>
*/
public class CompactManifestProcedure extends BaseProcedure {

    /** Single required parameter: the identifier of the table whose manifests to compact. */
    private static final ProcedureParameter[] PARAMETERS =
            new ProcedureParameter[] {ProcedureParameter.required("table", StringType)};

    /** Output schema: a single boolean "result" column (true on success). */
    private static final StructType OUTPUT_TYPE =
            new StructType(
                    new StructField[] {
                        new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
                    });

    protected CompactManifestProcedure(TableCatalog tableCatalog) {
        super(tableCatalog);
    }

    @Override
    public ProcedureParameter[] parameters() {
        return PARAMETERS;
    }

    @Override
    public StructType outputType() {
        return OUTPUT_TYPE;
    }

    /**
     * Loads the target table and rewrites its manifest files to drop deleted entries.
     *
     * @param args procedure arguments; index 0 is the table identifier string
     * @return a single row containing {@code true} when the manifest compaction committed
     */
    @Override
    public InternalRow[] call(InternalRow args) {

        Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
        FileStoreTable table = (FileStoreTable) loadSparkTable(tableIdent).getTable();

        // ignoreEmptyCommit(false): a manifest compaction may produce a commit with no
        // data-file changes, which must still be recorded.
        try (FileStoreCommit commit =
                table.store()
                        .newCommit(table.coreOptions().createCommitUser())
                        .ignoreEmptyCommit(false)) {
            commit.compactManifest();
        }

        return new InternalRow[] {newInternalRow(true)};
    }

    @Override
    public String description() {
        // Fixed: previous text ("execute compact action on paimon table") was ungrammatical
        // and described data compaction rather than manifest compaction.
        return "This procedure compacts the manifest files of a Paimon table to reduce deleted manifest entries.";
    }

    public static ProcedureBuilder builder() {
        return new Builder<CompactManifestProcedure>() {
            @Override
            public CompactManifestProcedure doBuild() {
                return new CompactManifestProcedure(tableCatalog());
            }
        };
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.procedure

import org.apache.paimon.spark.PaimonSparkTestBase

import org.apache.spark.sql.streaming.StreamTest
import org.assertj.core.api.Assertions

/** Test compact manifest procedure. See [[CompactManifestProcedure]]. */
/** Verifies that `sys.compact_manifest` removes deleted-file entries from a table's manifests. */
class CompactManifestProcedureTest extends PaimonSparkTestBase with StreamTest {

  test("Paimon Procedure: compact manifest") {
    // Unaware bucket table in write-only mode so no automatic compaction interferes
    // with the manifest state the test asserts on.
    spark.sql(
      s"""
         |CREATE TABLE T (id INT, value STRING, dt STRING, hh INT)
         |TBLPROPERTIES ('bucket'='-1', 'write-only'='true', 'compaction.min.file-num'='2', 'compaction.max.file-num'='2')
         |PARTITIONED BY (dt, hh)
         |""".stripMargin)

    // One insert plus three overwrites: each OVERWRITE deletes the two files written by
    // the previous commit, accumulating 3 commits * 2 files = 6 deleted-file entries.
    spark.sql(s"INSERT INTO T VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)")
    spark.sql(s"INSERT OVERWRITE T VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)")
    spark.sql(s"INSERT OVERWRITE T VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)")
    spark.sql(s"INSERT OVERWRITE T VALUES (5, '5', '2024-01-02', 0), (6, '6', '2024-01-02', 1)")

    // NOTE(review): purpose of this 10s sleep is not evident from the code — nothing here
    // runs asynchronously after the SQL calls return. Confirm whether it can be removed;
    // it slows the suite and fixed sleeps tend to be flaky. (Also: trailing semicolon is
    // unidiomatic Scala.)
    Thread.sleep(10000);

    // Before compaction the manifests should record the 6 deleted files; after calling
    // the procedure the deleted entries should be fully compacted away.
    var rows = spark.sql("SELECT sum(num_deleted_files) FROM `T$manifests`").collectAsList()
    Assertions.assertThat(rows.get(0).getLong(0)).isEqualTo(6L)
    spark.sql("CALL sys.compact_manifest(table => 'T')")
    rows = spark.sql("SELECT sum(num_deleted_files) FROM `T$manifests`").collectAsList()
    Assertions.assertThat(rows.get(0).getLong(0)).isEqualTo(0L)
  }
}

0 comments on commit 1d996fd

Please sign in to comment.