Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Commit

Permalink
Add additional factory methods to Kinesis to create Akka Source (#61)
Browse files Browse the repository at this point in the history
* Add additional factory methods to Kinesis object, to create Kinesis stream source with explicitly specifying ConsumerService

* Updated the README and changed the parameter order of the Kinesis.source

* Fixed the formatting issues and the compiler error due to the multiple overloaded methods with default params

* Removed ambiguous constructor from KinesisSourceGraphStage, as the functionality delegated to the Kinesis object

* Fixed formatting issues
  • Loading branch information
htimur authored and markglh committed Apr 23, 2018
1 parent c635b6f commit 15890a5
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 25 deletions.
26 changes: 24 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ An Akka `Source` is provided that can be used with streams. It is possible to cr
directly from the consumer name that is defined in the configuration.
Every message that is emitted to the stream is of type `CommitableEvent[ConsumerEvent]` and has to be committed
explicitly downstream with a call to `event.commit()`. It is possible to map to a different type of `CommittableEvent`
via the `map` and `mapAsync` functionality.
via the `map` and `mapAsync` functionality. A `KinesisConsumer` is created internally for the `Kinesis.source`, when the factory method isn't defined.

```scala
import com.weightwatchers.reactive.kinesis.stream._
Expand All @@ -325,7 +325,29 @@ Kinesis
.runWith(Sink.seq)
```

A `KinesisConsumer` is used internally for the `Kinesis.source`. All rules described here for the `KinesisConsumer` also apply for the stream source.
Or you can explicitly path a lambda, to create the `KinesisConsumer`.

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.stream.scaladsl.Sink
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer
import com.weightwatchers.reactive.kinesis.stream._

val sys = ActorSystem("kinesis-consumer-system")

Kinesis
.source(
"consumer-name",
(conf: KinesisConsumer.ConsumerConf, eventProcessor: ActorRef) => KinesisConsumer(conf, eventProcessor, sys)
)
.take(100)
.map(event => event.map(_.payloadAsString())) // read and map payload as string
.mapAsync(10)(event => event.mapAsync(Downloader.download(event.payload))) // handle an async message
.map(event => event.commit()) // mark the event as handled by calling commit
.runWith(Sink.seq)
```

All rules described here for the `KinesisConsumer` also apply for the stream source.

<a name="usage-usage-consumer-graceful-shutdown"></a>
### Graceful Shutdown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package com.weightwatchers.reactive.kinesis.stream

import akka.{Done, NotUsed}
import akka.actor.{ActorSystem, Props}
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.stream.scaladsl.{Sink, Source}
import com.amazonaws.auth.AWSCredentialsProvider
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import com.weightwatchers.reactive.kinesis.consumer.{ConsumerService, KinesisConsumer}
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf
import com.weightwatchers.reactive.kinesis.models.{ConsumerEvent, ProducerEvent}
import com.weightwatchers.reactive.kinesis.producer.{KinesisProducerActor, ProducerConf}
Expand All @@ -33,6 +34,23 @@ import scala.concurrent.Future
*/
object Kinesis extends LazyLogging {

/**
* Create a source, that provides KinesisEvents.
* Please note: every KinesisEvent has to be committed during the user flow!
* Uncommitted events will be retransmitted after a timeout.
*
* @param consumerConf the configuration to connect to Kinesis.
* @param createConsumer factory function to create ConsumerService from eventProcessor ActorRef.
* @param system the actor system.
* @return A source of KinesisEvent objects.
*/
def source(
consumerConf: ConsumerConf,
createConsumer: ActorRef => ConsumerService
)(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
Source.fromGraph(new KinesisSourceGraphStage(consumerConf, createConsumer, system))
}

/**
* Create a source, that provides KinesisEvents.
* Please note: every KinesisEvent has to be committed during the user flow!
Expand All @@ -45,7 +63,7 @@ object Kinesis extends LazyLogging {
def source(
consumerConf: ConsumerConf
)(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
Source.fromGraph(new KinesisSourceGraphStage(consumerConf, system))
source(consumerConf, KinesisConsumer(consumerConf, _, system))
}

/**
Expand All @@ -69,12 +87,69 @@ object Kinesis extends LazyLogging {
* @param system the actor system to use.
* @return A source of KinesisEvent objects.
*/
def source(consumerName: String, inConfig: String = "kinesis")(
def source(consumerName: String, inConfig: String)(
implicit system: ActorSystem
): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
source(ConsumerConf(system.settings.config.getConfig(inConfig), consumerName))
}

/**
* Create a source by using the actor system configuration, that provides KinesisEvents.
* Please note: every KinesisEvent has to be committed during the user flow!
* Uncommitted events will be retransmitted after a timeout.
*
* A minimal application conf file should look like this:
* {{{
* kinesis {
* application-name = "SampleService"
* consumer-name {
* stream-name = "sample-stream"
* }
* }
* }}}
* See kinesis reference.conf for a list of all available config options.
*
* @param consumerName the name of the consumer in the application.conf.
* @param system the actor system to use.
* @return A source of KinesisEvent objects.
*/
def source(consumerName: String)(
implicit system: ActorSystem
): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
source(consumerName, "kinesis")
}

/**
* Create a source by using the actor system configuration, that provides KinesisEvents.
* Please note: every KinesisEvent has to be committed during the user flow!
* Uncommitted events will be retransmitted after a timeout.
*
* A minimal application conf file should look like this:
* {{{
* kinesis {
* application-name = "SampleService"
* consumer-name {
* stream-name = "sample-stream"
* }
* }
* }}}
* See kinesis reference.conf for a list of all available config options.
*
* @param consumerName the name of the consumer in the application.conf.
* @param createConsumer factory function to create ConsumerService from eventProcessor ActorRef.
* @param inConfig the name of the sub-config for kinesis.
* @param system the actor system to use.
* @return A source of KinesisEvent objects.
*/
def source(
consumerName: String,
createConsumer: (ConsumerConf, ActorRef) => ConsumerService,
inConfig: String = "kinesis"
)(implicit system: ActorSystem): Source[CommittableEvent[ConsumerEvent], NotUsed] = {
val consumerConf = ConsumerConf(system.settings.config.getConfig(inConfig), consumerName)
source(consumerConf, createConsumer(consumerConf, _))
}

/**
* Create a Sink that accepts ProducerEvents, which get published to Kinesis.
*
Expand All @@ -91,9 +166,10 @@ object Kinesis extends LazyLogging {
* @param system the actor system.
* @return A sink that accepts ProducerEvents.
*/
def sink(props: => Props, maxOutStanding: Int)(
implicit system: ActorSystem
): Sink[ProducerEvent, Future[Done]] = {
def sink(
props: => Props,
maxOutStanding: Int
)(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = {
Sink.fromGraph(new KinesisSinkGraphStage(props, maxOutStanding, system))
}

Expand Down Expand Up @@ -143,11 +219,11 @@ object Kinesis extends LazyLogging {
* @param system the actor system.
* @return A sink that accepts ProducerEvents.
*/
def sink(kinesisConfig: Config,
producerName: String,
credentialsProvider: Option[AWSCredentialsProvider])(
implicit system: ActorSystem
): Sink[ProducerEvent, Future[Done]] = {
def sink(
kinesisConfig: Config,
producerName: String,
credentialsProvider: Option[AWSCredentialsProvider]
)(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = {
sink(
ProducerConf(kinesisConfig, producerName, credentialsProvider)
)
Expand Down Expand Up @@ -183,11 +259,11 @@ object Kinesis extends LazyLogging {
* @param system the actor system.
* @return A sink that accepts ProducerEvents.
*/
def sink(producerName: String,
inConfig: String = "kinesis",
credentialsProvider: Option[AWSCredentialsProvider] = None)(
implicit system: ActorSystem
): Sink[ProducerEvent, Future[Done]] = {
def sink(
producerName: String,
inConfig: String = "kinesis",
credentialsProvider: Option[AWSCredentialsProvider] = None
)(implicit system: ActorSystem): Sink[ProducerEvent, Future[Done]] = {
sink(system.settings.config.getConfig(inConfig), producerName, credentialsProvider)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,6 @@ class KinesisSourceGraphStage(config: ConsumerConf,
extends GraphStage[SourceShape[CommittableEvent[ConsumerEvent]]]
with LazyLogging {

/**
* Ctor that uses the KinesisConsumer as ConsumerService implementation.
*/
def this(config: ConsumerConf, actorSystem: ActorSystem) = {
this(config, KinesisConsumer(config, _, actorSystem), actorSystem)
}

private[this] val out: Outlet[CommittableEvent[ConsumerEvent]] = Outlet("KinesisSource.out")
override val shape: SourceShape[CommittableEvent[ConsumerEvent]] = SourceShape.of(out)

Expand Down

0 comments on commit 15890a5

Please sign in to comment.