From c480e610386d8a97a3bbccb939d0c12619f5e1d7 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Tue, 22 Oct 2024 18:14:56 +0800 Subject: [PATCH 1/5] [spark] support to mark done for spark writer --- .../generated/core_configuration.html | 6 +++++ .../flink_connector_configuration.html | 6 ----- .../java/org/apache/paimon/CoreOptions.java | 8 +++++++ .../paimon/flink/FlinkConnectorOptions.java | 7 ------ .../sink/partition/PartitionMarkDone.java | 2 +- .../partition/PartitionMarkDoneTrigger.java | 2 +- .../FlinkBatchJobPartitionMarkdoneITCase.java | 2 +- .../sink/partition/PartitionMarkDoneTest.java | 2 +- paimon-spark/paimon-spark-common/pom.xml | 22 +++++++++--------- .../spark/commands/WriteIntoPaimonTable.scala | 23 +++++++++++++++++++ .../apache/paimon/spark/SparkWriteITCase.java | 15 ++++++++++++ 11 files changed, 67 insertions(+), 28 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index aeab93eda81f..fd788301eea9 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -551,6 +551,12 @@ String The default partition name in case the dynamic partition column value is null/empty string. + +
partition.end-input-to-done
+ false + Boolean + Whether mark the done status to indicate that the data is ready when end input. +
partition.expiration-check-interval
1 h diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html index cf4c7c1da692..ea5dd5dfb163 100644 --- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html +++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html @@ -80,12 +80,6 @@ Integer If the pending snapshot count exceeds the threshold, lookup operator will refresh the table in sync. - -
partition.end-input-to-done
- false - Boolean - Whether mark the done status to indicate that the data is ready when end input. -
partition.idle-time-to-done
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 1256c7ba0d87..d965fd142deb 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -28,6 +28,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.lookup.LookupStrategy; import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.options.ConfigOptions; import org.apache.paimon.options.ExpireConfig; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; @@ -818,6 +819,13 @@ public class CoreOptions implements Serializable { + "$hour:00:00'.")) .build()); + public static final ConfigOption PARTITION_MARK_DONE_WHEN_END_INPUT = + ConfigOptions.key("partition.end-input-to-done") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether mark the done status to indicate that the data is ready when end input."); + public static final ConfigOption SCAN_PLAN_SORT_PARTITION = key("scan.plan-sort-partition") .booleanType() diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index d181d7b5a0c6..2c12b70a2493 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -355,13 +355,6 @@ public class FlinkConnectorOptions { "You can specify time interval for partition, for example, " + "daily partition is '1 d', hourly partition is '1 h'."); - public static final ConfigOption PARTITION_MARK_DONE_WHEN_END_INPUT = - ConfigOptions.key("partition.end-input-to-done") - .booleanType() - .defaultValue(false) - .withDescription( - "Whether mark the done status to indicate that the data is ready when end input."); - public static final ConfigOption CLUSTERING_COLUMNS = key("sink.clustering.by-columns") .stringType() diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java index 39438a101b04..2c5d55f203dc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java @@ -42,8 +42,8 @@ import java.util.List; import java.util.Set; +import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT; import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE; -import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT; /** Mark partition done. 
*/ public class PartitionMarkDone implements Closeable { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java index 5b1b53d6326a..0094c5511809 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java @@ -44,8 +44,8 @@ import java.util.List; import java.util.Map; +import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT; import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE; -import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT; import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_TIME_INTERVAL; import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java index 9c97151ecbe7..340213aeae1b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java @@ -161,7 +161,7 @@ private FileStoreTable buildFileStoreTable(int[] partitions, int[] primaryKey) options.set(BUCKET, 3); options.set(PATH, getTempDirPath()); options.set(FILE_FORMAT, CoreOptions.FILE_FORMAT_AVRO); - options.set(FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT.key(), "true"); + options.set(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT.key(), "true"); Path tablePath = new CoreOptions(options.toMap()).path(); if (primaryKey.length == 0) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java index 6dc14c9d0342..ecef6d337a6b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java @@ -51,7 +51,7 @@ import static java.util.Collections.singletonList; import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED; import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION; -import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT; +import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT; import static org.assertj.core.api.Assertions.assertThat; class PartitionMarkDoneTest extends TableTestBase { diff --git a/paimon-spark/paimon-spark-common/pom.xml b/paimon-spark/paimon-spark-common/pom.xml index 36139e283261..803cc6779df2 100644 --- a/paimon-spark/paimon-spark-common/pom.xml +++ b/paimon-spark/paimon-spark-common/pom.xml @@ -38,17 +38,17 @@ under the License. 
- - org.apache.paimon - paimon-bundle - ${project.version} - - - * - * - - - + + + + + + + + + + + org.scala-lang diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala index fe740ea8ca11..5911ff050432 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala @@ -18,11 +18,15 @@ package org.apache.paimon.spark.commands +import org.apache.paimon.CoreOptions import org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE import org.apache.paimon.options.Options +import org.apache.paimon.partition.actions.PartitionMarkDoneAction import org.apache.paimon.spark._ import org.apache.paimon.spark.schema.SparkSystemColumns import org.apache.paimon.table.FileStoreTable +import org.apache.paimon.table.sink.CommitMessage +import org.apache.paimon.utils.{InternalRowPartitionComputer, PartitionPathUtils, TypeUtils} import org.apache.spark.internal.Logging import org.apache.spark.sql.{DataFrame, Row, SparkSession} @@ -63,9 +67,28 @@ case class WriteIntoPaimonTable( val commitMessages = writer.write(data) writer.commit(commitMessages) + markDone(commitMessages) Seq.empty } + private def markDone(commitMessages: Seq[CommitMessage]): Unit = { + val coreOptions = table.coreOptions() + if (coreOptions.toConfiguration.get(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT)) { + val actions = PartitionMarkDoneAction.createActions(originTable, table.coreOptions()) + val partitionComputer = new InternalRowPartitionComputer( + coreOptions.partitionDefaultName, + TypeUtils.project(table.rowType(), table.partitionKeys()), + table.partitionKeys().asScala.toArray + ) + val partitions = commitMessages + .map(c => c.partition()) + .map(p => PartitionPathUtils.generatePartitionPath(partitionComputer.generatePartValues(p))) + for (elem <- partitions) { + actions.forEach(a => a.markDone(elem)) + } + } + } + private def parseSaveMode(): (Boolean, Map[String, String]) = { var dynamicPartitionOverwriteMode = false val overwritePartition = saveMode match { diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java index 0cc17639fd80..0ff104eeb13e 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java @@ -36,6 +36,7 @@ import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.io.TempDir; +import java.io.IOException; import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -318,6 +319,20 @@ public void testChangelogFilePrefixForPkTable() throws Exception { Assertions.assertEquals(1, dataFileCount(files2, "test-changelog-")); } + @Test + public void testMarkDone() throws IOException { + spark.sql( + "CREATE TABLE T (a INT, b INT, c STRING) PARTITIONED BY (c) TBLPROPERTIES (" + + "'partition.end-input-to-done' = 'true', 'partition.mark-done-action' = 'success-file')"); + spark.sql("INSERT INTO T VALUES (1, 1, 'aa')"); + + FileStoreTable table = getTable("T"); + FileIO fileIO = table.fileIO(); + Path tabLocation = table.location(); + + 
Assertions.assertTrue(fileIO.exists(new Path(tabLocation, "c=aa/_SUCCESS"))); + } + protected static FileStoreTable getTable(String tableName) { return FileStoreTableFactory.create( LocalFileIO.create(), From 1eea95f02f1f398b395a03d4f929fe60cec961b3 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Tue, 22 Oct 2024 18:16:10 +0800 Subject: [PATCH 2/5] revert --- paimon-spark/paimon-spark-common/pom.xml | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/paimon-spark/paimon-spark-common/pom.xml b/paimon-spark/paimon-spark-common/pom.xml index 803cc6779df2..36139e283261 100644 --- a/paimon-spark/paimon-spark-common/pom.xml +++ b/paimon-spark/paimon-spark-common/pom.xml @@ -38,17 +38,17 @@ under the License. - - - - - - - - - - - + + org.apache.paimon + paimon-bundle + ${project.version} + + + * + * + + + org.scala-lang From e9abebca2239481bd22765487c4cbf51f7ff2edc Mon Sep 17 00:00:00 2001 From: Aitozi Date: Tue, 22 Oct 2024 19:49:40 +0800 Subject: [PATCH 3/5] use tbale --- .../apache/paimon/spark/commands/WriteIntoPaimonTable.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala index 5911ff050432..b44d0016c58c 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala @@ -27,11 +27,10 @@ import org.apache.paimon.spark.schema.SparkSystemColumns import org.apache.paimon.table.FileStoreTable import org.apache.paimon.table.sink.CommitMessage import org.apache.paimon.utils.{InternalRowPartitionComputer, PartitionPathUtils, TypeUtils} - import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.{DataFrame, Row, SparkSession} import scala.collection.JavaConverters._ @@ -74,7 +73,7 @@ case class WriteIntoPaimonTable( private def markDone(commitMessages: Seq[CommitMessage]): Unit = { val coreOptions = table.coreOptions() if (coreOptions.toConfiguration.get(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT)) { - val actions = PartitionMarkDoneAction.createActions(originTable, table.coreOptions()) + val actions = PartitionMarkDoneAction.createActions(table, table.coreOptions()) val partitionComputer = new InternalRowPartitionComputer( coreOptions.partitionDefaultName, TypeUtils.project(table.rowType(), table.partitionKeys()), From ec68ce5606862632754a2b53306b66da3e908b9f Mon Sep 17 00:00:00 2001 From: Aitozi Date: Tue, 22 Oct 2024 19:56:17 +0800 Subject: [PATCH 4/5] format --- .../apache/paimon/spark/commands/WriteIntoPaimonTable.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala index b44d0016c58c..f912ddb37741 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala +++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala @@ -27,10 +27,11 @@ import org.apache.paimon.spark.schema.SparkSystemColumns import org.apache.paimon.table.FileStoreTable import org.apache.paimon.table.sink.CommitMessage import org.apache.paimon.utils.{InternalRowPartitionComputer, PartitionPathUtils, TypeUtils} + import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command.RunnableCommand -import org.apache.spark.sql.{DataFrame, Row, SparkSession} import scala.collection.JavaConverters._ From d105063b5c37e70e4831278a3d4bb50b0a0d0b04 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Tue, 22 Oct 2024 20:00:13 +0800 Subject: [PATCH 5/5] fix comments --- .../paimon/spark/commands/WriteIntoPaimonTable.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala index f912ddb37741..eda39ae1e02d 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala @@ -67,11 +67,11 @@ case class WriteIntoPaimonTable( val commitMessages = writer.write(data) writer.commit(commitMessages) - markDone(commitMessages) + markDoneIfNeeded(commitMessages) Seq.empty } - private def markDone(commitMessages: Seq[CommitMessage]): Unit = { + private def markDoneIfNeeded(commitMessages: Seq[CommitMessage]): Unit = { val coreOptions = table.coreOptions() if (coreOptions.toConfiguration.get(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT)) { val actions = PartitionMarkDoneAction.createActions(table, table.coreOptions()) @@ -83,8 +83,8 @@ case class WriteIntoPaimonTable( val partitions = commitMessages .map(c => c.partition()) .map(p => PartitionPathUtils.generatePartitionPath(partitionComputer.generatePartValues(p))) - for (elem <- partitions) { - actions.forEach(a => a.markDone(elem)) + for (partition <- partitions) { + actions.forEach(a => a.markDone(partition)) } } }
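
Usage sketch (illustrative, not part of the patch series): the series moves partition.end-input-to-done from FlinkConnectorOptions into CoreOptions so the Spark writer can honour it as well, and PATCH 5/5 leaves the Spark-side hook as markDoneIfNeeded(commitMessages) after the batch commit. The sketch below shows the behaviour end to end from a Spark session, assuming the session's current catalog is already configured as a Paimon catalog; the DDL, the table name T, and the expected c=aa/_SUCCESS marker simply mirror the testMarkDone case added in PATCH 1/5, and success-file is the mark-done action that writes that marker.

    import org.apache.spark.sql.SparkSession

    object PartitionMarkDoneExample {
      def main(args: Array[String]): Unit = {
        // Assumes the Spark catalog is already wired to Paimon, as in SparkWriteITCase.
        val spark = SparkSession.builder().getOrCreate()

        // Enable end-input mark-done and pick the success-file action, which writes a
        // _SUCCESS file into each written partition directory.
        spark.sql(
          "CREATE TABLE T (a INT, b INT, c STRING) PARTITIONED BY (c) TBLPROPERTIES (" +
            "'partition.end-input-to-done' = 'true', 'partition.mark-done-action' = 'success-file')")

        // A batch INSERT is the "end of input" for the Spark writer; once the commit
        // finishes, markDoneIfNeeded runs the configured actions, so c=aa/_SUCCESS is
        // expected under the table location afterwards.
        spark.sql("INSERT INTO T VALUES (1, 1, 'aa')")

        spark.stop()
      }
    }

Keeping the option in CoreOptions rather than in a connector-specific options class is what lets both engines share the same PartitionMarkDoneAction implementations.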