This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Merge pull request #9 from WW-Digital/feature/tweak-versioning
* Tweak versioning
* Add Contributors
* Add TOC to readme
markglh authored May 31, 2017
2 parents 11d892d + c61be63 commit 3d3bcf1
Showing 3 changed files with 91 additions and 28 deletions.
106 changes: 83 additions & 23 deletions README.md
@@ -4,7 +4,39 @@ Kinesis client built upon Amazon's KCL ([Kinesis Client Library](http://docs.aws

It's worth familiarising yourself with [Sequence numbers and Sub sequence numbers](http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).

# Contents
* [Dependency Resolution](#dependency-resolution)
* [Considerations When Using Kinesis in a Distributed Environment](#considerations-when-using-kinesis-in-a-distributed-environment)
  * [Required Minimum Sharding for Read-Based Applications](#considerations-when-using-kinesis-in-a-distributed-environment-required-minimum-sharding-for-read-based-applications)
  * [DynamoDB Checkpoint Storage](#considerations-when-using-kinesis-in-a-distributed-environment-dynamodb-checkpoint-storage)
* [Usage](#usage)
  * [Kinesis streams and auth](#usage-kinesis-streams-and-auth)
  * [Defining a config file in the client application](#usage-defining-a-config-file-in-the-client-application)
    * [Notable Consumer Configuration Values](#usage-defining-a-config-file-in-the-client-application-notable-consumer-configuration-values)
    * [Notable Producer Configuration Values](#usage-defining-a-config-file-in-the-client-application-notable-producer-configuration-values)
  * [Usage: Consumer](#usage-usage-consumer)
    * [Important considerations when implementing the Event Processor](#usage-usage-consumer-important-considerations-when-implementing-the-event-processor)
    * [Checkpointing](#usage-usage-consumer-checkpointing)
    * [Graceful Shutdown](#usage-usage-consumer-graceful-shutdown)
  * [Usage: Producer](#usage-usage-producer)
    * [Actor Based Implementation](#usage-usage-producer-actor-based-implementation)
    * [Pure Scala based implementation (simple wrapper around KPL)](#usage-usage-producer-pure-scala-based-implementation-simple-wrapper-around-kpl)
* [Running the reliability test](#running-the-reliability-test)
  * [Delete & recreate Kinesis streams and DynamoDB table](#running-the-reliability-test-delete-recreate-kinesisstreams-and-dynamo-table)
  * [Running the producer-consumer test](#running-the-reliability-test-running-the-producer-consumer-test)
* [FAQ](#faq)
* [Contributor Guide](#contributor-guide)
  * [Code Formatting](#contributor-guide-code-formatting)
  * [Tag Requirements](#contributor-guide-tag-requirements)
    * [Version information](#contributor-guide-tag-requirements-version-information)
    * [Valid Release Tag Examples:](#contributor-guide-tag-requirements-valid-release-tag-examples)
    * [Invalid Release Tag Examples:](#contributor-guide-tag-requirements-invalid-release-tag-examples)
* [Contribution policy](#contribution-policy)
* [License](#license)


<a name="dependency-resolution"></a>
# Dependency Resolution
SBT
```
"com.weightwatchers" %% "reactive-kinesis" % "0.5.0"
```
@@ -25,9 +57,11 @@ Snapshots will be published [here](https://oss.jfrog.org/webapp/#/artifacts/brow
You will need the following resolver for snapshots:
`https://oss.jfrog.org/artifactory/oss-snapshot-local`
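
For example, a minimal `build.sbt` sketch for pulling a snapshot build (the resolver name is arbitrary and the snapshot version is a placeholder):

```
resolvers += "OJO Snapshots" at "https://oss.jfrog.org/artifactory/oss-snapshot-local"

// Replace with an actual published snapshot version
libraryDependencies += "com.weightwatchers" %% "reactive-kinesis" % "0.5.1-SNAPSHOT"
```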

<a name="considerations-when-using-kinesis-in-a-distributed-environment"></a>
# Considerations When Using Kinesis in a Distributed Environment

<a name="considerations-when-using-kinesis-in-a-distributed-environment-required-minimum-sharding-for-read-based-applications"></a>
## Required Minimum Sharding for Read-Based Applications

From http://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-scaling.html:

@@ -42,7 +76,8 @@ For our purposes, this means *any service reading from Kinesis should expect to
have one shard per instance, as a minimum*. Note that this is specifically for consuming events.
Producers don't have the same shard restrictions.

<a name="considerations-when-using-kinesis-in-a-distributed-environment-dynamodb-checkpoint-storage"></a>
## DynamoDB Checkpoint Storage

Amazon's KCL uses DynamoDB to checkpoint progress through reading the stream. When DynamoDB tables are provisioned automatically for this purpose, they may have a relatively high write throughput, which can incur additional cost.

@@ -56,9 +91,11 @@ The checkpointer will automatically throttle if the write throughput is not suff

`Throttled by DynamoDB on checkpointing -- backing off...`
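
If you see that message, one option is to raise the table's write capacity, e.g. with the AWS CLI. The table name below follows the `<application-name>-<stream-name>` convention described later in this README, and the capacity units are purely illustrative:

```
aws dynamodb update-table \
  --table-name SampleService-sample-consumer \
  --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=25
```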

<a name="usage"></a>
# Usage

<a name="usage-kinesis-streams-and-auth"></a>
## Kinesis streams and auth

The stream you have configured must already exist in AWS, e.g.
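
For instance, a stream can be created with the AWS CLI (the stream name and shard count below are placeholders, not values the library requires):

```
aws kinesis create-stream --stream-name sample-consumer --shard-count 2
```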

@@ -77,7 +114,8 @@ aws_secret_access_key=AAAAAAAAAAAA000000000+AAAAAAAAAAAAAAAAAA
Both the producer and consumer use the [DefaultAWSCredentialsProviderChain](http://docs.aws.amazon.com/java-sdk/latest/developer-guide/credentials.html#id6).


<a name="usage-defining-a-config-file-in-the-client-application"></a>
## Defining a config file in the client application


You'll need some configuration values provided in the application which leverages this library,
@@ -124,7 +162,8 @@ Once these are defined, you can pass them into the Kinesis producer and consumer
Note that the `application-name` is combined with the `stream-name` for each consumer to define the DynamoDB table for checkpointing.
For example: `SampleService-sample-consumer`.
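
As an illustration only, a consumer/producer definition for that example might look like the sketch below. The nesting and every value shown are assumptions pieced together from the keys mentioned in this README, so check the library's `reference.conf` for the authoritative structure and defaults:

```
kinesis {

  application-name = "SampleService"

  # Consumer: checkpoints will live in the DynamoDB table
  # "SampleService-sample-consumer" (application-name + stream-name).
  sample-consumer {
    stream-name = "sample-consumer"

    worker.batchTimeoutSeconds = 300
    checkpointer.intervalMillis = 60000

    kcl {
      initialPositionInStream = TRIM_HORIZON
      maxRecords = 1000
    }
  }

  # Producer
  sample-producer {
    stream-name = "sample-producer"

    akka.max-outstanding-requests = 50000

    kpl {
      Region = us-east-1
      RateLimit = 100
    }
  }
}
```

The intent is that these named blocks (`sample-consumer` / `sample-producer`) are what you pass into the consumer and producer when constructing them.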

<a name="usage-defining-a-config-file-in-the-client-application-notable-consumer-configuration-values"></a>
### Notable Consumer Configuration Values
* `kinesis.<consumer-name>.akka.dispatcher` - Sets the dispatcher for the consumer, defaults to `kinesis.akka.default-dispatcher`
* `kinesis.<consumer-name>.worker.batchTimeoutSeconds` - The timeout for processing a batch.
Note that any messages not processed within this time will be retried (according to the configuration). After retrying any
@@ -139,7 +178,8 @@ we wait for this amount of time.
* `kinesis.<consumer-name>.kcl.initialPositionInStream` - Controls our strategy for pulling from Kinesis (LATEST, TRIM_HORIZON, ..)
* `kinesis.<consumer-name>.kcl.maxRecords` - The maximum batch size.

<a name="usage-defining-a-config-file-in-the-client-application-notable-producer-configuration-values"></a>
### Notable Producer Configuration Values
* `kinesis.<producer-name>.akka.dispatcher` - Sets the dispatcher for the producer, defaults to `kinesis.akka.default-dispatcher`
* `kinesis.<producer-name>.akka.max-outstanding-requests` - Enables artificial throttling within the Producer.
This limits the number of futures in play at any one time. Each message creates a new future (internally in the KPL),
@@ -149,7 +189,8 @@ which allows us to track the progress of sent messages when they go with the nex
* `kinesis.<producer-name>.kpl.Region` - The AWS Region to use.
* `kinesis.<producer-name>.kpl.RateLimit` - Limits the maximum allowed put rate for a shard, as a percentage of the backend limits.

<a name="usage-usage-consumer"></a>
## Usage: Consumer

![Consumer Architecture](https://www.lucidchart.com/publicSegments/view/69b7b7d1-bc09-4dcc-ab1b-a0f7c6e1ffc6/image.png)

@@ -232,17 +273,20 @@ case class ConsumerWorkerFailure(failedEvents: Seq[ConsumerEvent], shardId: Stri
```scala
case class ConsumerShutdown(shardId: String)
```

<a name="usage-usage-consumer-important-considerations-when-implementing-the-event-processor"></a>
### Important considerations when implementing the Event Processor
* The Event Processor MUST handle `ProcessEvent` messages (for each message)
* The Event Processor MUST respond with `EventProcessed` after processing the `ProcessEvent`
* The Event Processor may set `successful` to false to indicate the message can be skipped
* The Event Processor SHOULD handle `ConsumerWorkerFailure` messages, which signal a critical failure in the Consumer.
* The Event Processor SHOULD handle `ConsumerShutdown` messages, which signal a graceful shutdown of the Consumer (a minimal sketch follows this list).
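
Here is that sketch. The wildcard import path and the exact shape of the `EventProcessed` acknowledgement (how the sequence number travels back) are assumptions, so check the consumer worker's message definitions in the library for the real signatures:

```scala
import akka.actor.{Actor, ActorLogging}
// Assumed import path for the consumer worker messages; verify against the library.
import com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker._

class SampleEventProcessor extends Actor with ActorLogging {

  override def receive: Receive = {
    case ProcessEvent(event) =>
      // ... process the ConsumerEvent payload here ...
      // Acknowledge every event so the worker can checkpoint past it.
      // The field carrying the sequence number back is an assumption.
      sender() ! EventProcessed(event.sequenceNumber)

    case ConsumerWorkerFailure(failedEvents, shardId) =>
      log.error(s"Critical failure on shard $shardId: ${failedEvents.size} events were not processed")

    case ConsumerShutdown(shardId) =>
      log.info(s"Consumer for shard $shardId has shut down gracefully")
  }
}
```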

<a name="usage-usage-consumer-checkpointing"></a>
### Checkpointing
The client will handle checkpointing asynchronously PER SHARD according to the configuration using a separate actor.

<a name="usage-usage-consumer-graceful-shutdown"></a>
### Graceful Shutdown

Currently the KinesisConsumer Shutdown works as follows:
* Shutdown is called on the `KinesisConsumer` (either explicitly or via the jvm shutdown hook)
@@ -254,14 +298,16 @@ Currently the KinesisConsumer Shutdown works as follows:
* The shutdown timeout is configured by: `kinesis.<consumer-name>.worker.shutdownTimeoutSeconds`
* The shutdown hooks can be disabled using: `kinesis.<consumer-name>.worker.gracefulShutdownHook`

<a name="usage-usage-producer"></a>
## Usage: Producer

The KPL client sends messages in batches; each message creates a Future which completes upon successful send or failure.

See Amazon's documentation for more information:
https://github.com/awslabs/amazon-kinesis-producer

<a name="usage-usage-producer-actor-based-implementation"></a>
### Actor Based Implementation

This implementation will optionally throttle the number of futures currently in play,
according to the `max-outstanding-requests` property.
@@ -302,7 +348,8 @@ case class SendFailed(messageId: String, reason: Throwable)



<a name="usage-usage-producer-actor-based-implementation-within-an-actor-strongly-recommended"></a>
#### Within an Actor (Strongly recommended)

```scala
import java.util.UUID
@@ -339,7 +386,8 @@ class SomeActor(kinesisConfig: Config) extends Actor {
}
```

<a name="usage-usage-producer-actor-based-implementation-from-outside-of-an-actor"></a>
#### From outside of an Actor

```scala
import java.util.UUID
@@ -360,7 +408,8 @@ kpa ! Send(producerEvent) //Send without a callback confirmation
```


<a name="usage-usage-producer-pure-scala-based-implementation-simple-wrapper-around-kpl"></a>
### Pure Scala based implementation (simple wrapper around KPL)
*Note that throttling will be unavailable using this method.*

```scala
@@ -396,15 +445,18 @@ callback onFailure {
```


<a name="running-the-reliability-test"></a>
# Running the reliability test

<a name="running-the-reliability-test-delete-recreate-kinesisstreams-and-dynamo-table"></a>
## Delete & recreate Kinesis streams and DynamoDB table
Execute this command in a shell. If you don't have access to WW AWS resources, you'll need to request it:
```
aws kinesis delete-stream --stream-name test-kinesis-reliability && aws dynamodb delete-table --table-name KinesisReliabilitySpec && sleep 90 && aws kinesis create-stream --stream-name test-kinesis-reliability --shard-count 2
```

<a name="running-the-reliability-test-running-the-producer-consumer-test"></a>
## Running the producer-consumer test

Run the `SimpleKinesisProducer` using the App object.

@@ -427,6 +479,7 @@ As the test progresses, watch the consumer window for a message of this format:

You'll see some stats regarding messages/sec processed logged near that line.

<a name="faq"></a>
# FAQ
* How is DynamoDB used in relation to our checkpointing?
  * DynamoDB tables will be automatically created; however, the write throughput must be configured appropriately using the AWS console or CLI. There is a cost associated with this, but note that setting it too low will cause checkpoint throttling. Configure `kinesis.<consumer-name>.checkpointer.intervalMillis` accordingly.
@@ -450,30 +503,35 @@ You'll see some stats logged regarding messages/sec processed, near that line.
  * To uniquely identify a record on a shard, you need to use **BOTH** the `sequence number` and the `sub-sequence number`. This is because messages that are aggregated together have the same sequence number (they are treated as one message by Kinesis). Therefore it is important to also use the sub-sequence number to distinguish between them (see the small sketch below).
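
As an illustration (the types below are hypothetical, not the library's own), a compound key that captures this could look like:

```scala
// Hypothetical model: a record is uniquely identified by BOTH numbers.
final case class CompoundSeqNo(sequenceNumber: BigInt, subSequenceNumber: Long)

object CompoundSeqNo {
  // Order by the Kinesis sequence number first, then by the sub-sequence number.
  implicit val ordering: Ordering[CompoundSeqNo] =
    Ordering.by(c => (c.sequenceNumber, c.subSequenceNumber))
}
```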


<a name="contributor-guide"></a>
# Contributor Guide

<a name="contributor-guide-code-formatting"></a>
## Code Formatting
This project uses [scalafmt](http://scalameta.org/scalafmt/) and will automatically fail the build if any files do not match the expected formatting.

Please run `sbt scalafmt` before committing and pushing changes.

<a name="contributor-guide-tag-requirements"></a>
## Tag Requirements
This project uses tags and [sbt-git](https://github.com/sbt/sbt-git) to determine the current version.
* Each merge into master will automatically build a snapshot and publish to [bintray OSS artifactory](https://www.jfrog.com/confluence/display/RTF/Deploying+Snapshots+to+oss.jfrog.org).
* Tagging the master branch will automatically build and publish both Scala 2.11 & Scala 2.12 artifacts (to bintray and maven central).
* Tags are in the format vX.X.X (see the example below)
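
For example, cutting a hypothetical `v0.5.1` release is just a matter of tagging master and pushing the tag:

```
git tag v0.5.1
git push origin v0.5.1
```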

<a name="contributor-guide-tag-requirements-version-information"></a>
### Version information
* IF the current commit is tagged with "vX.Y.Z" (i.e. semantic versioning), the version is "X.Y.Z"
* ELSE IF the current commit is tagged with "vX.Y.Z-Mx", the version is "X.Y.Z-Mx"
* ELSE IF the current commit is tagged with "vX.Y.Z-SNAPSHOT", the version is "X.Y.Z-commitsSinceVersion-SNAPSHOT"
* ELSE IF the latest found tag is "vX.Y.Z", the version is "X.Y.Z-commitsSinceVersion-gCommitHash-SNAPSHOT"
* ELSE the version is "0.0.0-commitHash-SNAPSHOT"

<a name="contributor-guide-tag-requirements-valid-release-tag-examples"></a>
### Valid Release Tag Examples:
v1.2.3 (version=1.2.3)
v1.2.3-M1 (version=1.2.3-M1)

<a name="contributor-guide-tag-requirements-invalid-release-tag-examples"></a>
### Invalid Release Tag Examples:
v1.2.3-SNAPSHOT
v1.2.3-M1-SNAPSHOT
@@ -483,7 +541,8 @@
If the current version on master is a snapshot (release tag + x commits),
then the artifact will be deployed to the [JFrog OSS repository](https://oss.jfrog.org/webapp/#/artifacts/browse/simple/General/oss-snapshot-local/com/weightwatchers):

<a name="contribution-policy"></a>
# Contribution policy

Contributions via GitHub pull requests are gladly accepted from their original author. Along with
any pull requests, please state that the contribution is your original work and that you license
the work to the project under the project's open source license. Whether or not you state this
explicitly, by submitting any copyrighted material via pull request, email, or other means you
agree to license the material under the project's open source license and warrant that you have the
legal authority to do so.

<a name="license"></a>
# License

This code is open source software licensed under the
[Apache 2.0](http://www.apache.org/licenses/LICENSE-2.0) license.
6 changes: 2 additions & 4 deletions build.sbt
@@ -152,7 +152,6 @@ lazy val commonSettings =
*
* IF the current commit is tagged with "vX.Y.Z" (ie semantic-versioning), the version is "X.Y.Z"
* ELSE IF the current commit is tagged with "vX.Y.Z-Mx", the version is "X.Y.Z-Mx"
* ELSE IF the latest found tag is "vX.Y.Z", the version is "X.Y.Z-commitsSinceVersion-gCommitHash-SNAPSHOT"
* ELSE the version is "0.0.0-commitHash-SNAPSHOT"
*/
@@ -162,18 +161,17 @@ lazy val versioningSettings =
Seq(
git.baseVersion := "0.0.0",
git.useGitDescribe := true,
git.uncommittedSignifier := None,
git.gitTagToVersionNumber := {
case VersionRegex(v, "") => Some(v) //e.g. 1.0.0
case VersionRegex(v, s)
if MilestoneRegex.findFirstIn(s).isDefined => Some(s"$v-$s") //e.g. 1.0.0-M1
case VersionRegex(v, "SNAPSHOT") => Some(s"$v-SNAPSHOT") //e.g. 1.0.0-SNAPSHOT
case VersionRegex(v, s) => Some(s"$v-$s-SNAPSHOT") //e.g. 1.0.0-2-commithash-SNAPSHOT
case _ => None
}
)


import de.heikoseeberger.sbtheader.license._
import sbt.Keys.parallelExecution

7 changes: 6 additions & 1 deletion publish.sbt
@@ -15,7 +15,12 @@ pomIncludeRepository := { _ => false } //remove optional dependencies from our p
licenses += ("Apache-2.0", url("https://www.apache.org/licenses/LICENSE-2.0.html"))
homepage := Some(url("http://www.weightwatchers.com"))
scmInfo := Some(ScmInfo(url("https://github.com/WW-Digital/reactive-kinesis"), "scm:[email protected]:WW-Digital/reactive-kinesis.git"))
developers := List(
Developer("markglh", "Mark Harrison", "[email protected]", url("https://github.com/markglh")),
Developer("felixt-cake", "Felix Terkhorn", "[email protected]", url("https://github.com/felixt-cake")),
Developer("DavidDeCoding", "David De", "[email protected]", url("https://github.com/DavidDeCoding")),
Developer("agaro1121", "Anthony Garo", "[email protected]", url("https://github.com/agaro1121")),
Developer("dluwtw", "David Lu", "[email protected]", url("https://github.com/dluwtw")))
publishArtifact in Test := false
bintrayReleaseOnPublish := false //We're releasing via travis, set to true to automatically release on publish instead
publishMavenStyle := true
