Skip to content

Commit

Permalink
[spark] Support MarkPartitionDoneProcedure in Spark (#3746)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuzifu666 authored Jul 16, 2024
1 parent d09d2aa commit d01a384
Show file tree
Hide file tree
Showing 20 changed files with 269 additions and 39 deletions.
14 changes: 14 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,20 @@ This section introduce all available spark procedures about paimon.
-- delete consumer<br/>
CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid')
</td>
</tr>
<tr>
<td>mark_partition_done</td>
<td>
To mark partitions as done. Arguments:
<li>table: the target table identifier. Cannot be empty.</li>
<li>partitions: the partitions to be marked as done. To specify multiple partitions, separate them with ';'.</li>
</td>
<td>
-- mark single partition done<br/>
CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01')<br/><br/>
-- mark multiple partitions done<br/>
CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01;day=2024-07-02')
</td>
</tr>
</tbody>
</table>
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,12 @@
<td>Duration</td>
<td>The expiration interval of a partition. A partition will be expired if its lifetime is over this value. Partition time is extracted from the partition value.</td>
</tr>
<tr>
<td><h5>partition.mark-done-action</h5></td>
<td style="word-wrap: break-word;">"success-file"</td>
<td>String</td>
<td>Action to mark a partition done is to notify the downstream application that the partition has finished writing, the partition is ready to be read.<br />1. 'success-file': add '_success' file to directory.<br />2. 'done-partition': add 'xxx.done' partition to metastore.<br />Both can be configured at the same time: 'done-partition,success-file'.</td>
</tr>
<tr>
<td><h5>partition.timestamp-formatter</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,6 @@
<td>Duration</td>
<td>Set a time duration when a partition has no new data after this time duration, mark the done status to indicate that the data is ready.</td>
</tr>
<tr>
<td><h5>partition.mark-done-action</h5></td>
<td style="word-wrap: break-word;">"success-file"</td>
<td>String</td>
<td>Action to mark a partition done is to notify the downstream application that the partition has finished writing, the partition is ready to be read.<br />1. 'success-file': add '_success' file to directory.<br />2. 'done-partition': add 'xxx.done' partition to metastore.<br />Both can be configured at the same time: 'done-partition,success-file'.</td>
</tr>
<tr>
<td><h5>partition.time-interval</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
19 changes: 19 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,25 @@ public class CoreOptions implements Serializable {
"Parameter string for the constructor of class #. "
+ "Callback class should parse the parameter by itself.");

/**
 * String option {@code partition.mark-done-action}, defaulting to {@code "success-file"}.
 *
 * <p>Per its description, the value names the action(s) used to mark a partition done:
 * {@code success-file}, {@code done-partition}, or both as a comma-separated list.
 */
public static final ConfigOption<String> PARTITION_MARK_DONE_ACTION =
key("partition.mark-done-action")
.stringType()
.defaultValue("success-file")
.withDescription(
Description.builder()
.text(
"Action to mark a partition done is to notify the downstream application that the partition"
+ " has finished writing, the partition is ready to be read.")
.linebreak()
.text("1. 'success-file': add '_success' file to directory.")
.linebreak()
.text(
"2. 'done-partition': add 'xxx.done' partition to metastore.")
.linebreak()
.text(
"Both can be configured at the same time: 'done-partition,success-file'.")
.build());

public static final ConfigOption<Boolean> METASTORE_PARTITIONED_TABLE =
key("metastore.partitioned-table")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.paimon.flink.sink.partition;
package org.apache.paimon.partition.actions;

import org.apache.paimon.fs.Path;
import org.apache.paimon.metastore.MetastoreClient;
Expand All @@ -25,13 +25,12 @@

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map.Entry;
import java.util.Map;

import static org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;

/** A {@link PartitionMarkDoneAction} which adds a ".done" partition. */
public class AddDonePartitionAction implements PartitionMarkDoneAction {

private final MetastoreClient metastoreClient;

public AddDonePartitionAction(MetastoreClient metastoreClient) {
Expand All @@ -41,12 +40,12 @@ public AddDonePartitionAction(MetastoreClient metastoreClient) {
@Override
public void markDone(String partition) throws Exception {
LinkedHashMap<String, String> doneSpec = extractPartitionSpecFromPath(new Path(partition));
Entry<String, String> lastField = tailEntry(doneSpec);
Map.Entry<String, String> lastField = tailEntry(doneSpec);
doneSpec.put(lastField.getKey(), lastField.getValue() + ".done");
metastoreClient.addPartition(doneSpec);
}

private Entry<String, String> tailEntry(LinkedHashMap<String, String> partitionSpec) {
private Map.Entry<String, String> tailEntry(LinkedHashMap<String, String> partitionSpec) {
return Iterators.getLast(partitionSpec.entrySet().iterator());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.paimon.flink.sink.partition;
package org.apache.paimon.partition.actions;

import org.apache.paimon.fs.Path;
import org.apache.paimon.metastore.MetastoreClient;
Expand All @@ -28,6 +28,7 @@

/** A {@link PartitionMarkDoneAction} which marks the partition with "PartitionEventType.LOAD_DONE". */
public class MarkPartitionDoneEventAction implements PartitionMarkDoneAction {

private final MetastoreClient metastoreClient;

public MarkPartitionDoneEventAction(MetastoreClient metastoreClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.paimon.flink.sink.partition;
package org.apache.paimon.partition.actions;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.metastore.MetastoreClient;
Expand All @@ -28,7 +28,7 @@
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_ACTION;
import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@
* limitations under the License.
*/

package org.apache.paimon.flink.sink.partition;
package org.apache.paimon.partition.actions;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.partition.file.SuccessFile;

import javax.annotation.Nullable;

import java.io.FileNotFoundException;
import java.io.IOException;

/** A {@link PartitionMarkDoneAction} which creates a "_SUCCESS" file. */
public class SuccessFileMarkDoneAction implements PartitionMarkDoneAction {
Expand Down Expand Up @@ -49,6 +55,16 @@ public void markDone(String partition) throws Exception {
fileIO.overwriteFileUtf8(successPath, successFile.toJson());
}

/**
 * Reads the file at {@code path} as UTF-8 text and parses it into a {@link SuccessFile}.
 *
 * @param fileIO file system accessor used to read the file
 * @param path location of the file to read
 * @return the parsed {@link SuccessFile}, or {@code null} when a {@link FileNotFoundException}
 *     is raised while reading or parsing
 * @throws IOException if any other I/O failure occurs
 */
@Nullable
public static SuccessFile safelyFromPath(FileIO fileIO, Path path) throws IOException {
    try {
        return SuccessFile.fromJson(fileIO.readFileUtf8(path));
    } catch (FileNotFoundException missing) {
        // Deliberately not propagated: the caller receives null instead of an exception.
        return null;
    }
}

/** No-op: the body is empty, so closing this action performs no work. */
@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.paimon.flink.sink.partition;
package org.apache.paimon.partition.file;

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,25 +357,6 @@ public class FlinkConnectorOptions {
"You can specify time interval for partition, for example, "
+ "daily partition is '1 d', hourly partition is '1 h'.");

public static final ConfigOption<String> PARTITION_MARK_DONE_ACTION =
key("partition.mark-done-action")
.stringType()
.defaultValue("success-file")
.withDescription(
Description.builder()
.text(
"Action to mark a partition done is to notify the downstream application that the partition"
+ " has finished writing, the partition is ready to be read.")
.linebreak()
.text("1. 'success-file': add '_success' file to directory.")
.linebreak()
.text(
"2. 'done-partition': add 'xxx.done' partition to metastore.")
.linebreak()
.text(
"Both can be configured at the same time: 'done-partition,success-file'.")
.build());

public static final ConfigOption<Boolean> PARTITION_MARK_DONE_WHEN_END_INPUT =
ConfigOptions.key("partition.end-input-to-done")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.paimon.flink.action;

import org.apache.paimon.flink.sink.partition.PartitionMarkDoneAction;
import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.PartitionPathUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.partition.PartitionMarkDoneAction;
import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.IOUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.IOUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.sink.partition.SuccessFile;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.file.SuccessFile;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.flink.sink.partition.SuccessFile;
import org.apache.paimon.fs.Path;
import org.apache.paimon.partition.file.SuccessFile;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.partition.actions.AddDonePartitionAction;

import org.junit.jupiter.api.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.partition.actions.SuccessFileMarkDoneAction;
import org.apache.paimon.partition.file.SuccessFile;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.spark.procedure.ExpirePartitionsProcedure;
import org.apache.paimon.spark.procedure.ExpireSnapshotsProcedure;
import org.apache.paimon.spark.procedure.FastForwardProcedure;
import org.apache.paimon.spark.procedure.MarkPartitionDoneProcedure;
import org.apache.paimon.spark.procedure.MigrateFileProcedure;
import org.apache.paimon.spark.procedure.MigrateTableProcedure;
import org.apache.paimon.spark.procedure.Procedure;
Expand Down Expand Up @@ -70,6 +71,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
procedureBuilders.put("repair", RepairProcedure::builder);
procedureBuilders.put("fast_forward", FastForwardProcedure::builder);
procedureBuilders.put("reset_consumer", ResetConsumerProcedure::builder);
procedureBuilders.put("mark_partition_done", MarkPartitionDoneProcedure::builder);
return procedureBuilders.build();
}
}
Loading

0 comments on commit d01a384

Please sign in to comment.