diff --git a/documentation/src/docs/python.md b/documentation/src/docs/python.md index f2fe1f86..6f855a84 100644 --- a/documentation/src/docs/python.md +++ b/documentation/src/docs/python.md @@ -2,9 +2,9 @@ This Pravega Python DataStream connector provides a data source and data sink for Flink streaming jobs. -Your Flink streaming jobs could use Pravega as their storage with these [Python API Wrappers](https://github.com/pravega/flink-connectors/tree/master/src/main/python). This page only describes the API usage and for parameter concepts please refer to [Configurations](configurations.md) and [Streaming](streaming.md) +Your Flink streaming jobs could use Pravega as their storage with these [Python API Wrappers](https://github.com/pravega/flink-connectors/tree/master/src/main/python). This page only describes the API usage and for parameter concepts please refer to [Configurations](configurations.md) and [Streaming](streaming.md). -**DISCLAIMER: This python wrapper is an IMPLEMENTATION REFERENCE and is not officially published.** +**DISCLAIMER: This python wrapper is an IMPLEMENTATION REFERENCE and is not officially supported.** * [How to use](#How-to-use) * [PravegaConfig](#PravegaConfig) @@ -47,7 +47,9 @@ A `StreamCut` object could be constructed from the `from_base64` class method wh By default, the `FlinkPravegaReader` will pass the `UNBOUNDED` `StreamCut` which let the reader read from the HEAD to the TAIL. -## FlinkPravegaReader +## Source + +### FlinkPravegaReader Use `FlinkPravegaReader` as a datastream source. Could be added by `env.add_source`. @@ -61,10 +63,9 @@ from pravega_reader import FlinkPravegaReader env = StreamExecutionEnvironment.get_execution_environment() pravega_config = PravegaConfig(uri=uri, scope=scope) -pravega_reader = FlinkPravegaReader( - stream=stream, - pravega_config=pravega_config, - deserialization_schema=SimpleStringSchema()) +pravega_reader = FlinkPravegaReader(stream=stream, + pravega_config=pravega_config, + deserialization_schema=SimpleStringSchema()) ds = env.add_source(pravega_reader) ``` @@ -85,7 +86,9 @@ ds = env.add_source(pravega_reader) |event_read_timeout|timedelta|No|None(1 second on java side)|Sets the timeout for the call to read events from Pravega. After the timeout expires (without an event being returned), another call will be made.| |max_outstanding_checkpoint_request|int|No|None(3 on java side)|Configures the maximum outstanding checkpoint requests to Pravega.| -## FlinkPravegaWriter +## Sink + +### FlinkPravegaWriter Use `FlinkPravegaWriter` as a datastream sink. Could be added by `env.add_sink`. @@ -103,9 +106,33 @@ pravega_writer = FlinkPravegaWriter(stream=stream, pravega_config=pravega_config, serialization_schema=SimpleStringSchema()) -ds = env.add_sink(pravega_reader) +ds = env.add_sink(pravega_writer) +``` + +### PravegaSink + +Use `PravegaSink` as a datastream sink. Could be added by `env.sink_to`. + +```python +from pyflink.common.serialization import SimpleStringSchema +from pyflink.datastream import StreamExecutionEnvironment + +from pravega_config import PravegaConfig +from pravega_writer import PravegaSink + +env = StreamExecutionEnvironment.get_execution_environment() + +pravega_config = PravegaConfig(uri=uri, scope=scope) +pravega_sink = PravegaSink(stream=stream, + pravega_config=pravega_config, + serialization_schema=SimpleStringSchema()) + +ds = env.sink_to(pravega_sink) ``` +### Configurations + +Both `FlinkPravegaWriter` and `PravegaSink` share the same parameters. |parameter|type|required|default value|description| |-|-|-|-|-| |stream|Union[str, Stream]|Yes|N/A|Add a stream to be read by the source, from the earliest available position in the stream.| diff --git a/src/main/python/pravega_writer.py b/src/main/python/pravega_writer.py index 54b86665..41917de6 100644 --- a/src/main/python/pravega_writer.py +++ b/src/main/python/pravega_writer.py @@ -20,6 +20,7 @@ from py4j.java_gateway import JavaObject from pyflink.common.serialization import SerializationSchema +from pyflink.datastream.connectors import Sink from pyflink.datastream.functions import SinkFunction from pyflink.java_gateway import get_gateway from pyflink.util.java_utils import to_j_flink_time @@ -71,6 +72,7 @@ def __init__( Args: stream (Union[str, Stream]): Add a stream to be written to by the writer. + In `scope/stream` format if the type is str. pravega_config (PravegaConfig): Set the Pravega client configuration, which includes @@ -123,3 +125,67 @@ def __init__( super(FlinkPravegaWriter, self).__init__(sink_func=j_flink_pravega_writer) + + +class PravegaSink(Sink): + """Flink sink implementation for writing into pravega storage.""" + def __init__( + self, + stream: Union[str, Stream], + pravega_config: PravegaConfig, + serialization_schema: SerializationSchema, + enable_metrics: bool = True, + writer_mode: PravegaWriterMode = PravegaWriterMode.ATLEAST_ONCE, + enable_watermark: bool = False, + txn_lease_renewal_period: timedelta = timedelta(seconds=30) + ) -> None: + """Build the `FlinkPravegaWriter` with options. + NOTE: `withEventRouter` is not supported yet. + Args: + stream (Union[str, Stream]): + Add a stream to be written to by the writer. + In `scope/stream` format if the type is str. + pravega_config (PravegaConfig): + Set the Pravega client configuration, which includes + connection info, security info, and a default scope. + serialization_schema (SerializationSchema): + Sets the serialization schema. + enable_metrics (bool, optional): + Pravega writer metrics. Defaults to True. + writer_mode (PravegaWriterMode, optional): + Sets the writer mode to provide at-least-once or exactly-once + guarantees. Defaults to PravegaWriterMode.ATLEAST_ONCE. + enable_watermark (bool, optional): + Enable watermark. Defaults to False. + txn_lease_renewal_period (timedelta, optional): + Sets the transaction lease renewal period. + Defaults to 30 seconds on java side. + When the writer mode is set to EXACTLY_ONCE, transactions are + used to persist events to the Pravega stream. The transaction + interval corresponds to the Flink checkpoint interval. + Throughout that interval, the transaction is kept alive with a + lease that is periodically renewed. This configuration setting + sets the lease renewal period. + """ + j_builder: JavaObject = get_gateway().jvm \ + .io.pravega.connectors.flink.sink.PravegaSink.builder() + + # AbstractWriterBuilder + j_builder.forStream(stream if type(stream) == + str else stream._j_stream) + j_builder.withPravegaConfig(pravega_config._j_pravega_config) + j_builder.enableMetrics(enable_metrics) + + # AbstractStreamingWriterBuilder + j_builder.withWriterMode(writer_mode._to_j_pravega_writer_mode()) + j_builder.enableWatermark(enable_watermark) + j_builder.withTxnLeaseRenewalPeriod( + to_j_flink_time(txn_lease_renewal_period)) + + # PravegaSink.Builder + j_builder.withSerializationSchema( + serialization_schema._j_serialization_schema) + + j_pravega_sink: JavaObject = j_builder.build() + + super(PravegaSink, self).__init__(sink=j_pravega_sink)