diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index aeab93eda81f..fd788301eea9 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -551,6 +551,12 @@
<td>String</td>
<td>The default partition name in case the dynamic partition column value is null/empty string.</td>
</tr>
+ <tr>
+ <td><h5>partition.end-input-to-done</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to mark a partition done, indicating its data is ready, when the input ends.</td>
+ </tr>
<tr>
<td><h5>partition.expiration-check-interval</h5></td>
<td style="word-wrap: break-word;">1 h</td>
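
A note on the move: because the option now lives in `CoreOptions` rather than the Flink connector, any engine can read it through Paimon's typed `Options` API. A minimal sketch of that access pattern (the class name is illustrative, not part of this patch):

```java
import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;

// Illustrative only: typed access to the relocated option.
public class EndInputToDoneExample {
    public static void main(String[] args) {
        Options options = new Options();
        // Defaults to false; enable it so partitions are marked done once the input ends.
        options.set(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT, true);
        boolean enabled = options.get(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT);
        System.out.println("partition.end-input-to-done = " + enabled);
    }
}
```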
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index cf4c7c1da692..ea5dd5dfb163 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -80,12 +80,6 @@
<td>Integer</td>
<td>If the pending snapshot count exceeds the threshold, lookup operator will refresh the table in sync.</td>
</tr>
- <tr>
- <td><h5>partition.end-input-to-done</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>Whether mark the done status to indicate that the data is ready when end input.</td>
- </tr>
<tr>
<td><h5>partition.idle-time-to-done</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 1256c7ba0d87..d965fd142deb 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -28,6 +28,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
@@ -818,6 +819,13 @@ public class CoreOptions implements Serializable {
+ "$hour:00:00'."))
.build());
+ public static final ConfigOption<Boolean> PARTITION_MARK_DONE_WHEN_END_INPUT =
+ ConfigOptions.key("partition.end-input-to-done")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to mark a partition done, indicating its data is ready, when the input ends.");
+
public static final ConfigOption<Boolean> SCAN_PLAN_SORT_PARTITION =
key("scan.plan-sort-partition")
.booleanType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index d181d7b5a0c6..2c12b70a2493 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -355,13 +355,6 @@ public class FlinkConnectorOptions {
"You can specify time interval for partition, for example, "
+ "daily partition is '1 d', hourly partition is '1 h'.");
- public static final ConfigOption<Boolean> PARTITION_MARK_DONE_WHEN_END_INPUT =
- ConfigOptions.key("partition.end-input-to-done")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "Whether mark the done status to indicate that the data is ready when end input.");
-
public static final ConfigOption<String> CLUSTERING_COLUMNS =
key("sink.clustering.by-columns")
.stringType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index 39438a101b04..2c5d55f203dc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -42,8 +42,8 @@
import java.util.List;
import java.util.Set;
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE;
-import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
/** Mark partition done. */
public class PartitionMarkDone implements Closeable {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
index 5b1b53d6326a..0094c5511809 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
@@ -44,8 +44,8 @@
import java.util.List;
import java.util.Map;
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE;
-import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_TIME_INTERVAL;
import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
index 9c97151ecbe7..340213aeae1b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
@@ -161,7 +161,7 @@ private FileStoreTable buildFileStoreTable(int[] partitions, int[] primaryKey)
options.set(BUCKET, 3);
options.set(PATH, getTempDirPath());
options.set(FILE_FORMAT, CoreOptions.FILE_FORMAT_AVRO);
- options.set(FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT.key(), "true");
+ options.set(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT.key(), "true");
Path tablePath = new CoreOptions(options.toMap()).path();
if (primaryKey.length == 0) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
index 6dc14c9d0342..ecef6d337a6b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
@@ -51,7 +51,7 @@
import static java.util.Collections.singletonList;
import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
-import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
import static org.assertj.core.api.Assertions.assertThat;
class PartitionMarkDoneTest extends TableTestBase {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index fe740ea8ca11..eda39ae1e02d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -18,11 +18,15 @@
package org.apache.paimon.spark.commands
+import org.apache.paimon.CoreOptions
import org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE
import org.apache.paimon.options.Options
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction
import org.apache.paimon.spark._
import org.apache.paimon.spark.schema.SparkSystemColumns
import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.sink.CommitMessage
+import org.apache.paimon.utils.{InternalRowPartitionComputer, PartitionPathUtils, TypeUtils}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
@@ -63,9 +67,28 @@ case class WriteIntoPaimonTable(
val commitMessages = writer.write(data)
writer.commit(commitMessages)
+ markDoneIfNeeded(commitMessages)
Seq.empty
}
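+ /** Marks each written partition as done after the batch commit, when 'partition.end-input-to-done' is enabled. */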
+ private def markDoneIfNeeded(commitMessages: Seq[CommitMessage]): Unit = {
+ val coreOptions = table.coreOptions()
+ if (coreOptions.toConfiguration.get(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT)) {
+ val actions = PartitionMarkDoneAction.createActions(table, coreOptions)
+ val partitionComputer = new InternalRowPartitionComputer(
+ coreOptions.partitionDefaultName,
+ TypeUtils.project(table.rowType(), table.partitionKeys()),
+ table.partitionKeys().asScala.toArray
+ )
+ val partitions = commitMessages
+ .map(c => c.partition())
+ .map(p => PartitionPathUtils.generatePartitionPath(partitionComputer.generatePartValues(p)))
+ for (partition <- partitions) {
+ actions.forEach(a => a.markDone(partition))
+ }
+ }
+ }
+
private def parseSaveMode(): (Boolean, Map[String, String]) = {
var dynamicPartitionOverwriteMode = false
val overwritePartition = saveMode match {
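
To make the new Spark-side flow concrete: `markDoneIfNeeded` maps each commit message's partition to a partition path and runs every configured mark-done action on it. A hedged Java sketch of those two steps, using only utilities that appear in this patch (the table location and partition values are made up for illustration):

```java
import java.util.LinkedHashMap;

import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.utils.PartitionPathUtils;

// Sketch: partition values -> partition path -> _SUCCESS marker (the 'success-file' action).
public class MarkDoneSketch {
    public static void main(String[] args) throws Exception {
        LinkedHashMap<String, String> partValues = new LinkedHashMap<>();
        partValues.put("c", "aa"); // single partition column, mirroring the test below
        // Produces "c=aa/", the directory under the table location that gets the marker.
        String partitionPath = PartitionPathUtils.generatePartitionPath(partValues);

        LocalFileIO fileIO = LocalFileIO.create();
        Path tableLocation = new Path("/tmp/warehouse/T"); // assumed path, not from the patch
        // Writing an empty _SUCCESS file is what the 'success-file' action amounts to.
        fileIO.newOutputStream(new Path(tableLocation, partitionPath + "_SUCCESS"), true).close();
    }
}
```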
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index 0cc17639fd80..0ff104eeb13e 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -36,6 +36,7 @@
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.io.TempDir;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
@@ -318,6 +319,20 @@ public void testChangelogFilePrefixForPkTable() throws Exception {
Assertions.assertEquals(1, dataFileCount(files2, "test-changelog-"));
}
+ @Test
+ public void testMarkDone() throws IOException {
+ spark.sql(
+ "CREATE TABLE T (a INT, b INT, c STRING) PARTITIONED BY (c) TBLPROPERTIES ("
+ + "'partition.end-input-to-done' = 'true', 'partition.mark-done-action' = 'success-file')");
+ spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
+
+ FileStoreTable table = getTable("T");
+ FileIO fileIO = table.fileIO();
+ Path tableLocation = table.location();
+
+ Assertions.assertTrue(fileIO.exists(new Path(tableLocation, "c=aa/_SUCCESS")));
+ }
+
protected static FileStoreTable getTable(String tableName) {
return FileStoreTableFactory.create(
LocalFileIO.create(),
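
One follow-up worth noting: the string property set in TBLPROPERTIES above resolves to the same typed option the Scala write path checks, so the gate in `markDoneIfNeeded` and the test stay in sync. A small illustrative round-trip (class name is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.CoreOptions;

// Illustrative: TBLPROPERTIES string -> typed CoreOptions lookup, as in markDoneIfNeeded.
public class OptionRoundTrip {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("partition.end-input-to-done", "true");
        CoreOptions coreOptions = new CoreOptions(props);
        boolean enabled =
                coreOptions.toConfiguration().get(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT);
        System.out.println(enabled); // prints: true
    }
}
```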