Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Commit

Permalink
Add documentation for the sink.
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias committed Jan 13, 2018
1 parent a0d31bc commit 0c64c97
Showing 1 changed file with 50 additions and 0 deletions.
50 changes: 50 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ It's worth familiarising yourself with [Sequence numbers and Sub sequence number
* [Usage: Producer](#usage-usage-producer)
* [Actor Based Implementation](#usage-usage-producer-actor-based-implementation)
* [Pure Scala based implementation (simple wrapper around KPL)](#usage-usage-producer-pure-scala-based-implementation-simple-wrapper-around-kpl)
* [Akka Stream Sink](#akka-stream-sink)
* [Running the reliability test](#running-the-reliability-test)
* [Delete & recreate kinesisstreams and dynamo table](#running-the-reliability-test-delete-recreate-kinesisstreams-and-dynamo-table)
* [Running the producer-consumer test](#running-the-reliability-test-running-the-producer-consumer-test)
Expand Down Expand Up @@ -484,6 +485,55 @@ callback onFailure {
}
```

<a name="akka-stream-sink"></a>
### Akka Stream Sink

An Akka `Sink` is provided which can be used to publish messages via streams.
Every message is send as `ProduserEvent` to the `Sink`, which defines the PartitionKey as well as the payload.
The `Sink` is created from a `ProducerConf` or directly with a `KinesisProducerActor`. See [Kinesis](https://github.com/WW-Digital/reactive-kinesis/blob/master/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala) for the various options.

The `Sink` expects an acknowledgement for every messages send to Kinesis.
An amount of unacknowledged messages can be configured, before back pressure is applied.
See the throttling conf for defining this configuration value.
Please note: a default value (1000 messages) is applied, if throttling is not configured.

The provided `Sink` produces a `Future[Done]` as materialized value.
This future succeeds, if all messages from upstream are send to Kinesis and acknowledged.
It fails if a message could not be send to Kinesis or upstream fails.

```scala
import akka.stream.scaladsl.Source
import com.weightwatchers.reactive.kinesis.models._
import com.weightwatchers.reactive.kinesis.stream._

Source(1.to(100).map(_.toString))
.map(num => ProducerEvent(num, num))
.runWith(Kinesis.sink("producer-name"))
.onComplete {
case Success(_) => println("All messages are published successfully.")
case Failure(ex) => println(s"Failed to publish messages: ${ex.getMessage}")
}
```

A long running flow can be easily achieved using a `SourceQueue`.
In this case the flow stays open as long as needed.
New elements can be published via the materialized queue:

```scala
import akka.stream.scaladsl.Source
import com.weightwatchers.reactive.kinesis.models._
import com.weightwatchers.reactive.kinesis.stream._

val sourceQueue = Source.queue[ProducerEvent](1000, OverflowStrategy.fail)
.toMat(Kinesis.sink("producer-name"))(Keep.left)
.run()

sourceQueue.offer(ProducerEvent("foo", "bar"))
sourceQueue.offer(ProducerEvent("foo", "baz"))
```

The `Sink` uses a `KinesisProducerActor` under the cover. All rules regarding this actor also apply for the `Sink`.


<a name="running-the-reliability-test"></a>
# Running the reliability test
Expand Down

0 comments on commit 0c64c97

Please sign in to comment.