[Issue-624] Python API support for the new unified sink #625

Closed
45 changes: 36 additions & 9 deletions documentation/src/docs/python.md
@@ -2,9 +2,9 @@

This Pravega Python DataStream connector provides a data source and data sink for Flink streaming jobs.

Your Flink streaming jobs could use Pravega as their storage with these [Python API Wrappers](https://github.com/pravega/flink-connectors/tree/master/src/main/python). This page only describes the API usage and for parameter concepts please refer to [Configurations](configurations.md) and [Streaming](streaming.md)
Your Flink streaming jobs could use Pravega as their storage with these [Python API Wrappers](https://github.com/pravega/flink-connectors/tree/master/src/main/python). This page only describes the API usage and for parameter concepts please refer to [Configurations](configurations.md) and [Streaming](streaming.md).

**DISCLAIMER: This python wrapper is an IMPLEMENTATION REFERENCE and is not officially published.**
**DISCLAIMER: This python wrapper is an IMPLEMENTATION REFERENCE and is not officially supported.**

* [How to use](#How-to-use)
* [PravegaConfig](#PravegaConfig)
@@ -47,7 +47,9 @@ A `StreamCut` object could be constructed from the `from_base64` class method wh

By default, the `FlinkPravegaReader` will pass the `UNBOUNDED` `StreamCut` which let the reader read from the HEAD to the TAIL.

## FlinkPravegaReader
## Source

### FlinkPravegaReader

Use `FlinkPravegaReader` as a datastream source. Could be added by `env.add_source`.

@@ -61,10 +63,9 @@ from pravega_reader import FlinkPravegaReader
env = StreamExecutionEnvironment.get_execution_environment()

pravega_config = PravegaConfig(uri=uri, scope=scope)
pravega_reader = FlinkPravegaReader(
    stream=stream,
    pravega_config=pravega_config,
    deserialization_schema=SimpleStringSchema())
pravega_reader = FlinkPravegaReader(stream=stream,
                                    pravega_config=pravega_config,
                                    deserialization_schema=SimpleStringSchema())

ds = env.add_source(pravega_reader)
```
@@ -85,7 +86,9 @@ ds = env.add_source(pravega_reader)
|event_read_timeout|timedelta|No|None(1 second on java side)|Sets the timeout for the call to read events from Pravega. After the timeout expires (without an event being returned), another call will be made.|
|max_outstanding_checkpoint_request|int|No|None(3 on java side)|Configures the maximum outstanding checkpoint requests to Pravega.|

## FlinkPravegaWriter
## Sink

### FlinkPravegaWriter

Use `FlinkPravegaWriter` as a datastream sink. Could be added by `env.add_sink`.

@@ -103,9 +106,33 @@ pravega_writer = FlinkPravegaWriter(stream=stream,
                                    pravega_config=pravega_config,
                                    serialization_schema=SimpleStringSchema())

ds = env.add_sink(pravega_reader)
ds = env.add_sink(pravega_writer)
```

### PravegaSink

Use `PravegaSink` as a datastream sink. Could be added by `env.sink_to`.

```python
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment

from pravega_config import PravegaConfig
from pravega_writer import PravegaSink

env = StreamExecutionEnvironment.get_execution_environment()

pravega_config = PravegaConfig(uri=uri, scope=scope)
pravega_sink = PravegaSink(stream=stream,
                           pravega_config=pravega_config,
                           serialization_schema=SimpleStringSchema())

ds = env.sink_to(pravega_sink)
```

### Configurations

Both `FlinkPravegaWriter` and `PravegaSink` share the same parameters.
|parameter|type|required|default value|description|
|-|-|-|-|-|
|stream|Union[str, Stream]|Yes|N/A|Add a stream to be read by the source, from the earliest available position in the stream.|
66 changes: 66 additions & 0 deletions src/main/python/pravega_writer.py
@@ -20,6 +20,7 @@

from py4j.java_gateway import JavaObject
from pyflink.common.serialization import SerializationSchema
from pyflink.datastream.connectors import Sink
from pyflink.datastream.functions import SinkFunction
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import to_j_flink_time
@@ -71,6 +72,7 @@ def __init__(
        Args:
            stream (Union[str, Stream]):
                Add a stream to be written to by the writer.
                In `scope/stream` format if the type is str.

            pravega_config (PravegaConfig):
                Set the Pravega client configuration, which includes
@@ -123,3 +125,67 @@ def __init__(

        super(FlinkPravegaWriter,
              self).__init__(sink_func=j_flink_pravega_writer)


class PravegaSink(Sink):
    """Flink sink implementation for writing into pravega storage."""
    def __init__(
        self,
        stream: Union[str, Stream],
        pravega_config: PravegaConfig,
        serialization_schema: SerializationSchema,
        enable_metrics: bool = True,
        writer_mode: PravegaWriterMode = PravegaWriterMode.ATLEAST_ONCE,
        enable_watermark: bool = False,
        txn_lease_renewal_period: timedelta = timedelta(seconds=30)
    ) -> None:
        """Build the `FlinkPravegaWriter` with options.
        NOTE: `withEventRouter` is not supported yet.
        Args:
            stream (Union[str, Stream]):
                Add a stream to be written to by the writer.
                In `scope/stream` format if the type is str.
            pravega_config (PravegaConfig):
                Set the Pravega client configuration, which includes
                connection info, security info, and a default scope.
            serialization_schema (SerializationSchema):
                Sets the serialization schema.
            enable_metrics (bool, optional):
                Pravega writer metrics. Defaults to True.
            writer_mode (PravegaWriterMode, optional):
                Sets the writer mode to provide at-least-once or exactly-once
                guarantees. Defaults to PravegaWriterMode.ATLEAST_ONCE.
            enable_watermark (bool, optional):
                Enable watermark. Defaults to False.
            txn_lease_renewal_period (timedelta, optional):
                Sets the transaction lease renewal period.
                Defaults to 30 seconds on java side.
                When the writer mode is set to EXACTLY_ONCE, transactions are
                used to persist events to the Pravega stream. The transaction
                interval corresponds to the Flink checkpoint interval.
                Throughout that interval, the transaction is kept alive with a
                lease that is periodically renewed. This configuration setting
                sets the lease renewal period.
        """
        j_builder: JavaObject = get_gateway().jvm \
            .io.pravega.connectors.flink.sink.PravegaSink.builder()

        # AbstractWriterBuilder
        j_builder.forStream(stream if type(stream) ==
                            str else stream._j_stream)
        j_builder.withPravegaConfig(pravega_config._j_pravega_config)
        j_builder.enableMetrics(enable_metrics)

        # AbstractStreamingWriterBuilder
        j_builder.withWriterMode(writer_mode._to_j_pravega_writer_mode())
        j_builder.enableWatermark(enable_watermark)
        j_builder.withTxnLeaseRenewalPeriod(
            to_j_flink_time(txn_lease_renewal_period))

        # PravegaSink.Builder
        j_builder.withSerializationSchema(
            serialization_schema._j_serialization_schema)

        j_pravega_sink: JavaObject = j_builder.build()

        super(PravegaSink, self).__init__(sink=j_pravega_sink)
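
The `forStream` argument selection in `PravegaSink.__init__` can be tried without a JVM using the rough sketch below. `FakeStream` and `resolve_stream` are hypothetical names introduced only for illustration and are not part of the wrapper. A plain string stands in for the Java object that the real `Stream` class would hold, and `isinstance` is used in place of the `type(stream) == str` comparison from the diff.

```python
from typing import Union


class FakeStream:
    """Hypothetical stand-in for the wrapper's `Stream` class."""

    def __init__(self, scope: str, stream: str) -> None:
        # Assumption: the real wrapper keeps a Java object in `_j_stream`;
        # a `scope/stream` string keeps this sketch JVM-free.
        self._j_stream = f"{scope}/{stream}"


def resolve_stream(stream: Union[str, FakeStream]) -> str:
    """Pick the value that would be handed to `forStream`."""
    # A str is passed through unchanged (expected in `scope/stream` format);
    # anything else is assumed to expose `_j_stream`.
    return stream if isinstance(stream, str) else stream._j_stream


if __name__ == "__main__":
    print(resolve_stream("my-scope/my-stream"))
    print(resolve_stream(FakeStream("my-scope", "my-stream")))
```

Both calls resolve to the same `scope/stream` value here, which matches the dual `Union[str, Stream]` signature described in the docstring.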