diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index d464881d6058..80e120740ea4 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -201,6 +201,20 @@ This section introduce all available spark procedures about paimon.
-- delete consumer
CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid')
+
+
+ mark_partition_done |
+
+ To mark partitions as done. Arguments:
+ table: the target table identifier. Cannot be empty.
+ partitions: the partitions that need to be marked done. If you specify multiple partitions, the delimiter is ';'.
+ |
+
+ -- mark single partition done
+ CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01')
+ -- mark multiple partitions done
+ CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01;day=2024-07-02')
+ |
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 9faeb0865a68..3e5709606fc6 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -509,6 +509,12 @@
Duration |
The expiration interval of a partition. A partition will be expired if it‘s lifetime is over this value. Partition time is extracted from the partition value. |
+
+ partition.mark-done-action |
+ "success-file" |
+ String |
+ Action to mark a partition done is to notify the downstream application that the partition has finished writing, the partition is ready to be read. 1. 'success-file': add '_success' file to directory. 2. 'done-partition': add 'xxx.done' partition to metastore. Both can be configured at the same time: 'done-partition,success-file'. |
+
partition.timestamp-formatter |
(none) |
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 32dbe6c0fa16..f5c98df171eb 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -92,12 +92,6 @@
Duration |
Set a time duration when a partition has no new data after this time duration, mark the done status to indicate that the data is ready. |
-
- partition.mark-done-action |
- "success-file" |
- String |
- Action to mark a partition done is to notify the downstream application that the partition has finished writing, the partition is ready to be read. 1. 'success-file': add '_success' file to directory. 2. 'done-partition': add 'xxx.done' partition to metastore. Both can be configured at the same time: 'done-partition,success-file'. |
-
partition.time-interval |
(none) |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 0ec6d59a085b..d5bee40e883f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1033,6 +1033,25 @@ public class CoreOptions implements Serializable {
"Parameter string for the constructor of class #. "
+ "Callback class should parse the parameter by itself.");
+ public static final ConfigOption PARTITION_MARK_DONE_ACTION =
+ key("partition.mark-done-action")
+ .stringType()
+ .defaultValue("success-file")
+ .withDescription(
+ Description.builder()
+ .text(
+ "Action to mark a partition done is to notify the downstream application that the partition"
+ + " has finished writing, the partition is ready to be read.")
+ .linebreak()
+ .text("1. 'success-file': add '_success' file to directory.")
+ .linebreak()
+ .text(
+ "2. 'done-partition': add 'xxx.done' partition to metastore.")
+ .linebreak()
+ .text(
+ "Both can be configured at the same time: 'done-partition,success-file'.")
+ .build());
+
public static final ConfigOption METASTORE_PARTITIONED_TABLE =
key("metastore.partitioned-table")
.booleanType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/AddDonePartitionAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/AddDonePartitionAction.java
similarity index 89%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/AddDonePartitionAction.java
rename to paimon-core/src/main/java/org/apache/paimon/partition/actions/AddDonePartitionAction.java
index ea888c6d596f..c6db6cb6e63a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/AddDonePartitionAction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/AddDonePartitionAction.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.partition.actions;
import org.apache.paimon.fs.Path;
import org.apache.paimon.metastore.MetastoreClient;
@@ -25,13 +25,12 @@
import java.io.IOException;
import java.util.LinkedHashMap;
-import java.util.Map.Entry;
+import java.util.Map;
import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;
/** A {@link PartitionMarkDoneAction} which add ".done" partition. */
public class AddDonePartitionAction implements PartitionMarkDoneAction {
-
private final MetastoreClient metastoreClient;
public AddDonePartitionAction(MetastoreClient metastoreClient) {
@@ -41,12 +40,12 @@ public AddDonePartitionAction(MetastoreClient metastoreClient) {
@Override
public void markDone(String partition) throws Exception {
LinkedHashMap doneSpec = extractPartitionSpecFromPath(new Path(partition));
- Entry lastField = tailEntry(doneSpec);
+ Map.Entry lastField = tailEntry(doneSpec);
doneSpec.put(lastField.getKey(), lastField.getValue() + ".done");
metastoreClient.addPartition(doneSpec);
}
- private Entry tailEntry(LinkedHashMap partitionSpec) {
+ private Map.Entry tailEntry(LinkedHashMap partitionSpec) {
return Iterators.getLast(partitionSpec.entrySet().iterator());
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/MarkPartitionDoneEventAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
similarity index 97%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/MarkPartitionDoneEventAction.java
rename to paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
index a6dc0bd1e364..a5ebe34051c1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/MarkPartitionDoneEventAction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.partition.actions;
import org.apache.paimon.fs.Path;
import org.apache.paimon.metastore.MetastoreClient;
@@ -28,6 +28,7 @@
/** A {@link PartitionMarkDoneAction} which add mark "PartitionEventType.LOAD_DONE". */
public class MarkPartitionDoneEventAction implements PartitionMarkDoneAction {
+
private final MetastoreClient metastoreClient;
public MarkPartitionDoneEventAction(MetastoreClient metastoreClient) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
similarity index 96%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneAction.java
rename to paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
index 2177f7d0fb6c..f79992efb96a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneAction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.partition.actions;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.metastore.MetastoreClient;
@@ -28,7 +28,7 @@
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
-import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_ACTION;
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/SuccessFileMarkDoneAction.java
similarity index 79%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneAction.java
rename to paimon-core/src/main/java/org/apache/paimon/partition/actions/SuccessFileMarkDoneAction.java
index ac97b6f14edc..7c4ec375aad9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneAction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/SuccessFileMarkDoneAction.java
@@ -16,10 +16,16 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.partition.actions;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.partition.file.SuccessFile;
+
+import javax.annotation.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
/** A {@link PartitionMarkDoneAction} which create "_SUCCESS" file. */
public class SuccessFileMarkDoneAction implements PartitionMarkDoneAction {
@@ -49,6 +55,16 @@ public void markDone(String partition) throws Exception {
fileIO.overwriteFileUtf8(successPath, successFile.toJson());
}
+ @Nullable
+ public static SuccessFile safelyFromPath(FileIO fileIO, Path path) throws IOException {
+ try {
+ String json = fileIO.readFileUtf8(path);
+ return SuccessFile.fromJson(json);
+ } catch (FileNotFoundException e) {
+ return null;
+ }
+ }
+
@Override
public void close() {}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/SuccessFile.java b/paimon-core/src/main/java/org/apache/paimon/partition/file/SuccessFile.java
similarity index 98%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/SuccessFile.java
rename to paimon-core/src/main/java/org/apache/paimon/partition/file/SuccessFile.java
index 4d3656e6fe07..39a202ccc72b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/SuccessFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/file/SuccessFile.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.partition.file;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 4c5d1f762072..b42de765eef3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -357,25 +357,6 @@ public class FlinkConnectorOptions {
"You can specify time interval for partition, for example, "
+ "daily partition is '1 d', hourly partition is '1 h'.");
- public static final ConfigOption PARTITION_MARK_DONE_ACTION =
- key("partition.mark-done-action")
- .stringType()
- .defaultValue("success-file")
- .withDescription(
- Description.builder()
- .text(
- "Action to mark a partition done is to notify the downstream application that the partition"
- + " has finished writing, the partition is ready to be read.")
- .linebreak()
- .text("1. 'success-file': add '_success' file to directory.")
- .linebreak()
- .text(
- "2. 'done-partition': add 'xxx.done' partition to metastore.")
- .linebreak()
- .text(
- "Both can be configured at the same time: 'done-partition,success-file'.")
- .build());
-
public static final ConfigOption PARTITION_MARK_DONE_WHEN_END_INPUT =
ConfigOptions.key("partition.end-input-to-done")
.booleanType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
index f323c5509249..9fd906ee44dc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.action;
-import org.apache.paimon.flink.sink.partition.PartitionMarkDoneAction;
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.PartitionPathUtils;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
index d846b25ee1b5..d70cccf6ba25 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
@@ -21,7 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.sink.partition.PartitionMarkDoneAction;
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.IOUtils;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index 6a92a0548f4d..ac5895767c4c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -22,6 +22,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.IOUtils;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
index 8e606b9a6e67..9c97151ecbe7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
@@ -20,10 +20,10 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
-import org.apache.paimon.flink.sink.partition.SuccessFile;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.file.SuccessFile;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
index f2f163d6eebb..e6c2c8678ea8 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
@@ -20,8 +20,8 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.flink.sink.partition.SuccessFile;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.partition.file.SuccessFile;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
index cff4479d99f5..5338b3886001 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
@@ -20,6 +20,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.partition.actions.AddDonePartitionAction;
import org.junit.jupiter.api.Test;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneActionTest.java
index 5a6818b25d3c..95be025405f9 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneActionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneActionTest.java
@@ -20,6 +20,8 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.partition.actions.SuccessFileMarkDoneAction;
+import org.apache.paimon.partition.file.SuccessFile;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 36d5082661fe..f143cf7b6109 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -26,6 +26,7 @@
import org.apache.paimon.spark.procedure.ExpirePartitionsProcedure;
import org.apache.paimon.spark.procedure.ExpireSnapshotsProcedure;
import org.apache.paimon.spark.procedure.FastForwardProcedure;
+import org.apache.paimon.spark.procedure.MarkPartitionDoneProcedure;
import org.apache.paimon.spark.procedure.MigrateFileProcedure;
import org.apache.paimon.spark.procedure.MigrateTableProcedure;
import org.apache.paimon.spark.procedure.Procedure;
@@ -70,6 +71,7 @@ private static Map> initProcedureBuilders() {
procedureBuilders.put("repair", RepairProcedure::builder);
procedureBuilders.put("fast_forward", FastForwardProcedure::builder);
procedureBuilders.put("reset_consumer", ResetConsumerProcedure::builder);
+ procedureBuilders.put("mark_partition_done", MarkPartitionDoneProcedure::builder);
return procedureBuilders.build();
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedure.java
new file mode 100644
index 000000000000..ff064e9140da
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedure.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.PartitionPathUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+
+import static org.apache.paimon.utils.ParameterUtils.getPartitions;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Partition mark done procedure. Usage:
+ *
+ *
+ * CALL sys.mark_partition_done('tableId', 'partition1;partition2')
+ *
+ */
+public class MarkPartitionDoneProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.required("table", StringType),
+ ProcedureParameter.required("partitions", StringType)
+ };
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
+ });
+
+ protected MarkPartitionDoneProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+ String partitionStr = args.getString(1);
+ String[] partitions = partitionStr.split(";");
+
+ return modifyPaimonTable(
+ tableIdent,
+ table -> {
+ checkArgument(
+ table instanceof FileStoreTable,
+ "Only FileStoreTable supports mark_partition_done procedure. The table type is '%s'.",
+ table.getClass().getName());
+
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ CoreOptions coreOptions = fileStoreTable.coreOptions();
+ List actions =
+ PartitionMarkDoneAction.createActions(fileStoreTable, coreOptions);
+
+ List partitionPaths =
+ PartitionPathUtils.generatePartitionPaths(
+ getPartitions(partitions),
+ fileStoreTable.store().partitionType());
+
+ markDone(partitionPaths, actions);
+
+ IOUtils.closeAllQuietly(actions);
+ InternalRow outputRow = newInternalRow(true);
+ return new InternalRow[] {outputRow};
+ });
+ }
+
+ public static void markDone(List partitions, List actions) {
+ for (String partition : partitions) {
+ try {
+ for (PartitionMarkDoneAction action : actions) {
+ action.markDone(partition);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static ProcedureBuilder builder() {
+ return new BaseProcedure.Builder() {
+ @Override
+ public MarkPartitionDoneProcedure doBuild() {
+ return new MarkPartitionDoneProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+ return "MarkPartitionDoneProcedure";
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala
new file mode 100644
index 000000000000..8abc7ddfdae8
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.fs.Path
+import org.apache.paimon.partition.file.SuccessFile
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.assertThat
+
+class MarkPartitionDoneProcedureTest extends PaimonSparkTestBase {
+
+ test("Paimon procedure: mark_partition_done test") {
+ spark.sql(s"""
+ |CREATE TABLE T (id STRING, name STRING, day STRING)
+ |USING PAIMON
+ |PARTITIONED BY (day)
+ |TBLPROPERTIES (
+ |'primary-key'='day,id',
+ |'partition.mark-done-action'='success-file')
+ |""".stripMargin)
+
+ spark.sql(s"INSERT INTO T VALUES ('1', 'a', '2024-07-13')")
+ spark.sql(s"INSERT INTO T VALUES ('2', 'b', '2024-07-14')")
+
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.mark_partition_done(" +
+ "table => 'test.T', partitions => 'day=2024-07-13;day=2024-07-14')"),
+ Row(true) :: Nil)
+
+ val table = loadTable("T")
+
+ val successPath1 = new Path(table.location, "day=2024-07-13/_SUCCESS")
+ val successFile1 = SuccessFile.safelyFromPath(table.fileIO, successPath1)
+ assertThat(successFile1).isNotNull
+
+ val successPath2 = new Path(table.location, "day=2024-07-14/_SUCCESS")
+ val successFile2 = SuccessFile.safelyFromPath(table.fileIO, successPath2)
+ assertThat(successFile2).isNotNull
+
+ }
+
+}