This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Fix #25 by providing an Akka Stream Source graph stage for Kinesis. #39

Merged
Changes from all commits
29 commits
8eb406e
Fix #25 by providing an Akka Stream Source graph stage for Kinesis.
aquamatthias Oct 28, 2017
6ffc579
Format all sources.
aquamatthias Oct 28, 2017
6bd85cd
upgrade akka
markglh Dec 3, 2017
65288c0
fixed deprecations
markglh Dec 3, 2017
368931a
fixed stream integration test
markglh Dec 3, 2017
e1aa742
initial refactor of stream int tests
markglh Dec 3, 2017
96b6f85
further int test refactoring
markglh Dec 3, 2017
7385176
fixed formatting
markglh Dec 3, 2017
11f0323
swapped to TestCredentials for dynamo
markglh Dec 3, 2017
ab77ce9
Fix package structure.
aquamatthias Dec 11, 2017
1b545b1
Create KCL Lease tables proactively, since KCL handles this in an unr…
aquamatthias Dec 11, 2017
57e7c31
The KinesisProducerTest is broken - ignore to minimize the change.
aquamatthias Dec 11, 2017
ecfed81
Reenable producer spec.
aquamatthias Dec 11, 2017
1e892ad
Use small thread pool for integration test.
aquamatthias Dec 11, 2017
be06a46
remove docker-compose down
markglh Dec 16, 2017
3ebbcc6
merge sbt commands in travis
markglh Dec 16, 2017
5d91f98
run code formatter
markglh Dec 16, 2017
c5a04a3
removed custom workerid from tests
markglh Dec 17, 2017
95430d3
formatting
markglh Dec 17, 2017
1f99cb8
refactored producer int test
markglh Dec 17, 2017
f3af1a9
changed stream name
markglh Dec 17, 2017
7a7f83b
formatting
markglh Dec 17, 2017
278cc51
Add explicit type.
aquamatthias Dec 18, 2017
2ac5e93
KinesisEvent should allow to map the payload to a different type in t…
aquamatthias Dec 13, 2017
bd9e1c0
KinesisEvent should not be sealed.
aquamatthias Dec 14, 2017
302a693
Refactor KinesisEvent to CommittableEvent which wraps a typed payload.
aquamatthias Dec 21, 2017
8009c8d
Introduce ConsumerService trait for KinesisSource, so we are able to …
aquamatthias Dec 21, 2017
460651a
Fix string->ByteBuffer conversion after rebase.
aquamatthias Dec 21, 2017
357efd3
Describe the akka stream source in README.md
aquamatthias Dec 21, 2017
3 changes: 1 addition & 2 deletions .travis.yml
@@ -15,8 +15,7 @@ before_script:
- docker-compose -f localstack/docker-compose.yml up -d
- docker ps -a
script:
- - sbt ++$TRAVIS_SCALA_VERSION clean coverage scalafmtTest test
- - sbt ++$TRAVIS_SCALA_VERSION it:test
+ - sbt ++$TRAVIS_SCALA_VERSION clean coverage scalafmtTest test it:test
# Tagged releases
- if [ $TRAVIS_TEST_RESULT -eq 0 -a "$TRAVIS_PULL_REQUEST" = "false" ] && [[ "$TRAVIS_TAG"
=~ ^v[0-9]+\.[0-9]+(\.[0-9]+)?(-M[0-9]+)?$ ]]; then echo "** Publishing Release
79 changes: 54 additions & 25 deletions README.md
@@ -15,8 +15,10 @@ It's worth familiarising yourself with [Sequence numbers and Sub sequence number
  * [Notable Consumer Configuration Values](#usage-defining-a-config-file-in-the-client-application-notable-consumer-configuration-values)
  * [Notable Producer Configuration Values](#usage-defining-a-config-file-in-the-client-application-notable-producer-configuration-values)
* [Usage: Consumer](#usage-usage-consumer)
  * [Actor Based Consumer](#actor-based-consumer)
    * [Important considerations when implementing the Event Processor](#usage-usage-consumer-important-considerations-when-implementing-the-event-processor)
    * [Checkpointing](#usage-usage-consumer-checkpointing)
  * [Akka Stream Source](#akka-stream-source)
  * [Graceful Shutdown](#usage-usage-consumer-graceful-shutdown)
* [Usage: Producer](#usage-usage-producer)
  * [Actor Based Implementation](#usage-usage-producer-actor-based-implementation)
@@ -84,18 +86,18 @@ From http://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-scal
> exceeds the number of instances.

For our purposes, this means *any service reading from Kinesis should expect to
have one shard per instance, as a minimum*. Note that this is specifically for consuming events.
Producers don't have the same shard restrictions.

<a name="considerations-when-using-kinesis-in-a-distributed-environment-dynamodb-checkpoint-storage"></a>
## DynamoDB Checkpoint Storage

Amazon's KCL uses DynamoDB to checkpoint progress through reading the stream. When DynamoDB tables are provisioned automatically for this purpose, they may have a relatively high write throughput, which can incur additional cost.

You should make sure that the DynamoDB table used for checkpointing your stream

1. Has a reasonable write throughput defined (a sketch for adjusting this follows below)

2. Is cleaned up when you're done with it -- KCL will not automatically delete it for you
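
For example, the throughput of an existing lease table can be adjusted via the AWS SDK (a minimal sketch using the Java SDK v1; the table name and capacity values are purely illustrative):

```scala
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput

// KCL names the lease table after the application/consumer,
// e.g. "SampleService-sample-consumer" (see the naming note below).
val dynamo = AmazonDynamoDBClientBuilder.defaultClient()
dynamo.updateTable(
  "SampleService-sample-consumer",   // illustrative table name
  new ProvisionedThroughput(10L, 5L) // read / write capacity units
)
```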

The checkpointer will automatically throttle if the write throughput is not sufficient; look out for the following info log:
@@ -154,7 +156,7 @@ kinesis {
# The name of the consumer stream, MUST be specified per consumer and MUST exist
stream-name = "sample-consumer"
}

some-other-consumer {
stream-name = "another-sample-consumer"
}
@@ -176,24 +178,24 @@ For example: `SampleService-sample-consumer`.
<a name="usage-defining-a-config-file-in-the-client-application-notable-consumer-configuration-values"></a>
### Notable Consumer Configuration Values
* `kinesis.<consumer-name>.akka.dispatcher` - Sets the dispatcher for the consumer, defaults to `kinesis.akka.default-dispatcher`
* `kinesis.<consumer-name>.worker.batchTimeoutSeconds` - The timeout for processing a batch.
Note that any messages not processed within this time will be retried (according to the configuration). After retrying, any
unconfirmed messages will be considered failed.
* `kinesis.<consumer-name>.worker.failedMessageRetries` - The number of times to retry failed messages within a batch, after the batch timeout.
* `kinesis.<consumer-name>.worker.failureTolerancePercentage` - If, after retrying, messages are still unconfirmed,
we will either continue processing the next batch or shut down processing completely, depending on this tolerance percentage.
* `kinesis.<consumer-name>.checkpointer.backoffMillis` - When DynamoDB throttles us (due to hitting the write threshold)
we wait for this amount of time.
* `kinesis.<consumer-name>.checkpointer.intervalMillis` - The interval between checkpoints. Setting this too high will cause lots of
messages to be duplicated in the event of a failed node. Setting it too low will result in throttling from DynamoDB.
* `kinesis.<consumer-name>.kcl.initialPositionInStream` - Controls our strategy for pulling from Kinesis (LATEST, TRIM_HORIZON, ..)
* `kinesis.<consumer-name>.kcl.maxRecords` - The maximum batch size.
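
As an illustrative sketch, several of these values could be overridden for a consumer named `some-consumer` as follows (the numbers are arbitrary examples, not recommendations):

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Arbitrary example values, layered over the application config.
val config: Config = ConfigFactory.parseString(
  """kinesis.some-consumer {
    |  worker {
    |    batchTimeoutSeconds = 300
    |    failedMessageRetries = 1
    |    failureTolerancePercentage = 0.25
    |  }
    |  checkpointer {
    |    backoffMillis = 3000
    |    intervalMillis = 20000
    |  }
    |  kcl {
    |    initialPositionInStream = TRIM_HORIZON
    |    maxRecords = 10000
    |  }
    |}""".stripMargin
).withFallback(ConfigFactory.load())
```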

<a name="usage-defining-a-config-file-in-the-client-application-notable-producer-configuration-values"></a>
### Notable Producer Configuration Values
* `kinesis.<producer-name>.akka.dispatcher` - Sets the dispatcher for the producer, defaults to `kinesis.akka.default-dispatcher`
* `kinesis.<producer-name>.akka.max-outstanding-requests` - Enables artificial throttling within the Producer.
This limits the number of futures in play at any one time. Each message creates a new future (internally in the KPL),
which allows us to track the progress of sent messages when they go with the next batch.
* `kinesis.<producer-name>.akka.throttling-retry-millis` - How soon to retry after hitting the above throttling cap.
* `kinesis.<producer-name>.kpl.AggregationEnabled` - Enables [aggregation of messages](http://docs.aws.amazon.com/streams/latest/dev/kinesis-producer-adv-aggregation.html).
@@ -202,12 +204,16 @@ which allows us to track the progress of sent messages when they go with the nex

<a name="usage-usage-consumer"></a>
## Usage: Consumer
`reactive-kinesis` provides two different ways to consume messages from Kinesis: [Actor Based Consumer](#actor-based-consumer) and [Akka Stream Source](#akka-stream-source).

![Consumer Architecture](https://www.lucidchart.com/publicSegments/view/69b7b7d1-bc09-4dcc-ab1b-a0f7c6e1ffc6/image.png)

<a name="actor-based-consumer"></a>
### Actor Based Consumer

Implementing the consumer requires a simple actor which is responsible for processing messages sent to it by the library.
We call this the `Event Processor`.
Upon creating an instance of the `KinesisConsumer`, internally one `ConsumerWorker` (this is different from the KCL Worker) is
created per shard (shards are distributed amongst consumers automatically). This consumer worker is what sends messages to the
`Event Processor`. Note that the `Event Processor` is shared amongst ALL shards, so it is important not to cache the sender of previous messages.
It is perfectly valid to use a router to spread the work amongst many `Event Processor` actors.
@@ -246,7 +252,7 @@ object Consumer extends App {
val system = akka.actor.ActorSystem.create("test-system")
val config = ConfigFactory.load()
val eventProcessor = system.actorOf(Props[TestEventProcessor], "test-processor")
val consumer = KinesisConsumer(ConsumerConf(config.getConfig("kinesis"), "some-consumer"),
eventProcessor, system)
consumer.start()
}
@@ -285,17 +291,40 @@ case class ConsumerShutdown(shardId: String)
```

<a name="usage-usage-consumer-important-considerations-when-implementing-the-event-processor"></a>
#### Important considerations when implementing the Event Processor
* The Event Processor MUST handle [[ProcessEvent]] messages (for each message)
* The Event Processor MUST respond with [[EventProcessed]] after processing of the [[ProcessEvent]]
* The Event Processor may set `successful` to false to indicate the message can be skipped
* The Event Processor SHOULD handle [[ConsumerWorkerFailure]] messages which signal a critical failure in the Consumer.
* The Event Processor SHOULD handle [[ConsumerShutdown]] messages which signal a graceful shutdown of the Consumer.
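
A minimal sketch of an `Event Processor` that follows these rules (the import path and the exact message fields are assumptions based on the case classes shown above):

```scala
import akka.actor.Actor
import com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker._ // import path assumed

class SampleEventProcessor extends Actor {
  override def receive: Receive = {
    case ProcessEvent(event) =>
      // process the payload, then confirm it within the batch timeout
      sender() ! EventProcessed(event.sequenceNumber, successful = true)

    case ConsumerWorkerFailure(failedEvents, shardId) =>
      // critical failure in the consumer: alert and escalate for this shard

    case ConsumerShutdown(shardId) =>
      // graceful shutdown: release any resources held for this shard
  }
}
```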

<a name="usage-usage-consumer-checkpointing"></a>
#### Checkpointing
The client will handle checkpointing asynchronously PER SHARD according to the configuration using a separate actor.

<a name="akka-stream-source"></a>
### Akka Stream Source

An Akka `Source` is provided that can be used with streams. It is possible to create a source from a `ConsumerConf` or
directly from the consumer name that is defined in the configuration.
Every message that is emitted to the stream is of type `CommittableEvent[ConsumerEvent]` and has to be committed
explicitly downstream with a call to `event.commit()`. It is possible to map to a different type of `CommittableEvent`
via the `map` and `mapAsync` functionality.

```scala
// Setup assumed for this sketch: an ActorSystem and materializer in scope.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.weightwatchers.reactive.kinesis.stream._

implicit val system = ActorSystem("kinesis-source-example")
implicit val materializer = ActorMaterializer()

Kinesis
  .source("consumer-name")
  .take(100)
  .map(event => event.map(_.payloadAsString())) // read and map payload as string
  .mapAsync(10)(event => event.mapAsync(Downloader.download)) // Downloader is a placeholder for an async handler
  .map(event => event.commit()) // mark the event as handled by calling commit
  .runWith(Sink.seq)
```

A `KinesisConsumer` is used internally for the `Kinesis.source`. All rules described here for the `KinesisConsumer` also apply for the stream source.
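
The same source can also be created from an explicit `ConsumerConf`, reusing the configuration approach shown in the consumer section (a sketch; the import path is assumed):

```scala
import com.typesafe.config.ConfigFactory
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf // path assumed
import com.weightwatchers.reactive.kinesis.stream._

val consumerConf = ConsumerConf(ConfigFactory.load().getConfig("kinesis"), "consumer-name")
val source = Kinesis.source(consumerConf)
```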

<a name="usage-usage-consumer-graceful-shutdown"></a>
### Graceful Shutdown

@@ -494,7 +523,7 @@ You'll see some stats logged regarding messages/sec processed, near that line.
* How is DynamoDB used in relation to our checkpointing?
* DynamoDB tables will be automatically created, however the write throughput must be configured appropriately using the AWS console or CLI. There is a cost associated with this, but note that setting it too low will cause checkpoint throttling. Configure `kinesis.<consumer-name>.checkpointer.intervalMillis` accordingly.
* How is data sharded?
* Sharding relates to the distribution of messages across the shards for a given stream. Ideally you want an even distribution amongst your shards. However, ordering is only guaranteed within a given shard, so it is important to group related messages by shard. For example, if a specific user performs several operations in which the order of execution matters, then ensuring they land on the same shard will guarantee the order is maintained.
* For this, the `partition key` is used. Messages with the same partition key will land on the same shard. In the example above, a userId may be a good `partition key` (a short sketch follows this FAQ list).
* Note that if the number of partition keys exceeds the number of shards, some shards necessarily contain records with different partition keys. From a design standpoint, to ensure that all your shards are well utilized, the number of shards should be substantially less than the number of unique partition keys.
* How long do we keep data?
@@ -510,7 +539,7 @@ You'll see some stats logged regarding messages/sec processed, near that line.
* `LATEST` - Start reading just after the most recent record in the shard, so that you always read the most recent data in the shard.
* How do sequence numbers work?
* Sequence numbers for the same partition key generally increase over time, but **NOT** necessarily in a continuous sequence. The longer the time period between records, the bigger the gap between the sequence numbers.
* To uniquely identify a record on a shard, you need to use **BOTH** the `sequence number` and the `sub-sequence number`. This is because messages that are aggregated together have the same sequence number (they are treated as one message by Kinesis). Therefore it is important to also use the sub-sequence number to distinguish between them.
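
As a hypothetical illustration of choosing a partition key (assuming a `ProducerEvent(partitionKey, payload)` shape; the import path is an assumption):

```scala
import java.nio.ByteBuffer
import com.weightwatchers.reactive.kinesis.models.ProducerEvent // path assumed

// All operations for the same user share a partition key,
// so they land on the same shard and their ordering is preserved.
val event = ProducerEvent("user-1234", ByteBuffer.wrap("""{"op":"update"}""".getBytes("UTF-8")))
```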


<a name="contributor-guide"></a>
@@ -534,7 +563,7 @@ Uses tags and [sbt-git](https://github.com/sbt/sbt-git) to determine the current
* Each merge into master will automatically build a snapshot and publish to [bintray OSS artifactory](https://www.jfrog.com/confluence/display/RTF/Deploying+Snapshots+to+oss.jfrog.org).
* Tagging the master branch will automatically build and publish both Scala 2.11 & Scala 2.12 artifacts (to bintray and maven central).
* Tags are in the format vX.X.X

<a name="contributor-guide-tag-requirements-version-information"></a>
### Version information
* IF the current commit is tagged with "vX.Y.Z" (ie semantic-versioning), the version is "X.Y.Z"
@@ -554,8 +583,8 @@ v1.2.3-M1-SNAPSHOT
v1.2.3-X1
1.2.3

If the current version on master is a snapshot (release tag + x commits),
then the artifact will be deployed to the [JFrog OSS repository](https://oss.jfrog.org/webapp/#/artifacts/browse/simple/General/oss-snapshot-local/com/weightwatchers):

<a name="contribution-policy"></a>
# Contribution policy
13 changes: 7 additions & 6 deletions build.sbt
@@ -27,7 +27,7 @@ lazy val library =
val scalaTest = "3.0.3"
val kamon = "0.6.6"
val jackson = "2.8.7"
- val akka = "2.5.1"
+ val akka = "2.5.7"
}

val jackson = Seq(
@@ -53,6 +53,7 @@ lazy val library =
val lightbend = Seq(
"com.typesafe" % "config" % "1.3.1" % Compile,
"com.typesafe.akka" %% "akka-actor" % Version.akka % Compile,
"com.typesafe.akka" %% "akka-stream" % Version.akka % Compile,
"com.typesafe.scala-logging" %% "scala-logging" % "3.5.0" % Compile
)

@@ -72,11 +73,11 @@ lazy val library =
"org.scalacheck" %% "scalacheck" % Version.scalaCheck % "it,test",
"com.typesafe.akka" %% "akka-testkit" % Version.akka % "it,test",
"org.mockito" % "mockito-core" % "2.7.15" % "it,test",
"io.kamon" %% "kamon-core" % Version.kamon % Test,
"io.kamon" %% "kamon-akka-2.4" % Version.kamon % Test,
"io.kamon" %% "kamon-statsd" % Version.kamon % Test,
"io.kamon" %% "kamon-log-reporter" % Version.kamon % Test,
"io.kamon" %% "kamon-system-metrics" % Version.kamon % Test
"io.kamon" %% "kamon-core" % Version.kamon % "it,test",
"io.kamon" %% "kamon-akka-2.4" % Version.kamon % "it,test",
"io.kamon" %% "kamon-statsd" % Version.kamon % "it,test",
"io.kamon" %% "kamon-log-reporter" % Version.kamon % "it,test",
"io.kamon" %% "kamon-system-metrics" % Version.kamon % "it,test"
)
}

26 changes: 2 additions & 24 deletions src/it/resources/sample.conf
@@ -4,31 +4,9 @@ kinesis {

application-name = "KinesisReliabilitySpec"

- # The name of the this producer, we can have many producers per application.
- # MUST contain the stream-name as a minimum. Any additional settings defined will override
- # defaults in the kinesis.producer reference.conf for this producer only.
- testProducer {
-   # The name of the producer stream
-   stream-name = "test-kinesis-reliability"
+ ### TODO To be removed...

-   # Can specify settings here as per default-producer, to override those defaults for this producer.

-   akka {
-     max-outstanding-requests = 50000
-   }

-   kpl {
-     Region = us-east-1
-   }
- }

- # The name of the consumer, we can have many consumers per application
- testConsumer {
-   # The name of the consumer stream, MUST be specified per consumer
-   stream-name = "test-kinesis-reliability"
- }

- ## Test specific properties
+ ## (Simplexxx) Test specific properties
test {
# These settings basically say: Send 5,000,000 total messages, split into 2500 batches with 100 ms between batches
# Equates to max throughput of 20,000/sec on producer