From 7adf70088334c2e3809b0768c87e6bdd3062bb02 Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Tue, 2 Jan 2018 19:07:43 +0100 Subject: [PATCH 01/12] Genesis of Kinesis Sink Graph stage. --- .../kinesis/common/KinesisSuite.scala | 2 + .../KinesisSinkGraphIntegrationSpec.scala | 38 +++++ .../reactive/kinesis/stream/Kinesis.scala | 35 ++++- .../kinesis/stream/KinesisSinkGraph.scala | 136 ++++++++++++++++++ 4 files changed, 207 insertions(+), 4 deletions(-) create mode 100644 src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphIntegrationSpec.scala create mode 100644 src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraph.scala diff --git a/src/it/scala/com/weightwatchers/reactive/kinesis/common/KinesisSuite.scala b/src/it/scala/com/weightwatchers/reactive/kinesis/common/KinesisSuite.scala index 5da49bc..756ae86 100644 --- a/src/it/scala/com/weightwatchers/reactive/kinesis/common/KinesisSuite.scala +++ b/src/it/scala/com/weightwatchers/reactive/kinesis/common/KinesisSuite.scala @@ -43,6 +43,8 @@ trait KinesisConfiguration { | testProducer { | stream-name = "$streamName" | + | akka.max-outstanding-requests = 10 + | | kpl { | Region = us-east-1 | diff --git a/src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphIntegrationSpec.scala b/src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphIntegrationSpec.scala new file mode 100644 index 0000000..759baaa --- /dev/null +++ b/src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphIntegrationSpec.scala @@ -0,0 +1,38 @@ +package com.weightwatchers.reactive.kinesis.stream + +import akka.stream.scaladsl.Source +import com.weightwatchers.reactive.kinesis.common.{AkkaUnitTestLike, KinesisConfiguration, KinesisSuite} +import com.weightwatchers.reactive.kinesis.models.ProducerEvent +import org.scalatest.{FreeSpec, Matchers} + +import scala.concurrent.duration._ + +class KinesisSinkGraphIntegrationSpec extends FreeSpec with KinesisSuite with KinesisConfiguration with AkkaUnitTestLike with Matchers { + + "KinesisSinkGraph" - { + + "produced messages are written to the stream" in new withKinesisConfForApp("sink_produce") { + val messageCount = 100 + val elements = 1.to(messageCount).map(_.toString) + Source(elements) + .map(num => ProducerEvent(num, num)) + .runWith(Kinesis.sink(producerConf())) + .futureValue + val list = testConsumer.retrieveRecords(TestStreamName, messageCount) + list should contain allElementsOf elements + testConsumer.shutdown() + } + + "upstream fail should fail the materialized value of the sink" in new withKinesisConfForApp("sink_fail") { + Source + .failed(new IllegalStateException("Boom")) + .runWith(Kinesis.sink(producerConf())) + .failed + .futureValue shouldBe a[IllegalStateException] + } + } + + // do not create messages in setup, we will create messages inside the test + override def TestStreamNrOfMessagesPerShard: Long = 0 + override implicit def patienceConfig: PatienceConfig = PatienceConfig(60.seconds, 1.second) +} diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala index 291ec89..ffba181 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala @@ -16,11 +16,15 @@ package com.weightwatchers.reactive.kinesis.stream -import akka.NotUsed -import akka.actor.ActorSystem -import akka.stream.scaladsl.Source +import akka.{Done, NotUsed} +import akka.actor.{ActorSystem, Props} +import akka.stream.scaladsl.{Sink, Source} +import com.amazonaws.auth.AWSCredentialsProvider import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf -import com.weightwatchers.reactive.kinesis.models.ConsumerEvent +import com.weightwatchers.reactive.kinesis.models.{ConsumerEvent, ProducerEvent} +import com.weightwatchers.reactive.kinesis.producer.{KinesisProducerActor, ProducerConf} + +import scala.concurrent.Future /** * Main entry point for creating a Kinesis source and sink. @@ -68,4 +72,27 @@ object Kinesis { ): Source[CommittableEvent[ConsumerEvent], NotUsed] = { source(ConsumerConf(system.settings.config.getConfig(inConfig), consumerName)) } + + def sink(props: Props, maxOutStanding: Int)( + implicit system: ActorSystem + ): Sink[ProducerEvent, Future[Done]] = { + Sink.fromGraph(new KinesisSinkGraph(props, maxOutStanding, system)) + } + + def sink( + producerConf: ProducerConf + )(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = { + val maxOutstanding = producerConf.throttlingConf.fold(Int.MaxValue)(_.maxOutstandingRequests) + sink(KinesisProducerActor.props(producerConf), maxOutstanding) + } + + def sink(producerName: String, + inConfig: String = "kinesis", + credentialsProvider: Option[AWSCredentialsProvider] = None)( + implicit system: ActorSystem + ): Sink[ProducerEvent, Future[Done]] = { + sink( + ProducerConf(system.settings.config.getConfig(inConfig), producerName, credentialsProvider) + ) + } } diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraph.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraph.scala new file mode 100644 index 0000000..93a956e --- /dev/null +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraph.scala @@ -0,0 +1,136 @@ +/* + * Copyright 2017 WeightWatchers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.weightwatchers.reactive.kinesis.stream + +import akka.Done +import akka.actor.Actor.Receive +import akka.actor.{ActorRef, ActorSystem, PoisonPill, Props} +import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler} +import akka.stream.{Attributes, Inlet, SinkShape} +import com.typesafe.scalalogging.LazyLogging +import com.weightwatchers.reactive.kinesis.models.ProducerEvent +import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.{ + SendFailed, + SendSuccessful, + SendWithCallback +} + +import scala.collection.mutable +import scala.concurrent.{Future, Promise} + +class KinesisSinkGraph(producerActorProps: Props, maxOutStanding: Int, actorSystem: ActorSystem) + extends GraphStageWithMaterializedValue[SinkShape[ProducerEvent], Future[Done]] + with LazyLogging { + + private[this] val in: Inlet[ProducerEvent] = Inlet("KinesisSink.in") + override def shape: SinkShape[ProducerEvent] = SinkShape.of(in) + + // The materialized value of this graph stage. + // The promise is fulfilled, when all outstanding messages are acknowledged or the graph stage fails. + val promise: Promise[Done] = Promise() + + override def createLogicAndMaterializedValue( + inheritedAttributes: Attributes + ): (GraphStageLogic, Future[Done]) = { + val logic = new GraphStageLogic(shape) { + + // Holds all outstanding messages by its message identifier. + // Note: the identifier is created here locally and does not make sense outside of this stage. + val outstandingMessages: mutable.AnyRefMap[String, ProducerEvent] = mutable.AnyRefMap.empty + + // The related stage actor. + implicit var graphStageActor: ActorRef = ActorRef.noSender + + // The underlying kinesis producer actor. + var producerActor: ActorRef = ActorRef.noSender + + override def preStart(): Unit = { + super.preStart() + // this stage should keep going, even if upstream is finished + setKeepGoing(true) + // store a reference to the stage actor + graphStageActor = getStageActor(receive).ref + // start up the underlying producer actor + producerActor = actorSystem.actorOf(producerActorProps) + // start the process by signalling demand + pull(in) + } + + override def postStop(): Unit = { + super.postStop() + // stop the underlying producer actor + producerActor ! PoisonPill + // Finish the promise (it could already be finished in case of failure) + if (outstandingMessages.isEmpty) promise.trySuccess(Done) + else + promise.tryFailure( + new IllegalStateException(s"No acknowledge for ${outstandingMessages.size} events.") + ) + } + + setHandler( + in, + new InHandler { + override def onPush(): Unit = { + val element = grab(in) + val toSend = SendWithCallback(element) + outstandingMessages += toSend.messageId -> element + producerActor ! toSend + if (outstandingMessages.size < maxOutStanding) pull(in) + } + + override def onUpstreamFinish(): Unit = { + logger.info("Upstream is finished!") + // Only finish the stage if there are no outstanding messages + // If there are outstanding messages, the receive handler will complete the stage. + if (outstandingMessages.isEmpty) completeStage() + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + logger.info(s"Upstream failed: ${ex.getMessage}") + super.onUpstreamFailure(ex) + // signal the failure to the waiting side + promise.tryFailure(ex) + } + } + ) + + def receive: Receive = { + case (_, SendSuccessful(messageId, recordResult)) => + outstandingMessages.remove(messageId).foreach { producerEvent => + logger.info( + s"Send message: $producerEvent with result $recordResult. ${outstandingMessages.size} messages outstanding." + ) + // upstream is finished? + if (isClosed(in)) { + // only complete the stage if all outstanding messages are acknowledged + if (outstandingMessages.isEmpty) completeStage() + } else { + // signal demand + if (outstandingMessages.size < maxOutStanding) pull(in) + } + } + + case (_, SendFailed(messageId, reason)) => + logger.warn(s"Could not send message with id: $messageId", reason) + failStage(reason) + } + } + + logic -> promise.future + } +} From 62cba2a2e07960d623dee29ff0d5902281ca4590 Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Thu, 4 Jan 2018 16:10:15 +0100 Subject: [PATCH 02/12] Add documentation. --- .../reactive/kinesis/stream/Kinesis.scala | 74 ++++++++++++++++++- .../kinesis/stream/KinesisSinkGraph.scala | 8 ++ 2 files changed, 78 insertions(+), 4 deletions(-) diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala index ffba181..28450b1 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala @@ -20,6 +20,7 @@ import akka.{Done, NotUsed} import akka.actor.{ActorSystem, Props} import akka.stream.scaladsl.{Sink, Source} import com.amazonaws.auth.AWSCredentialsProvider +import com.typesafe.scalalogging.LazyLogging import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf import com.weightwatchers.reactive.kinesis.models.{ConsumerEvent, ProducerEvent} import com.weightwatchers.reactive.kinesis.producer.{KinesisProducerActor, ProducerConf} @@ -29,7 +30,7 @@ import scala.concurrent.Future /** * Main entry point for creating a Kinesis source and sink. */ -object Kinesis { +object Kinesis extends LazyLogging { /** * Create a source, that provides KinesisEvents. @@ -51,12 +52,12 @@ object Kinesis { * Please note: every KinesisEvent has to be committed during the user flow! * Uncommitted events will be retransmitted after a timeout. * - * The application conf file should look like this: + * A minimal application conf file should look like this: * {{{ * kinesis { * application-name = "SampleService" * consumer-name { - * stream-name = "sample-consumer" + * stream-name = "sample-stream" * } * } * }}} @@ -73,19 +74,84 @@ object Kinesis { source(ConsumerConf(system.settings.config.getConfig(inConfig), consumerName)) } + /** + * Create a Sink that accepts ProducerEvents, which get published to Kinesis. + * + * The sink itself sends all events to an actor, which is created with the given Props. + * Every message send needs to be acknowledged by the underlying producer actor. + * + * This sink signals back pressure, if more than maxOutstanding messages are not acknowledged. + * + * The sink produces a materialized value `Future[Done]`, which is finished if all messages of the stream are send to the producer actor _and_ got acknowledged. + * The future fails, if the sending an event fails or upstream has failed the stream. + * + * @param props the props to create a producer actor. + * @param maxOutStanding the number of messages to send to the actor unacknowledged before back pressure is applied. + * @param system the actor system. + * @return A sink that accepts ProducerEvents. + */ def sink(props: Props, maxOutStanding: Int)( implicit system: ActorSystem ): Sink[ProducerEvent, Future[Done]] = { Sink.fromGraph(new KinesisSinkGraph(props, maxOutStanding, system)) } + /** + * Create a Sink that accepts ProducerEvents, which get published to Kinesis. + * + * The sink itself sends all events to an KinesisProducerActor which is configured with given config object. + * Every message send needs to be acknowledged by the underlying producer actor. + * + * This sink signals back pressure, if more messages than configured in throttling conf are not acknowledged. + * If throttling is not configured, a default value (= 1000 messages) is applied. + * + * The sink produces a materialized value `Future[Done]`, which is finished if all messages of the stream are send to the producer actor _and_ got acknowledged. + * The future fails, if the sending an event fails or upstream has failed the stream. + * + * @param producerConf the configuration to create KinesisProducerActor + * @param system the actor system. + * @return A sink that accepts ProducerEvents. + */ def sink( producerConf: ProducerConf )(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = { - val maxOutstanding = producerConf.throttlingConf.fold(Int.MaxValue)(_.maxOutstandingRequests) + val maxOutstanding = producerConf.throttlingConf.fold{ + logger.info("Producer throttling not configured - set maxOutstanding to 1000. Configure with: kinesis.{producer}.akka.max-outstanding-requests=1000") + 1000 + }(_.maxOutstandingRequests) sink(KinesisProducerActor.props(producerConf), maxOutstanding) } + /** + * Create a Sink that accepts ProducerEvents, which get published to Kinesis. + * + * The sink itself sends all events to an KinesisProducerActor which is configured from the system configuration for given producer name. + * Every message send needs to be acknowledged by the underlying producer actor. + * + * This sink signals back pressure, if more messages than configured in throttling conf are not acknowledged. + * If throttling is not configured, a default value (= 1000 messages) is applied. + * + * The sink produces a materialized value `Future[Done]`, which is finished if all messages of the stream are send to the producer actor _and_ got acknowledged. + * The future fails, if the sending an event fails or upstream has failed the stream. + * + * A minimal application conf file should look like this: + * {{{ + * kinesis { + * application-name = "SampleService" + * producer-name { + * stream-name = "sample-stream" + * akka.max-outstanding-requests = 100 + * } + * } + * }}} + * See kinesis reference.conf for a list of all available config options. + * + * @param producerName the name of the producer in the system configuration. + * @param inConfig the configuration object that holds the producer config (usually kinesis). + * @param credentialsProvider the AWS credentials provider to use to connect. + * @param system the actor system. + * @return A sink that accepts ProducerEvents. + */ def sink(producerName: String, inConfig: String = "kinesis", credentialsProvider: Option[AWSCredentialsProvider] = None)( diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraph.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraph.scala index 93a956e..f6f72b8 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraph.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraph.scala @@ -32,6 +32,14 @@ import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.{ import scala.collection.mutable import scala.concurrent.{Future, Promise} +/** + * A KinesisSinkGraph will attach to a kinesis stream with the provided configuration and constitute a Sink[ProducerEvent, Future[Done]. + * This graph stage uses a producer actor to publish events with acknowledgements. + * + * @param producerActorProps the properties to create a producer actor where all events are send to. + * @param maxOutStanding the number of messages send to the producer which are not acknowledged, before signalling back pressure. + * @param actorSystem the actor system. + */ class KinesisSinkGraph(producerActorProps: Props, maxOutStanding: Int, actorSystem: ActorSystem) extends GraphStageWithMaterializedValue[SinkShape[ProducerEvent], Future[Done]] with LazyLogging { From 2c9ef8294300e1ceae2f636f1783be38722fcec6 Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Thu, 4 Jan 2018 17:57:25 +0100 Subject: [PATCH 03/12] Add a unit test for the SinkGraph Stage. --- .../KinesisSinkGraphIntegrationSpec.scala | 21 ++- .../reactive/kinesis/stream/Kinesis.scala | 6 +- .../kinesis/stream/KinesisSinkGraphSpec.scala | 129 ++++++++++++++++++ 3 files changed, 149 insertions(+), 7 deletions(-) create mode 100644 src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphSpec.scala diff --git a/src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphIntegrationSpec.scala b/src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphIntegrationSpec.scala index 759baaa..12add8d 100644 --- a/src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphIntegrationSpec.scala +++ b/src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphIntegrationSpec.scala @@ -1,19 +1,28 @@ package com.weightwatchers.reactive.kinesis.stream import akka.stream.scaladsl.Source -import com.weightwatchers.reactive.kinesis.common.{AkkaUnitTestLike, KinesisConfiguration, KinesisSuite} +import com.weightwatchers.reactive.kinesis.common.{ + AkkaUnitTestLike, + KinesisConfiguration, + KinesisSuite +} import com.weightwatchers.reactive.kinesis.models.ProducerEvent import org.scalatest.{FreeSpec, Matchers} import scala.concurrent.duration._ -class KinesisSinkGraphIntegrationSpec extends FreeSpec with KinesisSuite with KinesisConfiguration with AkkaUnitTestLike with Matchers { +class KinesisSinkGraphIntegrationSpec + extends FreeSpec + with KinesisSuite + with KinesisConfiguration + with AkkaUnitTestLike + with Matchers { "KinesisSinkGraph" - { "produced messages are written to the stream" in new withKinesisConfForApp("sink_produce") { val messageCount = 100 - val elements = 1.to(messageCount).map(_.toString) + val elements = 1.to(messageCount).map(_.toString) Source(elements) .map(num => ProducerEvent(num, num)) .runWith(Kinesis.sink(producerConf())) @@ -23,7 +32,9 @@ class KinesisSinkGraphIntegrationSpec extends FreeSpec with KinesisSuite with Ki testConsumer.shutdown() } - "upstream fail should fail the materialized value of the sink" in new withKinesisConfForApp("sink_fail") { + "upstream fail should fail the materialized value of the sink" in new withKinesisConfForApp( + "sink_fail" + ) { Source .failed(new IllegalStateException("Boom")) .runWith(Kinesis.sink(producerConf())) @@ -33,6 +44,6 @@ class KinesisSinkGraphIntegrationSpec extends FreeSpec with KinesisSuite with Ki } // do not create messages in setup, we will create messages inside the test - override def TestStreamNrOfMessagesPerShard: Long = 0 + override def TestStreamNrOfMessagesPerShard: Long = 0 override implicit def patienceConfig: PatienceConfig = PatienceConfig(60.seconds, 1.second) } diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala index 28450b1..aa734cf 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala @@ -115,8 +115,10 @@ object Kinesis extends LazyLogging { def sink( producerConf: ProducerConf )(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = { - val maxOutstanding = producerConf.throttlingConf.fold{ - logger.info("Producer throttling not configured - set maxOutstanding to 1000. Configure with: kinesis.{producer}.akka.max-outstanding-requests=1000") + val maxOutstanding = producerConf.throttlingConf.fold { + logger.info( + "Producer throttling not configured - set maxOutstanding to 1000. Configure with: kinesis.{producer}.akka.max-outstanding-requests=1000" + ) 1000 }(_.maxOutstandingRequests) sink(KinesisProducerActor.props(producerConf), maxOutstanding) diff --git a/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphSpec.scala b/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphSpec.scala new file mode 100644 index 0000000..f60fd82 --- /dev/null +++ b/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphSpec.scala @@ -0,0 +1,129 @@ +/* + * Copyright 2017 WeightWatchers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.weightwatchers.reactive.kinesis.stream + +import java.util.Collections + +import akka.Done +import akka.actor.{Actor, ActorRef, ActorSystem, Props} +import akka.stream.scaladsl.{Sink, Source} +import akka.stream.{ActorMaterializer, Materializer} +import akka.testkit.{TestActorRef, TestKit} +import com.amazonaws.services.kinesis.producer.UserRecordResult +import com.weightwatchers.reactive.kinesis.models.ProducerEvent +import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.{ + SendFailed, + SendSuccessful, + SendWithCallback +} +import org.scalatest.concurrent.{Eventually, ScalaFutures} +import org.scalatest.{BeforeAndAfterAll, FreeSpecLike, Matchers} + +import scala.collection.mutable +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} + +class KinesisSinkGraphSpec + extends TestKit(ActorSystem("source-graph-spec")) + with FreeSpecLike + with Matchers + with BeforeAndAfterAll + with ScalaFutures + with Eventually { + + implicit val materializer: Materializer = ActorMaterializer() + implicit val ec = system.dispatcher + implicit val defaultPatience = PatienceConfig(5.seconds, interval = 50.millis) + + "KinesisSinkGraph" - { + + "all messages are send to the producer, while the stream is finished only, if all messages got acknowledged" in sinkWithProducer( + ackMessage + ) { (sink, producer) => + val messages = 1.to(100).map(_.toString).map(num => ProducerEvent(num, num)) + Source(messages).runWith(sink).futureValue + producer.underlyingActor.allMessages should have size 100 + producer.underlyingActor.allMessages.values should contain allElementsOf messages + } + + "the stream fails, if the producer can not send" in sinkWithProducer(failMessage) { + (sink, _) => + val messages = 1.to(100).map(_.toString).map(num => ProducerEvent(num, num)) + val result = Source(messages).runWith(sink).failed.futureValue + result shouldBe a[IllegalStateException] + } + + "the stream fails, if upstream fails" in sinkWithProducer(ackMessage) { (sink, _) => + val exception = new IllegalStateException("boom") + val result = Source.failed(exception).runWith(sink).failed.futureValue + result shouldBe exception + } + + "do not send more messages than maxOutstanding" in sinkWithProducer( + ignoreAndFailOn(_.partitionKey.toInt > 5) + ) { (sink, producer) => + val messages = Source(1.to(100).map(_.toString).map(num => ProducerEvent(num, num))) + val result = messages.runWith(sink) + eventually { producer.underlyingActor.allMessages should have size 5 } + // the stream would fail if we would read the 6th. element + result.isCompleted shouldBe false + producer.underlyingActor.allMessages should have size 5 + } + } + + class ForwardToProducerActor(ref: ActorRef) extends Actor { + override def receive: Receive = { + case message => ref.forward(message) + } + } + + class TestProducerActor(sendFn: (ActorRef, SendWithCallback) => Unit) extends Actor { + val allMessages = mutable.AnyRefMap.empty[String, ProducerEvent] + override def receive: Receive = { + case send: SendWithCallback => + allMessages += send.messageId -> send.producerEvent + sendFn(sender(), send) + } + } + + def ignoreAndFailOn(decider: ProducerEvent => Boolean)(sender: ActorRef, + event: SendWithCallback): Unit = { + if (decider(event.producerEvent)) failMessage(sender, event) else ignoreMessage(sender, event) + } + + def ackMessage(sender: ActorRef, event: SendWithCallback): Unit = + sender ! SendSuccessful(event.messageId, + new UserRecordResult(Collections.emptyList(), "123", "shard", true)) + + def failMessage(sender: ActorRef, event: SendWithCallback): Unit = + sender ! SendFailed(event.messageId, new IllegalStateException("wrong!")) + + def ignoreMessage(sender: ActorRef, event: SendWithCallback): Unit = () + + def sinkWithProducer(sendFn: (ActorRef, SendWithCallback) => Unit, maxOutstanding: Int = 5)( + sinkFn: (Sink[ProducerEvent, Future[Done]], TestActorRef[TestProducerActor]) => Unit + ) = { + val testActor = TestActorRef[TestProducerActor](Props(new TestProducerActor(sendFn))) + val sink = Kinesis.sink(Props(new ForwardToProducerActor(testActor)), maxOutstanding) + sinkFn(sink, testActor) + } + + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, 5.seconds) + } +} From 4bc1e931e0212e0cd2e7b9ca7ecf61ea4cdc8f0b Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Thu, 4 Jan 2018 18:11:56 +0100 Subject: [PATCH 04/12] Rename KinesisSourceGraph -> KinesisSourceGraphStage, KinesisSinkGraph -> KinesisSinkGraphStage --- ...pec.scala => KinesisSinkGraphStageIntegrationSpec.scala} | 2 +- ...c.scala => KinesisSourceGraphStageIntegrationSpec.scala} | 2 +- .../weightwatchers/reactive/kinesis/stream/Kinesis.scala | 4 ++-- .../{KinesisSinkGraph.scala => KinesisSinkGraphStage.scala} | 4 +++- ...nesisSourceGraph.scala => KinesisSourceGraphStage.scala} | 6 +++--- ...sSinkGraphSpec.scala => KinesisSinkGraphStageSpec.scala} | 2 +- ...rceGraphSpec.scala => KinesisSourceGraphStageSpec.scala} | 4 ++-- 7 files changed, 13 insertions(+), 11 deletions(-) rename src/it/scala/com/weightwatchers/reactive/kinesis/stream/{KinesisSinkGraphIntegrationSpec.scala => KinesisSinkGraphStageIntegrationSpec.scala} (97%) rename src/it/scala/com/weightwatchers/reactive/kinesis/stream/{KinesisSourceGraphIntegrationSpec.scala => KinesisSourceGraphStageIntegrationSpec.scala} (99%) rename src/main/scala/com/weightwatchers/reactive/kinesis/stream/{KinesisSinkGraph.scala => KinesisSinkGraphStage.scala} (97%) rename src/main/scala/com/weightwatchers/reactive/kinesis/stream/{KinesisSourceGraph.scala => KinesisSourceGraphStage.scala} (98%) rename src/test/scala/com/weightwatchers/reactive/kinesis/stream/{KinesisSinkGraphSpec.scala => KinesisSinkGraphStageSpec.scala} (99%) rename src/test/scala/com/weightwatchers/reactive/kinesis/stream/{KinesisSourceGraphSpec.scala => KinesisSourceGraphStageSpec.scala} (97%) diff --git a/src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphIntegrationSpec.scala b/src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageIntegrationSpec.scala similarity index 97% rename from src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphIntegrationSpec.scala rename to src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageIntegrationSpec.scala index 12add8d..fcf9b66 100644 --- a/src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphIntegrationSpec.scala +++ b/src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageIntegrationSpec.scala @@ -11,7 +11,7 @@ import org.scalatest.{FreeSpec, Matchers} import scala.concurrent.duration._ -class KinesisSinkGraphIntegrationSpec +class KinesisSinkGraphStageIntegrationSpec extends FreeSpec with KinesisSuite with KinesisConfiguration diff --git a/src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphIntegrationSpec.scala b/src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphStageIntegrationSpec.scala similarity index 99% rename from src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphIntegrationSpec.scala rename to src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphStageIntegrationSpec.scala index 7475158..48a7db9 100644 --- a/src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphIntegrationSpec.scala +++ b/src/it/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphStageIntegrationSpec.scala @@ -10,7 +10,7 @@ import org.scalatest._ import scala.concurrent.duration._ -class KinesisSourceGraphIntegrationSpec +class KinesisSourceGraphStageIntegrationSpec extends FreeSpec with KinesisSuite with KinesisConfiguration diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala index aa734cf..44229bd 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala @@ -44,7 +44,7 @@ object Kinesis extends LazyLogging { def source( consumerConf: ConsumerConf )(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = { - Source.fromGraph(new KinesisSourceGraph(consumerConf, system)) + Source.fromGraph(new KinesisSourceGraphStage(consumerConf, system)) } /** @@ -93,7 +93,7 @@ object Kinesis extends LazyLogging { def sink(props: Props, maxOutStanding: Int)( implicit system: ActorSystem ): Sink[ProducerEvent, Future[Done]] = { - Sink.fromGraph(new KinesisSinkGraph(props, maxOutStanding, system)) + Sink.fromGraph(new KinesisSinkGraphStage(props, maxOutStanding, system)) } /** diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraph.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala similarity index 97% rename from src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraph.scala rename to src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala index f6f72b8..7ff7b23 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraph.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala @@ -40,7 +40,9 @@ import scala.concurrent.{Future, Promise} * @param maxOutStanding the number of messages send to the producer which are not acknowledged, before signalling back pressure. * @param actorSystem the actor system. */ -class KinesisSinkGraph(producerActorProps: Props, maxOutStanding: Int, actorSystem: ActorSystem) +class KinesisSinkGraphStage(producerActorProps: Props, + maxOutStanding: Int, + actorSystem: ActorSystem) extends GraphStageWithMaterializedValue[SinkShape[ProducerEvent], Future[Done]] with LazyLogging { diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraph.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphStage.scala similarity index 98% rename from src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraph.scala rename to src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphStage.scala index e2f9fa1..359e65b 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraph.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphStage.scala @@ -137,9 +137,9 @@ private[kinesis] case class CommittableActorEvent[+A](event: ConsumerEvent, * @param createConsumer function that creates a consumer service from the graph stage actor reference. * @param actorSystem the actor system. */ -class KinesisSourceGraph(config: ConsumerConf, - createConsumer: ActorRef => ConsumerService, - actorSystem: ActorSystem) +class KinesisSourceGraphStage(config: ConsumerConf, + createConsumer: ActorRef => ConsumerService, + actorSystem: ActorSystem) extends GraphStage[SourceShape[CommittableEvent[ConsumerEvent]]] with LazyLogging { diff --git a/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphSpec.scala b/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageSpec.scala similarity index 99% rename from src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphSpec.scala rename to src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageSpec.scala index f60fd82..bdcf27a 100644 --- a/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphSpec.scala +++ b/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageSpec.scala @@ -37,7 +37,7 @@ import scala.collection.mutable import scala.concurrent.duration._ import scala.concurrent.{Await, Future} -class KinesisSinkGraphSpec +class KinesisSinkGraphStageSpec extends TestKit(ActorSystem("source-graph-spec")) with FreeSpecLike with Matchers diff --git a/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphSpec.scala b/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphStageSpec.scala similarity index 97% rename from src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphSpec.scala rename to src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphStageSpec.scala index 00eb9da..7db0a4d 100644 --- a/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphSpec.scala +++ b/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphStageSpec.scala @@ -42,7 +42,7 @@ import org.scalatest.{BeforeAndAfterAll, FreeSpecLike, Matchers} import scala.concurrent.duration._ import scala.concurrent.{Await, Future, Promise} -class KinesisSourceGraphSpec +class KinesisSourceGraphStageSpec extends TestKit(ActorSystem("source-graph-spec")) with FreeSpecLike with Matchers @@ -154,7 +154,7 @@ class KinesisSourceGraphSpec action: ActorRef => Unit ): Source[CommittableEvent[ConsumerEvent], NotUsed] = { Source.fromGraph( - new KinesisSourceGraph(consumerConf, new TestConsumerService(action)(_), system) + new KinesisSourceGraphStage(consumerConf, new TestConsumerService(action)(_), system) ) } From 34d638ba1fdbab4ca8737afeba293332ae41f20a Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Thu, 4 Jan 2018 18:15:45 +0100 Subject: [PATCH 05/12] Typo: outStanding -> outstanding --- .../reactive/kinesis/stream/KinesisSinkGraphStage.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala index 7ff7b23..f6f7532 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala @@ -37,11 +37,11 @@ import scala.concurrent.{Future, Promise} * This graph stage uses a producer actor to publish events with acknowledgements. * * @param producerActorProps the properties to create a producer actor where all events are send to. - * @param maxOutStanding the number of messages send to the producer which are not acknowledged, before signalling back pressure. + * @param maxOutstanding the number of messages send to the producer which are not acknowledged, before signalling back pressure. * @param actorSystem the actor system. */ class KinesisSinkGraphStage(producerActorProps: Props, - maxOutStanding: Int, + maxOutstanding: Int, actorSystem: ActorSystem) extends GraphStageWithMaterializedValue[SinkShape[ProducerEvent], Future[Done]] with LazyLogging { @@ -100,7 +100,7 @@ class KinesisSinkGraphStage(producerActorProps: Props, val toSend = SendWithCallback(element) outstandingMessages += toSend.messageId -> element producerActor ! toSend - if (outstandingMessages.size < maxOutStanding) pull(in) + if (outstandingMessages.size < maxOutstanding) pull(in) } override def onUpstreamFinish(): Unit = { @@ -131,7 +131,7 @@ class KinesisSinkGraphStage(producerActorProps: Props, if (outstandingMessages.isEmpty) completeStage() } else { // signal demand - if (outstandingMessages.size < maxOutStanding) pull(in) + if (outstandingMessages.size < maxOutstanding) pull(in) } } From a0f50f325ac209ac71185aae7a7f8a307e7836a1 Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Thu, 4 Jan 2018 21:23:38 +0100 Subject: [PATCH 06/12] Use parentheses for better reading. --- .../kinesis/stream/KinesisSinkGraphStageSpec.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageSpec.scala b/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageSpec.scala index bdcf27a..c3f09a0 100644 --- a/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageSpec.scala +++ b/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageSpec.scala @@ -105,12 +105,14 @@ class KinesisSinkGraphStageSpec if (decider(event.producerEvent)) failMessage(sender, event) else ignoreMessage(sender, event) } - def ackMessage(sender: ActorRef, event: SendWithCallback): Unit = - sender ! SendSuccessful(event.messageId, - new UserRecordResult(Collections.emptyList(), "123", "shard", true)) + def ackMessage(sender: ActorRef, event: SendWithCallback): Unit = { + val recordResult = new UserRecordResult(Collections.emptyList(), "123", "shard", true) + sender ! SendSuccessful(event.messageId, recordResult) + } - def failMessage(sender: ActorRef, event: SendWithCallback): Unit = + def failMessage(sender: ActorRef, event: SendWithCallback): Unit = { sender ! SendFailed(event.messageId, new IllegalStateException("wrong!")) + } def ignoreMessage(sender: ActorRef, event: SendWithCallback): Unit = () From 1d7e39b4aed388abb921dbe947d5e2797b122ec0 Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Fri, 5 Jan 2018 09:08:33 +0100 Subject: [PATCH 07/12] maxRecords should be positive --- .../weightwatchers/reactive/kinesis/common/KinesisSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/it/scala/com/weightwatchers/reactive/kinesis/common/KinesisSuite.scala b/src/it/scala/com/weightwatchers/reactive/kinesis/common/KinesisSuite.scala index 756ae86..7a937fb 100644 --- a/src/it/scala/com/weightwatchers/reactive/kinesis/common/KinesisSuite.scala +++ b/src/it/scala/com/weightwatchers/reactive/kinesis/common/KinesisSuite.scala @@ -201,7 +201,7 @@ trait KinesisSuite Some(100.millis)) def consumerConf(batchSize: Long = TestStreamNrOfMessagesPerShard): ConsumerConf = { - consumerConfFor(streamName = TestStreamName, appName = appName, maxRecords = batchSize.toInt) + consumerConfFor(streamName = TestStreamName, appName = appName, maxRecords = math.max(1, batchSize.toInt)) } def producerConf(): ProducerConf = producerConfFor(TestStreamName, appName) From 5f34f19590ed963c3e3cc643261776849bd2c777 Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Fri, 5 Jan 2018 10:30:47 +0100 Subject: [PATCH 08/12] Add death watch for KinesisProducerActor --- .../kinesis/common/KinesisSuite.scala | 4 ++- .../stream/KinesisSinkGraphStage.scala | 26 ++++++++++++------- .../stream/KinesisSinkGraphStageSpec.scala | 11 ++++++++ 3 files changed, 31 insertions(+), 10 deletions(-) diff --git a/src/it/scala/com/weightwatchers/reactive/kinesis/common/KinesisSuite.scala b/src/it/scala/com/weightwatchers/reactive/kinesis/common/KinesisSuite.scala index 7a937fb..1999194 100644 --- a/src/it/scala/com/weightwatchers/reactive/kinesis/common/KinesisSuite.scala +++ b/src/it/scala/com/weightwatchers/reactive/kinesis/common/KinesisSuite.scala @@ -201,7 +201,9 @@ trait KinesisSuite Some(100.millis)) def consumerConf(batchSize: Long = TestStreamNrOfMessagesPerShard): ConsumerConf = { - consumerConfFor(streamName = TestStreamName, appName = appName, maxRecords = math.max(1, batchSize.toInt)) + consumerConfFor(streamName = TestStreamName, + appName = appName, + maxRecords = math.max(1, batchSize.toInt)) } def producerConf(): ProducerConf = producerConfFor(TestStreamName, appName) diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala index f6f7532..1322cb1 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala @@ -18,7 +18,7 @@ package com.weightwatchers.reactive.kinesis.stream import akka.Done import akka.actor.Actor.Receive -import akka.actor.{ActorRef, ActorSystem, PoisonPill, Props} +import akka.actor.{ActorRef, ActorSystem, PoisonPill, Props, Terminated} import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler} import akka.stream.{Attributes, Inlet, SinkShape} import com.typesafe.scalalogging.LazyLogging @@ -63,24 +63,28 @@ class KinesisSinkGraphStage(producerActorProps: Props, val outstandingMessages: mutable.AnyRefMap[String, ProducerEvent] = mutable.AnyRefMap.empty // The related stage actor. - implicit var graphStageActor: ActorRef = ActorRef.noSender + implicit var stageActorRef: ActorRef = actorSystem.deadLetters // The underlying kinesis producer actor. - var producerActor: ActorRef = ActorRef.noSender + var producerActor: ActorRef = actorSystem.deadLetters override def preStart(): Unit = { super.preStart() // this stage should keep going, even if upstream is finished setKeepGoing(true) - // store a reference to the stage actor - graphStageActor = getStageActor(receive).ref // start up the underlying producer actor producerActor = actorSystem.actorOf(producerActorProps) + // create the stage actor and store a reference + val stageActor = getStageActor(receive) + // monitor the producer actor + stageActor.watch(producerActor) + stageActorRef = stageActor.ref // start the process by signalling demand pull(in) } override def postStop(): Unit = { + logger.info("Stop Kinesis Sink") super.postStop() // stop the underlying producer actor producerActor ! PoisonPill @@ -111,10 +115,10 @@ class KinesisSinkGraphStage(producerActorProps: Props, } override def onUpstreamFailure(ex: Throwable): Unit = { - logger.info(s"Upstream failed: ${ex.getMessage}") - super.onUpstreamFailure(ex) + logger.warn(s"Upstream failed: ${ex.getMessage}", ex) // signal the failure to the waiting side promise.tryFailure(ex) + super.onUpstreamFailure(ex) } } ) @@ -122,8 +126,8 @@ class KinesisSinkGraphStage(producerActorProps: Props, def receive: Receive = { case (_, SendSuccessful(messageId, recordResult)) => outstandingMessages.remove(messageId).foreach { producerEvent => - logger.info( - s"Send message: $producerEvent with result $recordResult. ${outstandingMessages.size} messages outstanding." + logger.debug( + s"Message acknowledged: $producerEvent with result $recordResult. ${outstandingMessages.size} messages outstanding." ) // upstream is finished? if (isClosed(in)) { @@ -138,6 +142,10 @@ class KinesisSinkGraphStage(producerActorProps: Props, case (_, SendFailed(messageId, reason)) => logger.warn(s"Could not send message with id: $messageId", reason) failStage(reason) + + case (_, Terminated(_)) => + logger.warn("ProducerActor died unexpectedly.") + failStage(new IllegalStateException("ProducerActor died unexpectedly.")) } } diff --git a/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageSpec.scala b/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageSpec.scala index c3f09a0..b786124 100644 --- a/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageSpec.scala +++ b/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageSpec.scala @@ -73,6 +73,13 @@ class KinesisSinkGraphStageSpec result shouldBe exception } + "the stream fails, if the producer actor dies" in { + val sink = Kinesis.sink(Props(new FailActor), 1) + val messages = 1.to(100).map(_.toString).map(num => ProducerEvent(num, num)) + val result = Source(messages).runWith(sink).failed.futureValue + result shouldBe a[IllegalStateException] + } + "do not send more messages than maxOutstanding" in sinkWithProducer( ignoreAndFailOn(_.partitionKey.toInt > 5) ) { (sink, producer) => @@ -100,6 +107,10 @@ class KinesisSinkGraphStageSpec } } + class FailActor extends Actor { + override def receive: Receive = throw new IllegalStateException("wrong!") + } + def ignoreAndFailOn(decider: ProducerEvent => Boolean)(sender: ActorRef, event: SendWithCallback): Unit = { if (decider(event.producerEvent)) failMessage(sender, event) else ignoreMessage(sender, event) From a0d31bca9fa8c8fc8991ec7c5493e7ad9ce2db33 Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Fri, 5 Jan 2018 13:19:24 +0100 Subject: [PATCH 09/12] Work around eager producer creation and add a test for this setup. --- .../reactive/kinesis/stream/Kinesis.scala | 37 ++++++++++++++++--- .../stream/KinesisSinkGraphStage.scala | 4 +- src/test/resources/application.conf | 6 +++ .../stream/KinesisSinkGraphStageSpec.scala | 4 ++ 4 files changed, 44 insertions(+), 7 deletions(-) diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala index 44229bd..a74ab06 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala @@ -20,6 +20,7 @@ import akka.{Done, NotUsed} import akka.actor.{ActorSystem, Props} import akka.stream.scaladsl.{Sink, Source} import com.amazonaws.auth.AWSCredentialsProvider +import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf import com.weightwatchers.reactive.kinesis.models.{ConsumerEvent, ProducerEvent} @@ -85,12 +86,12 @@ object Kinesis extends LazyLogging { * The sink produces a materialized value `Future[Done]`, which is finished if all messages of the stream are send to the producer actor _and_ got acknowledged. * The future fails, if the sending an event fails or upstream has failed the stream. * - * @param props the props to create a producer actor. + * @param props the props to create a producer actor. This is a function to work around #48. * @param maxOutStanding the number of messages to send to the actor unacknowledged before back pressure is applied. * @param system the actor system. * @return A sink that accepts ProducerEvents. */ - def sink(props: Props, maxOutStanding: Int)( + def sink(props: => Props, maxOutStanding: Int)( implicit system: ActorSystem ): Sink[ProducerEvent, Future[Done]] = { Sink.fromGraph(new KinesisSinkGraphStage(props, maxOutStanding, system)) @@ -124,6 +125,34 @@ object Kinesis extends LazyLogging { sink(KinesisProducerActor.props(producerConf), maxOutstanding) } + /** + * Create a Sink that accepts ProducerEvents, which get published to Kinesis. + * + * The sink itself sends all events to an KinesisProducerActor which is configured from the system configuration for given producer name. + * Every message send needs to be acknowledged by the underlying producer actor. + * + * This sink signals back pressure, if more messages than configured in throttling conf are not acknowledged. + * If throttling is not configured, a default value (= 1000 messages) is applied. + * + * The sink produces a materialized value `Future[Done]`, which is finished if all messages of the stream are send to the producer actor _and_ got acknowledged. + * The future fails, if the sending an event fails or upstream has failed the stream. + * + * @param kinesisConfig the configuration object that holds the producer config. + * @param producerName the name of the producer in the system configuration. + * @param credentialsProvider the AWS credentials provider to use to connect. + * @param system the actor system. + * @return A sink that accepts ProducerEvents. + */ + def sink(kinesisConfig: Config, + producerName: String, + credentialsProvider: Option[AWSCredentialsProvider])( + implicit system: ActorSystem + ): Sink[ProducerEvent, Future[Done]] = { + sink( + ProducerConf(kinesisConfig, producerName, credentialsProvider) + ) + } + /** * Create a Sink that accepts ProducerEvents, which get published to Kinesis. * @@ -159,8 +188,6 @@ object Kinesis extends LazyLogging { credentialsProvider: Option[AWSCredentialsProvider] = None)( implicit system: ActorSystem ): Sink[ProducerEvent, Future[Done]] = { - sink( - ProducerConf(system.settings.config.getConfig(inConfig), producerName, credentialsProvider) - ) + sink(system.settings.config.getConfig(inConfig), producerName, credentialsProvider) } } diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala index 1322cb1..dc4b612 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala @@ -36,11 +36,11 @@ import scala.concurrent.{Future, Promise} * A KinesisSinkGraph will attach to a kinesis stream with the provided configuration and constitute a Sink[ProducerEvent, Future[Done]. * This graph stage uses a producer actor to publish events with acknowledgements. * - * @param producerActorProps the properties to create a producer actor where all events are send to. + * @param producerActorProps the properties to create a producer actor where all events are send to. This is a function to work around #48. * @param maxOutstanding the number of messages send to the producer which are not acknowledged, before signalling back pressure. * @param actorSystem the actor system. */ -class KinesisSinkGraphStage(producerActorProps: Props, +class KinesisSinkGraphStage(producerActorProps: => Props, maxOutstanding: Int, actorSystem: ActorSystem) extends GraphStageWithMaterializedValue[SinkShape[ProducerEvent], Future[Done]] diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf index 8eadcf3..111a0e3 100644 --- a/src/test/resources/application.conf +++ b/src/test/resources/application.conf @@ -4,4 +4,10 @@ akka { # factor by which to scale timeouts during tests, e.g. to account for shared build system load timefactor = 1.0 } +} + +kinesis { + test-producer { + stream-name = "int-test-stream-1" + } } \ No newline at end of file diff --git a/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageSpec.scala b/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageSpec.scala index b786124..482c85b 100644 --- a/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageSpec.scala +++ b/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStageSpec.scala @@ -90,6 +90,10 @@ class KinesisSinkGraphStageSpec result.isCompleted shouldBe false producer.underlyingActor.allMessages should have size 5 } + + "A sink can be created from system config" in { + Kinesis.sink("test-producer") + } } class ForwardToProducerActor(ref: ActorRef) extends Actor { From 0c64c9794dd77756c02ca5d4c2a249ea63733135 Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Fri, 5 Jan 2018 14:09:18 +0100 Subject: [PATCH 10/12] Add documentation for the sink. --- README.md | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/README.md b/README.md index 21b1cd2..faf1774 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ It's worth familiarising yourself with [Sequence numbers and Sub sequence number * [Usage: Producer](#usage-usage-producer) * [Actor Based Implementation](#usage-usage-producer-actor-based-implementation) * [Pure Scala based implementation (simple wrapper around KPL)](#usage-usage-producer-pure-scala-based-implementation-simple-wrapper-around-kpl) + * [Akka Stream Sink](#akka-stream-sink) * [Running the reliability test](#running-the-reliability-test) * [Delete & recreate kinesisstreams and dynamo table](#running-the-reliability-test-delete-recreate-kinesisstreams-and-dynamo-table) * [Running the producer-consumer test](#running-the-reliability-test-running-the-producer-consumer-test) @@ -484,6 +485,55 @@ callback onFailure { } ``` + +### Akka Stream Sink + +An Akka `Sink` is provided which can be used to publish messages via streams. +Every message is send as `ProduserEvent` to the `Sink`, which defines the PartitionKey as well as the payload. +The `Sink` is created from a `ProducerConf` or directly with a `KinesisProducerActor`. See [Kinesis](https://github.com/WW-Digital/reactive-kinesis/blob/master/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala) for the various options. + +The `Sink` expects an acknowledgement for every messages send to Kinesis. +An amount of unacknowledged messages can be configured, before back pressure is applied. +See the throttling conf for defining this configuration value. +Please note: a default value (1000 messages) is applied, if throttling is not configured. + +The provided `Sink` produces a `Future[Done]` as materialized value. +This future succeeds, if all messages from upstream are send to Kinesis and acknowledged. +It fails if a message could not be send to Kinesis or upstream fails. + +```scala +import akka.stream.scaladsl.Source +import com.weightwatchers.reactive.kinesis.models._ +import com.weightwatchers.reactive.kinesis.stream._ + +Source(1.to(100).map(_.toString)) + .map(num => ProducerEvent(num, num)) + .runWith(Kinesis.sink("producer-name")) + .onComplete { + case Success(_) => println("All messages are published successfully.") + case Failure(ex) => println(s"Failed to publish messages: ${ex.getMessage}") + } +``` + +A long running flow can be easily achieved using a `SourceQueue`. +In this case the flow stays open as long as needed. +New elements can be published via the materialized queue: + +```scala +import akka.stream.scaladsl.Source +import com.weightwatchers.reactive.kinesis.models._ +import com.weightwatchers.reactive.kinesis.stream._ + +val sourceQueue = Source.queue[ProducerEvent](1000, OverflowStrategy.fail) + .toMat(Kinesis.sink("producer-name"))(Keep.left) + .run() + +sourceQueue.offer(ProducerEvent("foo", "bar")) +sourceQueue.offer(ProducerEvent("foo", "baz")) +``` + +The `Sink` uses a `KinesisProducerActor` under the cover. All rules regarding this actor also apply for the `Sink`. + # Running the reliability test From bc2f91f114c5b7e813818c7f5142f35bda59a97f Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Thu, 25 Jan 2018 14:25:26 +0100 Subject: [PATCH 11/12] Grammar - my old enemy. --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index faf1774..2979a98 100644 --- a/README.md +++ b/README.md @@ -489,16 +489,16 @@ callback onFailure { ### Akka Stream Sink An Akka `Sink` is provided which can be used to publish messages via streams. -Every message is send as `ProduserEvent` to the `Sink`, which defines the PartitionKey as well as the payload. +Every message is sent as `ProduserEvent` to the `Sink`, which defines the PartitionKey as well as the payload. The `Sink` is created from a `ProducerConf` or directly with a `KinesisProducerActor`. See [Kinesis](https://github.com/WW-Digital/reactive-kinesis/blob/master/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala) for the various options. -The `Sink` expects an acknowledgement for every messages send to Kinesis. +The `Sink` expects an acknowledgement for every message sent to Kinesis. An amount of unacknowledged messages can be configured, before back pressure is applied. -See the throttling conf for defining this configuration value. +This throttling is controlled by the kinesis.{producer}.akka.max-outstanding-requests configuration value. Please note: a default value (1000 messages) is applied, if throttling is not configured. The provided `Sink` produces a `Future[Done]` as materialized value. -This future succeeds, if all messages from upstream are send to Kinesis and acknowledged. +This future succeeds, if all messages from upstream are sent to Kinesis and acknowledged. It fails if a message could not be send to Kinesis or upstream fails. ```scala From f8730d476fe2edb9fd01b73a684c0de787b02114 Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Thu, 25 Jan 2018 14:51:57 +0100 Subject: [PATCH 12/12] Use a simple counter instead of maintaining all messages in a set. --- .../stream/KinesisSinkGraphStage.scala | 44 +++++++++---------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala index dc4b612..16d2ea0 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSinkGraphStage.scala @@ -29,7 +29,6 @@ import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.{ SendWithCallback } -import scala.collection.mutable import scala.concurrent.{Future, Promise} /** @@ -58,9 +57,8 @@ class KinesisSinkGraphStage(producerActorProps: => Props, ): (GraphStageLogic, Future[Done]) = { val logic = new GraphStageLogic(shape) { - // Holds all outstanding messages by its message identifier. - // Note: the identifier is created here locally and does not make sense outside of this stage. - val outstandingMessages: mutable.AnyRefMap[String, ProducerEvent] = mutable.AnyRefMap.empty + // Counts all outstanding messages. + var outstandingMessageCount: Int = 0 // The related stage actor. implicit var stageActorRef: ActorRef = actorSystem.deadLetters @@ -89,10 +87,10 @@ class KinesisSinkGraphStage(producerActorProps: => Props, // stop the underlying producer actor producerActor ! PoisonPill // Finish the promise (it could already be finished in case of failure) - if (outstandingMessages.isEmpty) promise.trySuccess(Done) + if (outstandingMessageCount == 0) promise.trySuccess(Done) else promise.tryFailure( - new IllegalStateException(s"No acknowledge for ${outstandingMessages.size} events.") + new IllegalStateException(s"No acknowledge for $outstandingMessageCount events.") ) } @@ -101,17 +99,16 @@ class KinesisSinkGraphStage(producerActorProps: => Props, new InHandler { override def onPush(): Unit = { val element = grab(in) - val toSend = SendWithCallback(element) - outstandingMessages += toSend.messageId -> element - producerActor ! toSend - if (outstandingMessages.size < maxOutstanding) pull(in) + outstandingMessageCount += 1 + producerActor ! SendWithCallback(element) + if (outstandingMessageCount < maxOutstanding) pull(in) } override def onUpstreamFinish(): Unit = { logger.info("Upstream is finished!") // Only finish the stage if there are no outstanding messages // If there are outstanding messages, the receive handler will complete the stage. - if (outstandingMessages.isEmpty) completeStage() + if (outstandingMessageCount == 0) completeStage() } override def onUpstreamFailure(ex: Throwable): Unit = { @@ -124,19 +121,18 @@ class KinesisSinkGraphStage(producerActorProps: => Props, ) def receive: Receive = { - case (_, SendSuccessful(messageId, recordResult)) => - outstandingMessages.remove(messageId).foreach { producerEvent => - logger.debug( - s"Message acknowledged: $producerEvent with result $recordResult. ${outstandingMessages.size} messages outstanding." - ) - // upstream is finished? - if (isClosed(in)) { - // only complete the stage if all outstanding messages are acknowledged - if (outstandingMessages.isEmpty) completeStage() - } else { - // signal demand - if (outstandingMessages.size < maxOutstanding) pull(in) - } + case (_, SendSuccessful(_, recordResult)) => + outstandingMessageCount -= 1 + logger.debug( + s"Message acknowledged: $recordResult. $outstandingMessageCount messages outstanding." + ) + // upstream is finished? + if (isClosed(in)) { + // only complete the stage if all outstanding messages are acknowledged + if (outstandingMessageCount == 0) completeStage() + } else { + // signal demand + if (outstandingMessageCount < maxOutstanding) pull(in) } case (_, SendFailed(messageId, reason)) =>