[spark] Support marking partitions done for the Spark writer (#4363)
Aitozi authored Oct 23, 2024
1 parent 20790af commit a0f3ea8
Showing 10 changed files with 56 additions and 17 deletions.
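At a glance: this commit moves the partition.end-input-to-done option from FlinkConnectorOptions into CoreOptions and wires partition mark-done actions into the Spark writer (WriteIntoPaimonTable), so a Spark batch write can mark its partitions done when the input ends. A minimal usage sketch, assuming a Spark session with a Paimon catalog configured; table and column names are illustrative, mirroring the new test at the bottom of this commit:

    // Enable end-input mark-done plus a success-file action on the table.
    spark.sql(
      "CREATE TABLE orders (id INT, dt STRING) PARTITIONED BY (dt) TBLPROPERTIES (" +
        "'partition.end-input-to-done' = 'true', 'partition.mark-done-action' = 'success-file')")
    spark.sql("INSERT INTO orders VALUES (1, '20241023')")
    // After the batch write ends, the partition is marked done:
    // the 'success-file' action creates dt=20241023/_SUCCESS under the table location.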
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -551,6 +551,12 @@
             <td>String</td>
             <td>The default partition name in case the dynamic partition column value is null/empty string.</td>
         </tr>
+        <tr>
+            <td><h5>partition.end-input-to-done</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to mark the done status to indicate that the data is ready when the input ends.</td>
+        </tr>
         <tr>
             <td><h5>partition.expiration-check-interval</h5></td>
             <td style="word-wrap: break-word;">1 h</td>
docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -80,12 +80,6 @@
             <td>Integer</td>
             <td>If the pending snapshot count exceeds the threshold, lookup operator will refresh the table in sync.</td>
         </tr>
-        <tr>
-            <td><h5>partition.end-input-to-done</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether mark the done status to indicate that the data is ready when end input.</td>
-        </tr>
         <tr>
             <td><h5>partition.idle-time-to-done</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
org/apache/paimon/CoreOptions.java
@@ -28,6 +28,7 @@
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
 import org.apache.paimon.options.ExpireConfig;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
@@ -818,6 +819,13 @@ public class CoreOptions implements Serializable {
                                             + "$hour:00:00'."))
                             .build());
 
+    public static final ConfigOption<Boolean> PARTITION_MARK_DONE_WHEN_END_INPUT =
+            ConfigOptions.key("partition.end-input-to-done")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to mark the done status to indicate that the data is ready when the input ends.");
+
     public static final ConfigOption<Boolean> SCAN_PLAN_SORT_PARTITION =
             key("scan.plan-sort-partition")
                     .booleanType()
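The relocated option behaves like any other CoreOptions entry. A minimal sketch of setting and reading it programmatically through Paimon's Options API (note the explicit boxing, since the ConfigOption is typed over java.lang.Boolean):

    import org.apache.paimon.CoreOptions
    import org.apache.paimon.options.Options

    val options = new Options()
    // ConfigOption[java.lang.Boolean], so box the Scala Boolean explicitly.
    options.set(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT, Boolean.box(true))
    // Falls back to the declared default (false) when unset.
    val enabled: Boolean = options.get(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT)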
org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -355,13 +355,6 @@ public class FlinkConnectorOptions {
                             "You can specify time interval for partition, for example, "
                                     + "daily partition is '1 d', hourly partition is '1 h'.");
 
-    public static final ConfigOption<Boolean> PARTITION_MARK_DONE_WHEN_END_INPUT =
-            ConfigOptions.key("partition.end-input-to-done")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "Whether mark the done status to indicate that the data is ready when end input.");
-
     public static final ConfigOption<String> CLUSTERING_COLUMNS =
             key("sink.clustering.by-columns")
                     .stringType()
PartitionMarkDone.java
@@ -42,8 +42,8 @@
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
 import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE;
-import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
 
 /** Mark partition done. */
 public class PartitionMarkDone implements Closeable {
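Note that in this file and the next, only the static imports change: the Flink PartitionMarkDone operator keeps its behavior and simply reads the flag from its new home in CoreOptions, so the Flink and Spark writers now share a single option definition.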
@@ -44,8 +44,8 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
 import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE;
-import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
 import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_TIME_INTERVAL;
 import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;
@@ -161,7 +161,7 @@ private FileStoreTable buildFileStoreTable(int[] partitions, int[] primaryKey)
         options.set(BUCKET, 3);
         options.set(PATH, getTempDirPath());
         options.set(FILE_FORMAT, CoreOptions.FILE_FORMAT_AVRO);
-        options.set(FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT.key(), "true");
+        options.set(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT.key(), "true");
 
         Path tablePath = new CoreOptions(options.toMap()).path();
         if (primaryKey.length == 0) {
PartitionMarkDoneTest.java
@@ -51,7 +51,7 @@
 import static java.util.Collections.singletonList;
 import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
 import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
-import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
 import static org.assertj.core.api.Assertions.assertThat;
 
 class PartitionMarkDoneTest extends TableTestBase {
org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -18,11 +18,15 @@
 
 package org.apache.paimon.spark.commands
 
+import org.apache.paimon.CoreOptions
 import org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE
 import org.apache.paimon.options.Options
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction
 import org.apache.paimon.spark._
 import org.apache.paimon.spark.schema.SparkSystemColumns
 import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.sink.CommitMessage
+import org.apache.paimon.utils.{InternalRowPartitionComputer, PartitionPathUtils, TypeUtils}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
@@ -63,9 +67,28 @@ case class WriteIntoPaimonTable(
     val commitMessages = writer.write(data)
     writer.commit(commitMessages)
 
+    markDoneIfNeeded(commitMessages)
     Seq.empty
   }
 
+  private def markDoneIfNeeded(commitMessages: Seq[CommitMessage]): Unit = {
+    val coreOptions = table.coreOptions()
+    if (coreOptions.toConfiguration.get(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT)) {
+      val actions = PartitionMarkDoneAction.createActions(table, table.coreOptions())
+      val partitionComputer = new InternalRowPartitionComputer(
+        coreOptions.partitionDefaultName,
+        TypeUtils.project(table.rowType(), table.partitionKeys()),
+        table.partitionKeys().asScala.toArray
+      )
+      val partitions = commitMessages
+        .map(c => c.partition())
+        .map(p => PartitionPathUtils.generatePartitionPath(partitionComputer.generatePartValues(p)))
+      for (partition <- partitions) {
+        actions.forEach(a => a.markDone(partition))
+      }
+    }
+  }
+
   private def parseSaveMode(): (Boolean, Map[String, String]) = {
     var dynamicPartitionOverwriteMode = false
     val overwritePartition = saveMode match {
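markDoneIfNeeded runs only when the new flag is set: it creates the configured mark-done actions, renders each committed partition as a Hive-style partition path, and invokes every action on every path. A small sketch of the path-rendering step, assuming PartitionPathUtils keeps the usual trailing-slash convention:

    import java.util.LinkedHashMap
    import org.apache.paimon.utils.PartitionPathUtils

    // Ordered partition spec, as InternalRowPartitionComputer.generatePartValues produces.
    val spec = new LinkedHashMap[String, String]()
    spec.put("c", "aa")
    // Expected to render "c=aa/"; each PartitionMarkDoneAction.markDone then receives it.
    val path = PartitionPathUtils.generatePartitionPath(spec)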
@@ -36,6 +36,7 @@
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
@@ -318,6 +319,20 @@ public void testChangelogFilePrefixForPkTable() throws Exception {
         Assertions.assertEquals(1, dataFileCount(files2, "test-changelog-"));
     }
 
+    @Test
+    public void testMarkDone() throws IOException {
+        spark.sql(
+                "CREATE TABLE T (a INT, b INT, c STRING) PARTITIONED BY (c) TBLPROPERTIES ("
+                        + "'partition.end-input-to-done' = 'true', 'partition.mark-done-action' = 'success-file')");
+        spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
+
+        FileStoreTable table = getTable("T");
+        FileIO fileIO = table.fileIO();
+        Path tabLocation = table.location();
+
+        Assertions.assertTrue(fileIO.exists(new Path(tabLocation, "c=aa/_SUCCESS")));
+    }
+
     protected static FileStoreTable getTable(String tableName) {
         return FileStoreTableFactory.create(
                 LocalFileIO.create(),
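For a quick check outside the test harness, the success file can be probed directly with a FileIO; a sketch with a hypothetical local warehouse path (the real location comes from table.location()):

    import org.apache.paimon.fs.Path
    import org.apache.paimon.fs.local.LocalFileIO

    val fileIO = LocalFileIO.create()
    // Hypothetical path for illustration; substitute the actual table location.
    val done = fileIO.exists(new Path("/tmp/paimon/default.db/T/c=aa/_SUCCESS"))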
