This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Add an Akka Stream Sink graph stage for Kinesis. #47

Merged: 12 commits, Feb 1, 2018
README.md: 50 additions & 0 deletions
@@ -23,6 +23,7 @@ It's worth familiarising yourself with [Sequence numbers and Sub sequence number
* [Usage: Producer](#usage-usage-producer)
* [Actor Based Implementation](#usage-usage-producer-actor-based-implementation)
* [Pure Scala based implementation (simple wrapper around KPL)](#usage-usage-producer-pure-scala-based-implementation-simple-wrapper-around-kpl)
* [Akka Stream Sink](#akka-stream-sink)
* [Running the reliability test](#running-the-reliability-test)
* [Delete & recreate kinesisstreams and dynamo table](#running-the-reliability-test-delete-recreate-kinesisstreams-and-dynamo-table)
* [Running the producer-consumer test](#running-the-reliability-test-running-the-producer-consumer-test)
@@ -484,6 +485,55 @@ callback onFailure {
}
```

<a name="akka-stream-sink"></a>
### Akka Stream Sink

An Akka `Sink` is provided which can be used to publish messages to Kinesis via streams.
Every message is sent as a `ProducerEvent` to the `Sink`, which defines the partition key as well as the payload.

> **Contributor:** tiny grammar correction: Every message is sent
>
> **Author:** Grammar my old enemy - thanks for correction.

The `Sink` is created from a `ProducerConf` or directly with a `KinesisProducerActor`. See [Kinesis](https://github.com/WW-Digital/reactive-kinesis/blob/master/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala) for the various options.
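
For illustration, the configuration-driven variants roughly look like this (a sketch only; it assumes a `kinesis.producer-name` section exists in your configuration and uses placeholder names):

```scala
import akka.actor.ActorSystem
import com.weightwatchers.reactive.kinesis.producer.ProducerConf
import com.weightwatchers.reactive.kinesis.stream._

implicit val system: ActorSystem = ActorSystem("kinesis-example")

// Resolve the producer section kinesis.producer-name from the actor system configuration.
val byName = Kinesis.sink("producer-name")

// Build an explicit ProducerConf first (no custom AWS credentials provider) and pass it in.
val producerConf = ProducerConf(system.settings.config.getConfig("kinesis"), "producer-name", None)
val fromConf = Kinesis.sink(producerConf)
```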

The `Sink` expects an acknowledgement for every message sent to Kinesis.

> **Contributor:** expects an acknowledgement for every message sent to Kinesis
>
> **Author:** Done.

The number of unacknowledged messages allowed before back pressure is applied can be configured.
This throttling is controlled by the `kinesis.{producer}.akka.max-outstanding-requests` configuration value.

> **Contributor:** Might be clearer as: This throttling is controlled by the `kinesis.{producer}.akka.max-outstanding-requests` configuration value
>
> **Author:** Indeed much better.

Please note: a default value of 1000 messages is applied if throttling is not configured.
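
For example, a producer section with this value set explicitly could look like the following (this mirrors the configuration example in the `Kinesis.scala` scaladoc; `producer-name` and the stream name are placeholders):

```
kinesis {
  application-name = "SampleService"

  producer-name {
    stream-name = "sample-stream"

    # Back pressure is applied once 100 messages are unacknowledged.
    akka.max-outstanding-requests = 100
  }
}
```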

The provided `Sink` produces a `Future[Done]` as its materialized value.
This future succeeds once all messages from upstream have been sent to Kinesis and acknowledged.

> **Contributor:** just swap out send for sent :)
>
> **Author:** Done.

It fails if a message could not be sent to Kinesis or if the upstream fails.

```scala
import akka.stream.scaladsl.Source
import com.weightwatchers.reactive.kinesis.models._
import com.weightwatchers.reactive.kinesis.stream._

import scala.util.{Failure, Success}

// Assumes an implicit ActorSystem, Materializer and ExecutionContext are in scope.
Source(1.to(100).map(_.toString))
  .map(num => ProducerEvent(num, num))
  .runWith(Kinesis.sink("producer-name"))
  .onComplete {
    case Success(_)  => println("All messages are published successfully.")
    case Failure(ex) => println(s"Failed to publish messages: ${ex.getMessage}")
  }
```

A long-running flow can easily be achieved using a `SourceQueue`.
In this case the flow stays open as long as needed.
New elements can be published via the materialized queue:

```scala
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Source}
import com.weightwatchers.reactive.kinesis.models._
import com.weightwatchers.reactive.kinesis.stream._

// Assumes an implicit ActorSystem and Materializer are in scope.
val sourceQueue = Source
  .queue[ProducerEvent](1000, OverflowStrategy.fail)
  .toMat(Kinesis.sink("producer-name"))(Keep.left)
  .run()

sourceQueue.offer(ProducerEvent("foo", "bar"))
sourceQueue.offer(ProducerEvent("foo", "baz"))
```

The `Sink` uses a `KinesisProducerActor` under the hood. All rules regarding this actor also apply to the `Sink`.
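
Because of that, the sink can also be built directly from the actor's `Props` together with an explicit limit for unacknowledged messages. A minimal sketch, assuming the same placeholder producer configuration as above:

```scala
import akka.actor.ActorSystem
import com.weightwatchers.reactive.kinesis.producer.{KinesisProducerActor, ProducerConf}
import com.weightwatchers.reactive.kinesis.stream._

implicit val system: ActorSystem = ActorSystem("kinesis-example")

val producerConf = ProducerConf(system.settings.config.getConfig("kinesis"), "producer-name", None)

// The same KinesisProducerActor the sink would create internally, with back pressure
// applied after 100 unacknowledged messages.
val sink = Kinesis.sink(KinesisProducerActor.props(producerConf), 100)
```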


<a name="running-the-reliability-test"></a>
# Running the reliability test
@@ -43,6 +43,8 @@ trait KinesisConfiguration {
| testProducer {
| stream-name = "$streamName"
|
| akka.max-outstanding-requests = 10
|
| kpl {
| Region = us-east-1
|
@@ -199,7 +201,9 @@ trait KinesisSuite
Some(100.millis))

def consumerConf(batchSize: Long = TestStreamNrOfMessagesPerShard): ConsumerConf = {
consumerConfFor(streamName = TestStreamName, appName = appName, maxRecords = batchSize.toInt)
consumerConfFor(streamName = TestStreamName,
appName = appName,
maxRecords = math.max(1, batchSize.toInt))
}

def producerConf(): ProducerConf = producerConfFor(TestStreamName, appName)
@@ -0,0 +1,49 @@
package com.weightwatchers.reactive.kinesis.stream

import akka.stream.scaladsl.Source
import com.weightwatchers.reactive.kinesis.common.{
AkkaUnitTestLike,
KinesisConfiguration,
KinesisSuite
}
import com.weightwatchers.reactive.kinesis.models.ProducerEvent
import org.scalatest.{FreeSpec, Matchers}

import scala.concurrent.duration._

class KinesisSinkGraphStageIntegrationSpec
extends FreeSpec
with KinesisSuite
with KinesisConfiguration
with AkkaUnitTestLike
with Matchers {

"KinesisSinkGraph" - {

"produced messages are written to the stream" in new withKinesisConfForApp("sink_produce") {
val messageCount = 100
val elements = 1.to(messageCount).map(_.toString)
Source(elements)
.map(num => ProducerEvent(num, num))
.runWith(Kinesis.sink(producerConf()))
.futureValue
val list = testConsumer.retrieveRecords(TestStreamName, messageCount)
list should contain allElementsOf elements
testConsumer.shutdown()
}

"upstream fail should fail the materialized value of the sink" in new withKinesisConfForApp(
"sink_fail"
) {
Source
.failed(new IllegalStateException("Boom"))
.runWith(Kinesis.sink(producerConf()))
.failed
.futureValue shouldBe a[IllegalStateException]
}
}

// do not create messages in setup, we will create messages inside the test
override def TestStreamNrOfMessagesPerShard: Long = 0
override implicit def patienceConfig: PatienceConfig = PatienceConfig(60.seconds, 1.second)
}
@@ -10,7 +10,7 @@ import org.scalatest._

import scala.concurrent.duration._

class KinesisSourceGraphIntegrationSpec
class KinesisSourceGraphStageIntegrationSpec
extends FreeSpec
with KinesisSuite
with KinesisConfiguration
@@ -16,16 +16,22 @@

package com.weightwatchers.reactive.kinesis.stream

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.{Done, NotUsed}
import akka.actor.{ActorSystem, Props}
import akka.stream.scaladsl.{Sink, Source}
import com.amazonaws.auth.AWSCredentialsProvider
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf
import com.weightwatchers.reactive.kinesis.models.ConsumerEvent
import com.weightwatchers.reactive.kinesis.models.{ConsumerEvent, ProducerEvent}
import com.weightwatchers.reactive.kinesis.producer.{KinesisProducerActor, ProducerConf}

import scala.concurrent.Future

/**
* Main entry point for creating a Kinesis source and sink.
*/
object Kinesis {
object Kinesis extends LazyLogging {

/**
* Create a source, that provides KinesisEvents.
@@ -39,20 +45,20 @@ object Kinesis {
def source(
consumerConf: ConsumerConf
)(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
Source.fromGraph(new KinesisSourceGraph(consumerConf, system))
Source.fromGraph(new KinesisSourceGraphStage(consumerConf, system))
}

/**
* Create a source by using the actor system configuration, that provides KinesisEvents.
* Please note: every KinesisEvent has to be committed during the user flow!
* Uncommitted events will be retransmitted after a timeout.
*
* The application conf file should look like this:
* A minimal application conf file should look like this:
* {{{
* kinesis {
* application-name = "SampleService"
* consumer-name {
* stream-name = "sample-consumer"
* stream-name = "sample-stream"
* }
* }
* }}}
@@ -68,4 +74,120 @@
): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
source(ConsumerConf(system.settings.config.getConfig(inConfig), consumerName))
}

/**
* Create a Sink that accepts ProducerEvents, which get published to Kinesis.
*
* The sink itself sends all events to an actor, which is created with the given Props.
* Every message sent needs to be acknowledged by the underlying producer actor.
*
* This sink signals back pressure if more than maxOutstanding messages are unacknowledged.
*
* The sink produces a materialized value `Future[Done]`, which is completed once all messages of the stream have been sent to the producer actor _and_ acknowledged.
* The future fails if sending an event fails or the upstream has failed the stream.
*
* @param props the props to create a producer actor. This is a function to work around #48.
* @param maxOutStanding the number of messages to send to the actor unacknowledged before back pressure is applied.
* @param system the actor system.
* @return A sink that accepts ProducerEvents.
*/
def sink(props: => Props, maxOutStanding: Int)(
implicit system: ActorSystem
): Sink[ProducerEvent, Future[Done]] = {
Sink.fromGraph(new KinesisSinkGraphStage(props, maxOutStanding, system))
}

/**
* Create a Sink that accepts ProducerEvents, which get published to Kinesis.
*
* The sink itself sends all events to a KinesisProducerActor which is configured with the given config object.
* Every message sent needs to be acknowledged by the underlying producer actor.
*
* This sink signals back pressure if more messages than configured in the throttling conf are unacknowledged.
* If throttling is not configured, a default value of 1000 messages is applied.
*
* The sink produces a materialized value `Future[Done]`, which is completed once all messages of the stream have been sent to the producer actor _and_ acknowledged.
* The future fails if sending an event fails or the upstream has failed the stream.
*
* @param producerConf the configuration to create KinesisProducerActor
* @param system the actor system.
* @return A sink that accepts ProducerEvents.
*/
def sink(
producerConf: ProducerConf
)(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = {
val maxOutstanding = producerConf.throttlingConf.fold {
logger.info(
"Producer throttling not configured - set maxOutstanding to 1000. Configure with: kinesis.{producer}.akka.max-outstanding-requests=1000"
)
1000
}(_.maxOutstandingRequests)
sink(KinesisProducerActor.props(producerConf), maxOutstanding)
}

/**
* Create a Sink that accepts ProducerEvents, which get published to Kinesis.
*
* The sink itself sends all events to a KinesisProducerActor which is configured from the system configuration for the given producer name.
* Every message sent needs to be acknowledged by the underlying producer actor.
*
* This sink signals back pressure if more messages than configured in the throttling conf are unacknowledged.
* If throttling is not configured, a default value of 1000 messages is applied.
*
* The sink produces a materialized value `Future[Done]`, which is completed once all messages of the stream have been sent to the producer actor _and_ acknowledged.
* The future fails if sending an event fails or the upstream has failed the stream.
*
* @param kinesisConfig the configuration object that holds the producer config.
* @param producerName the name of the producer in the system configuration.
* @param credentialsProvider the AWS credentials provider to use to connect.
* @param system the actor system.
* @return A sink that accepts ProducerEvents.
*/
def sink(kinesisConfig: Config,
producerName: String,
credentialsProvider: Option[AWSCredentialsProvider])(
implicit system: ActorSystem
): Sink[ProducerEvent, Future[Done]] = {
sink(
ProducerConf(kinesisConfig, producerName, credentialsProvider)
)
}

/**
* Create a Sink that accepts ProducerEvents, which get published to Kinesis.
*
* The sink itself sends all events to a KinesisProducerActor which is configured from the system configuration for the given producer name.
* Every message sent needs to be acknowledged by the underlying producer actor.
*
* This sink signals back pressure if more messages than configured in the throttling conf are unacknowledged.
* If throttling is not configured, a default value of 1000 messages is applied.
*
* The sink produces a materialized value `Future[Done]`, which is completed once all messages of the stream have been sent to the producer actor _and_ acknowledged.
* The future fails if sending an event fails or the upstream has failed the stream.
*
* A minimal application conf file should look like this:
* {{{
* kinesis {
* application-name = "SampleService"
* producer-name {
* stream-name = "sample-stream"
* akka.max-outstanding-requests = 100
* }
* }
* }}}
* See kinesis reference.conf for a list of all available config options.
*
* @param producerName the name of the producer in the system configuration.
* @param inConfig the configuration object that holds the producer config (usually kinesis).
* @param credentialsProvider the AWS credentials provider to use to connect.
* @param system the actor system.
* @return A sink that accepts ProducerEvents.
*/
def sink(producerName: String,
inConfig: String = "kinesis",
credentialsProvider: Option[AWSCredentialsProvider] = None)(
implicit system: ActorSystem
): Sink[ProducerEvent, Future[Done]] = {
sink(system.settings.config.getConfig(inConfig), producerName, credentialsProvider)
}
}