Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Commit

Permalink
Add an Akka Stream Sink graph stage for Kinesis. (#47)
Browse files Browse the repository at this point in the history
Add a `KinesisSinkGraphStage` that uses an underlying `KinesisProducerActor` to do the heavy lifting. The stream manages back pressure by allowing only a fixed number of outstanding messages. A materialized value is used to indicate when a stream has been finished (or failed).
  • Loading branch information
aquamatthias authored and markglh committed Feb 1, 2018
1 parent 6ac885e commit f6aaeb7
Show file tree
Hide file tree
Showing 10 changed files with 542 additions and 15 deletions.
50 changes: 50 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ It's worth familiarising yourself with [Sequence numbers and Sub sequence number
* [Usage: Producer](#usage-usage-producer)
* [Actor Based Implementation](#usage-usage-producer-actor-based-implementation)
* [Pure Scala based implementation (simple wrapper around KPL)](#usage-usage-producer-pure-scala-based-implementation-simple-wrapper-around-kpl)
* [Akka Stream Sink](#akka-stream-sink)
* [Running the reliability test](#running-the-reliability-test)
* [Delete & recreate kinesisstreams and dynamo table](#running-the-reliability-test-delete-recreate-kinesisstreams-and-dynamo-table)
* [Running the producer-consumer test](#running-the-reliability-test-running-the-producer-consumer-test)
Expand Down Expand Up @@ -484,6 +485,55 @@ callback onFailure {
}
```

<a name="akka-stream-sink"></a>
### Akka Stream Sink

An Akka `Sink` is provided which can be used to publish messages via streams.
Every message is sent as `ProduserEvent` to the `Sink`, which defines the PartitionKey as well as the payload.
The `Sink` is created from a `ProducerConf` or directly with a `KinesisProducerActor`. See [Kinesis](https://github.com/WW-Digital/reactive-kinesis/blob/master/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala) for the various options.

The `Sink` expects an acknowledgement for every message sent to Kinesis.
An amount of unacknowledged messages can be configured, before back pressure is applied.
This throttling is controlled by the kinesis.{producer}.akka.max-outstanding-requests configuration value.
Please note: a default value (1000 messages) is applied, if throttling is not configured.

The provided `Sink` produces a `Future[Done]` as materialized value.
This future succeeds, if all messages from upstream are sent to Kinesis and acknowledged.
It fails if a message could not be send to Kinesis or upstream fails.

```scala
import akka.stream.scaladsl.Source
import com.weightwatchers.reactive.kinesis.models._
import com.weightwatchers.reactive.kinesis.stream._

Source(1.to(100).map(_.toString))
.map(num => ProducerEvent(num, num))
.runWith(Kinesis.sink("producer-name"))
.onComplete {
case Success(_) => println("All messages are published successfully.")
case Failure(ex) => println(s"Failed to publish messages: ${ex.getMessage}")
}
```

A long running flow can be easily achieved using a `SourceQueue`.
In this case the flow stays open as long as needed.
New elements can be published via the materialized queue:

```scala
import akka.stream.scaladsl.Source
import com.weightwatchers.reactive.kinesis.models._
import com.weightwatchers.reactive.kinesis.stream._

val sourceQueue = Source.queue[ProducerEvent](1000, OverflowStrategy.fail)
.toMat(Kinesis.sink("producer-name"))(Keep.left)
.run()

sourceQueue.offer(ProducerEvent("foo", "bar"))
sourceQueue.offer(ProducerEvent("foo", "baz"))
```

The `Sink` uses a `KinesisProducerActor` under the cover. All rules regarding this actor also apply for the `Sink`.


<a name="running-the-reliability-test"></a>
# Running the reliability test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ trait KinesisConfiguration {
| testProducer {
| stream-name = "$streamName"
|
| akka.max-outstanding-requests = 10
|
| kpl {
| Region = us-east-1
|
Expand Down Expand Up @@ -199,7 +201,9 @@ trait KinesisSuite
Some(100.millis))

def consumerConf(batchSize: Long = TestStreamNrOfMessagesPerShard): ConsumerConf = {
consumerConfFor(streamName = TestStreamName, appName = appName, maxRecords = batchSize.toInt)
consumerConfFor(streamName = TestStreamName,
appName = appName,
maxRecords = math.max(1, batchSize.toInt))
}

def producerConf(): ProducerConf = producerConfFor(TestStreamName, appName)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.weightwatchers.reactive.kinesis.stream

import akka.stream.scaladsl.Source
import com.weightwatchers.reactive.kinesis.common.{
AkkaUnitTestLike,
KinesisConfiguration,
KinesisSuite
}
import com.weightwatchers.reactive.kinesis.models.ProducerEvent
import org.scalatest.{FreeSpec, Matchers}

import scala.concurrent.duration._

class KinesisSinkGraphStageIntegrationSpec
extends FreeSpec
with KinesisSuite
with KinesisConfiguration
with AkkaUnitTestLike
with Matchers {

"KinesisSinkGraph" - {

"produced messages are written to the stream" in new withKinesisConfForApp("sink_produce") {
val messageCount = 100
val elements = 1.to(messageCount).map(_.toString)
Source(elements)
.map(num => ProducerEvent(num, num))
.runWith(Kinesis.sink(producerConf()))
.futureValue
val list = testConsumer.retrieveRecords(TestStreamName, messageCount)
list should contain allElementsOf elements
testConsumer.shutdown()
}

"upstream fail should fail the materialized value of the sink" in new withKinesisConfForApp(
"sink_fail"
) {
Source
.failed(new IllegalStateException("Boom"))
.runWith(Kinesis.sink(producerConf()))
.failed
.futureValue shouldBe a[IllegalStateException]
}
}

// do not create messages in setup, we will create messages inside the test
override def TestStreamNrOfMessagesPerShard: Long = 0
override implicit def patienceConfig: PatienceConfig = PatienceConfig(60.seconds, 1.second)
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.scalatest._

import scala.concurrent.duration._

class KinesisSourceGraphIntegrationSpec
class KinesisSourceGraphStageIntegrationSpec
extends FreeSpec
with KinesisSuite
with KinesisConfiguration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,22 @@

package com.weightwatchers.reactive.kinesis.stream

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.{Done, NotUsed}
import akka.actor.{ActorSystem, Props}
import akka.stream.scaladsl.{Sink, Source}
import com.amazonaws.auth.AWSCredentialsProvider
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf
import com.weightwatchers.reactive.kinesis.models.ConsumerEvent
import com.weightwatchers.reactive.kinesis.models.{ConsumerEvent, ProducerEvent}
import com.weightwatchers.reactive.kinesis.producer.{KinesisProducerActor, ProducerConf}

import scala.concurrent.Future

/**
* Main entry point for creating a Kinesis source and sink.
*/
object Kinesis {
object Kinesis extends LazyLogging {

/**
* Create a source, that provides KinesisEvents.
Expand All @@ -39,20 +45,20 @@ object Kinesis {
def source(
consumerConf: ConsumerConf
)(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
Source.fromGraph(new KinesisSourceGraph(consumerConf, system))
Source.fromGraph(new KinesisSourceGraphStage(consumerConf, system))
}

/**
* Create a source by using the actor system configuration, that provides KinesisEvents.
* Please note: every KinesisEvent has to be committed during the user flow!
* Uncommitted events will be retransmitted after a timeout.
*
* The application conf file should look like this:
* A minimal application conf file should look like this:
* {{{
* kinesis {
* application-name = "SampleService"
* consumer-name {
* stream-name = "sample-consumer"
* stream-name = "sample-stream"
* }
* }
* }}}
Expand All @@ -68,4 +74,120 @@ object Kinesis {
): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
source(ConsumerConf(system.settings.config.getConfig(inConfig), consumerName))
}

/**
* Create a Sink that accepts ProducerEvents, which get published to Kinesis.
*
* The sink itself sends all events to an actor, which is created with the given Props.
* Every message send needs to be acknowledged by the underlying producer actor.
*
* This sink signals back pressure, if more than maxOutstanding messages are not acknowledged.
*
* The sink produces a materialized value `Future[Done]`, which is finished if all messages of the stream are send to the producer actor _and_ got acknowledged.
* The future fails, if the sending an event fails or upstream has failed the stream.
*
* @param props the props to create a producer actor. This is a function to work around #48.
* @param maxOutStanding the number of messages to send to the actor unacknowledged before back pressure is applied.
* @param system the actor system.
* @return A sink that accepts ProducerEvents.
*/
def sink(props: => Props, maxOutStanding: Int)(
implicit system: ActorSystem
): Sink[ProducerEvent, Future[Done]] = {
Sink.fromGraph(new KinesisSinkGraphStage(props, maxOutStanding, system))
}

/**
* Create a Sink that accepts ProducerEvents, which get published to Kinesis.
*
* The sink itself sends all events to an KinesisProducerActor which is configured with given config object.
* Every message send needs to be acknowledged by the underlying producer actor.
*
* This sink signals back pressure, if more messages than configured in throttling conf are not acknowledged.
* If throttling is not configured, a default value (= 1000 messages) is applied.
*
* The sink produces a materialized value `Future[Done]`, which is finished if all messages of the stream are send to the producer actor _and_ got acknowledged.
* The future fails, if the sending an event fails or upstream has failed the stream.
*
* @param producerConf the configuration to create KinesisProducerActor
* @param system the actor system.
* @return A sink that accepts ProducerEvents.
*/
def sink(
producerConf: ProducerConf
)(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = {
val maxOutstanding = producerConf.throttlingConf.fold {
logger.info(
"Producer throttling not configured - set maxOutstanding to 1000. Configure with: kinesis.{producer}.akka.max-outstanding-requests=1000"
)
1000
}(_.maxOutstandingRequests)
sink(KinesisProducerActor.props(producerConf), maxOutstanding)
}

/**
* Create a Sink that accepts ProducerEvents, which get published to Kinesis.
*
* The sink itself sends all events to an KinesisProducerActor which is configured from the system configuration for given producer name.
* Every message send needs to be acknowledged by the underlying producer actor.
*
* This sink signals back pressure, if more messages than configured in throttling conf are not acknowledged.
* If throttling is not configured, a default value (= 1000 messages) is applied.
*
* The sink produces a materialized value `Future[Done]`, which is finished if all messages of the stream are send to the producer actor _and_ got acknowledged.
* The future fails, if the sending an event fails or upstream has failed the stream.
*
* @param kinesisConfig the configuration object that holds the producer config.
* @param producerName the name of the producer in the system configuration.
* @param credentialsProvider the AWS credentials provider to use to connect.
* @param system the actor system.
* @return A sink that accepts ProducerEvents.
*/
def sink(kinesisConfig: Config,
producerName: String,
credentialsProvider: Option[AWSCredentialsProvider])(
implicit system: ActorSystem
): Sink[ProducerEvent, Future[Done]] = {
sink(
ProducerConf(kinesisConfig, producerName, credentialsProvider)
)
}

/**
* Create a Sink that accepts ProducerEvents, which get published to Kinesis.
*
* The sink itself sends all events to an KinesisProducerActor which is configured from the system configuration for given producer name.
* Every message send needs to be acknowledged by the underlying producer actor.
*
* This sink signals back pressure, if more messages than configured in throttling conf are not acknowledged.
* If throttling is not configured, a default value (= 1000 messages) is applied.
*
* The sink produces a materialized value `Future[Done]`, which is finished if all messages of the stream are send to the producer actor _and_ got acknowledged.
* The future fails, if the sending an event fails or upstream has failed the stream.
*
* A minimal application conf file should look like this:
* {{{
* kinesis {
* application-name = "SampleService"
* producer-name {
* stream-name = "sample-stream"
* akka.max-outstanding-requests = 100
* }
* }
* }}}
* See kinesis reference.conf for a list of all available config options.
*
* @param producerName the name of the producer in the system configuration.
* @param inConfig the configuration object that holds the producer config (usually kinesis).
* @param credentialsProvider the AWS credentials provider to use to connect.
* @param system the actor system.
* @return A sink that accepts ProducerEvents.
*/
def sink(producerName: String,
inConfig: String = "kinesis",
credentialsProvider: Option[AWSCredentialsProvider] = None)(
implicit system: ActorSystem
): Sink[ProducerEvent, Future[Done]] = {
sink(system.settings.config.getConfig(inConfig), producerName, credentialsProvider)
}
}
Loading

0 comments on commit f6aaeb7

Please sign in to comment.