
Add an Akka Stream Sink graph stage for Kinesis. #47

Merged: 12 commits from aquamatthias:mv/sink into WW-Digital:master on Feb 1, 2018

Conversation

@aquamatthias (Contributor) commented Jan 4, 2018

Since there is now a Source acting as Kinesis Consumer, a Sink is needed to act as Kinesis Producer.

This PR adds a KinesisSinkGraphStage that uses an underlying KinesisProducerActor to do the heavy lifting. The stream manages back pressure by allowing only a fixed number of outstanding messages. A materialized value is used to indicate when the stream has finished (or failed).
Unit and integration tests are in place.
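
To make the intended usage concrete, here is a hedged sketch. The `Kinesis.sink` factory name, the `ProducerEvent` constructor shape, and the producer name are assumptions based on this description, not necessarily the merged API:

```scala
// Hedged usage sketch only - method and constructor shapes are assumptions.
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import com.weightwatchers.reactive.kinesis.models.ProducerEvent
import com.weightwatchers.reactive.kinesis.stream.Kinesis

import scala.concurrent.Future

object SinkExample extends App {
  implicit val system: ActorSystem    = ActorSystem("sink-example")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  // The materialized Future[Done] completes once all upstream messages have
  // been sent to Kinesis and acknowledged, or fails if a message cannot be sent.
  val done: Future[Done] =
    Source(1 to 100)
      .map(i => ProducerEvent(i.toString, s"payload-$i"))
      .runWith(Kinesis.sink("example-producer"))
}
```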

Open topics:

  • Can we use throttling conf for the number of outstanding messages or should this be a separate config option?
  • The current actor based implementation does not limit the number of outstanding messages by default. This PR does not change this, but uses a default (of 1000 messages) if throttling conf is not available, since a defined maximum is a better default IMO. WDYT?
  • Documentation in README.md is missing. Todo on my side.

@coveralls

Coverage Status

Coverage increased (+1.6%) to 89.655% when pulling 456ca86 on aquamatthias:mv/sink into 9185032 on WW-Digital:master.

@coveralls commented Jan 4, 2018

Coverage Status

Coverage increased (+1.3%) to 89.425% when pulling dddcf3c on aquamatthias:mv/sink into 9185032 on WW-Digital:master.

@coveralls

Coverage Status

Coverage increased (+1.6%) to 89.726% when pulling 106734e on aquamatthias:mv/sink into 9185032 on WW-Digital:master.


@coveralls

Coverage Status

Coverage increased (+1.8%) to 89.931% when pulling b63e42c on aquamatthias:mv/sink into 9185032 on WW-Digital:master.

@coveralls commented Jan 5, 2018

Coverage Status

Coverage increased (+2.1%) to 90.205% when pulling 053dfff on aquamatthias:mv/sink into 9185032 on WW-Digital:master.

@markglh self-assigned this Jan 8, 2018
@coveralls

Coverage Status

Coverage increased (+2.1%) to 90.205% when pulling 0c64c97 on aquamatthias:mv/sink into 6ac885e on WW-Digital:master.

@coveralls commented Jan 13, 2018

Coverage Status

Coverage increased (+2.08%) to 90.183% when pulling f8730d4 on aquamatthias:mv/sink into 6ac885e on WW-Digital:master.

@markglh (Contributor) commented Jan 18, 2018

I haven't forgotten about this, it's just been a crazy week!

@aquamatthias (Contributor, Author)

@markglh I totally understand. Looking forward to seeing your review.

@markglh (Contributor) commented Jan 24, 2018

@aquamatthias I'll get this reviewed today.

In answer to your questions:

  • Q. Can we use throttling conf for the number of outstanding messages or should this be a separate config option?
    • A. Yeah that makes sense to me, will you update the comments in reference.conf to make it clear the property serves both purposes?
  • Q. The current actor based implementation does not limit the number of outstanding messages by default. This PR does not change this, but uses a default (of 1000 messages) if throttling conf is not available, since a defined maximum is a better default IMO. WDYT?
    • A. Yeah, that's a sensible default - I see you've already added a log to notify about this, too. Good stuff! (A minimal sketch of this fallback follows.)
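
To pin down the agreed behaviour, a minimal sketch of the fallback, assuming a Typesafe Config for the producer. The helper name is hypothetical, and the config path follows the `kinesis.{producer}.akka.max-outstanding-requests` naming discussed below:

```scala
import com.typesafe.config.Config

// Illustrative helper: prefer the throttling config when present, otherwise
// fall back to the default of 1000 outstanding messages discussed above.
// The exact config path is an assumption.
def maxOutstandingMessages(producerConfig: Config): Int =
  if (producerConfig.hasPath("akka.max-outstanding-requests"))
    producerConfig.getInt("akka.max-outstanding-requests")
  else
    1000 // no throttling configured - apply the defined maximum (and log it)
```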

@markglh (Contributor) left a comment


Awesome work again, thanks @aquamatthias !

Just a few comments / questions

README.md Outdated
### Akka Stream Sink

An Akka `Sink` is provided which can be used to publish messages via streams.
Every message is send as `ProduserEvent` to the `Sink`, which defines the PartitionKey as well as the payload.
markglh (Contributor):

tiny grammar correction: Every message is sent

aquamatthias (Contributor, Author):

Grammar, my old enemy - thanks for the correction.

README.md Outdated
Every message is send as `ProduserEvent` to the `Sink`, which defines the PartitionKey as well as the payload.
The `Sink` is created from a `ProducerConf` or directly with a `KinesisProducerActor`. See [Kinesis](https://github.com/WW-Digital/reactive-kinesis/blob/master/src/main/scala/com/weightwatchers/reactive/kinesis/stream/Kinesis.scala) for the various options.

The `Sink` expects an acknowledgement for every messages send to Kinesis.
markglh (Contributor):

expects an acknowledgement for every message sent to Kinesis

aquamatthias (Contributor, Author):

Done.

README.md Outdated

The `Sink` expects an acknowledgement for every messages send to Kinesis.
An amount of unacknowledged messages can be configured, before back pressure is applied.
See the throttling conf for defining this configuration value.
markglh (Contributor):

Might be clearer as:
This throttling is controlled by the kinesis.{producer}.akka.max-outstanding-requests configuration value

aquamatthias (Contributor, Author):

Indeed much better.

README.md Outdated
Please note: a default value (1000 messages) is applied, if throttling is not configured.

The provided `Sink` produces a `Future[Done]` as materialized value.
This future succeeds, if all messages from upstream are send to Kinesis and acknowledged.
markglh (Contributor):

just swap out send for sent :)

aquamatthias (Contributor, Author):

Done.


// Holds all outstanding messages by its message identifier.
// Note: the identifier is created here locally and does not make sense outside of this stage.
val outstandingMessages: mutable.AnyRefMap[String, ProducerEvent] = mutable.AnyRefMap.empty
markglh (Contributor):

Couple of questions about this:

  • If there's a break in sending messages (quiet period), will it complete and stop? As in, when outstandingMessages is empty.
  • Do we need to store the outstanding messages, or could we instead defer to producer.outstandingRecordsCount() like the underlying actor does? I'm thinking about memory consumption here, albeit a small amount.
  • When we receive a SendFailed - is failing the stage the right thing to do? As in, should we instead retry? Happy to defer this logic to another PR if it's worthwhile, I'm on the fence right now. Hard to know how often this will happen in real-world usage. If often, then it's worth it.
  • Regarding the earlier discussion about re-using the maxOutstandingRequests config. Am I correct in saying the following:
    • We'll stop requesting more messages when this count is hit (e.g. 1000 messages), which in theory should mean we won't actually throttle. So the fact that throttling is enabled won't matter because we won't start buffering messages within the actor.
    • The reason I implemented this was to reduce the number of futures in play (thereby saving memory and reducing work within the KPL). But it can potentially add latency depending on the value of throttling-retry-millis - so my question here is, do we ever want this to happen here, if not then it does actually make sense to use a different property - which goes against my earlier answer :)

aquamatthias (Contributor, Author):

  • Q: If there's a break in sending messages (quiet period), will it complete and stop? As in, when outstandingMessages is empty.

    • A: No. The stream finishes only if upstream finishes or the stream fails. See the SourceQueue example added to the README: all messages sent to the SourceQueue get published to Kinesis.
  • Q: Do we need to store the outstanding messages, or could we instead defer to producer.outstandingRecordsCount() like the underlying actor does? I'm thinking about memory consumption here, albeit a small amount.

    • A: The graph stage runs async with the actor and the producer, so a decision based on the producer's outstandingRecordsCount might be wrong. Since there is also no need to store a reference to all outstanding messages, I reworked my solution to only use an internal counter (and removed the map). This should be correct and has no memory impact. (A minimal sketch of this counter approach follows the list.)
  • Q: When we receive a SendFailed - is failing the stage the right thing to do? As in, should we instead retry? Happy to defer this logic to another PR if it's worthwhile, in on the fence right now. Hard to know how often this will happen in real world usage. If often, then it's worth it.

  • Q: We'll stop requesting more messages when this count is hit (e.g. 1000 messages), which in theory should mean we won't actually throttle. So the fact that throttling is enabled won't matter because we won't start buffering messages within the actor.

    • A: Akka streams are reactive: events are sent downstream, demand is sent upstream. The sink signals demand until maxOutstandingMessages messages are outstanding. Once this number is reached, no demand is signalled upstream, which also means that no more messages flow through the stream. The source of the stream will not emit more messages until demand is signalled.
  • Q: The reason I implemented this was to reduce the number of futures in play (thereby saving memory and reducing work within the KPL). But it can potentially add latency depending on the value of throttling-retry-millis - so my question here is, do we ever want this to happen here, if not then it does actually make sense to use a different property - which goes against my earlier answer

    • A: If the source is faster than the sink, the number of outstanding messages will grow over time, which will eventually result in an application failure. IMHO it is necessary to define a maximum number of outstanding messages and to react (e.g. upstream should not produce more messages until demand is signalled) to prevent this. This is what Akka streams provide out of the box, and it is also the reason to use a sensible throttling default. Since the number can be configured and can get arbitrarily large, I don't think we will suffer from latency.
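
A minimal, self-contained sketch of this counter approach - not the PR's actual KinesisSinkGraphStage. The `send` function stands in for the producer round trip (message out, acknowledgement back):

```scala
import akka.Done
import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler}

import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}

class BoundedAckSink[A](send: A => Future[Unit], maxOutstanding: Int)
    extends GraphStageWithMaterializedValue[SinkShape[A], Future[Done]] {

  val in: Inlet[A]                 = Inlet("BoundedAckSink.in")
  override val shape: SinkShape[A] = SinkShape(in)

  override def createLogicAndMaterializedValue(attrs: Attributes): (GraphStageLogic, Future[Done]) = {
    val done = Promise[Done]()
    val logic = new GraphStageLogic(shape) with InHandler {
      private var outstanding  = 0 // sent but not yet acknowledged
      private var upstreamDone = false

      // acknowledgements arrive asynchronously, so hop back onto the stage thread
      private val onAck = getAsyncCallback[Try[Unit]] {
        case Success(_)  => acked()
        case Failure(ex) => done.tryFailure(ex); failStage(ex)
      }

      override def preStart(): Unit = pull(in) // signal initial demand

      override def onPush(): Unit = {
        outstanding += 1
        send(grab(in)).onComplete(onAck.invoke)(materializer.executionContext)
        // back pressure: only signal more demand while below the maximum
        if (outstanding < maxOutstanding) pull(in)
      }

      private def acked(): Unit = {
        outstanding -= 1
        if (upstreamDone && outstanding == 0) { done.trySuccess(Done); completeStage() }
        else if (!upstreamDone && !hasBeenPulled(in)) pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        upstreamDone = true
        if (outstanding == 0) { done.trySuccess(Done); completeStage() }
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        done.tryFailure(ex); failStage(ex)
      }

      setHandler(in, this)
    }
    (logic, done.future)
  }
}
```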

markglh (Contributor):

👍 thanks!

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

class KinesisSinkGraphStageSpec
markglh (Contributor):

Good coverage of scenarios here!

One comment: some aspects might be simpler using the streams testkit, and it may avoid having to resolve the futureValue: https://doc.akka.io/docs/akka/2.5.5/scala/stream/stream-testkit.html#streams-testkit

For example, using the TestSource to assert exceptions / completion, and perhaps introduce breaks in the flow of messages more easily (to allow assertions between messages).

aquamatthias (Contributor, Author):

TestKit is great. The materialized Future produced by the sink is part of the GraphStage, which I wanted to test as well. If there's a test case you would like to see that isn't covered, please let me know.
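
For reference, a sketch of the suggested TestSource-based style. `Kinesis.sink` is the same assumed factory as in the usage example above, and the events are placeholders:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.TestSource
import com.weightwatchers.reactive.kinesis.models.ProducerEvent
import com.weightwatchers.reactive.kinesis.stream.Kinesis

import scala.concurrent.Future

implicit val system: ActorSystem    = ActorSystem("sink-test")
implicit val mat: ActorMaterializer = ActorMaterializer()

// Keep.both exposes the probe (to drive the stream) and the sink's Future[Done]
val (probe, done: Future[Done]) =
  TestSource.probe[ProducerEvent]
    .toMat(Kinesis.sink("example-producer"))(Keep.both)
    .run()

probe.sendNext(ProducerEvent("key-1", "payload-1"))
// room for assertions between messages here
probe.sendNext(ProducerEvent("key-2", "payload-2"))
probe.sendComplete() // upstream finishes; `done` should then complete
```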

@markglh merged commit f6aaeb7 into WW-Digital:master on Feb 1, 2018
@aquamatthias (Contributor, Author)

🎉 🎉 🎉

@aquamatthias deleted the mv/sink branch on February 1, 2018 10:33