Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Commit

Permalink
KPL Typed Config (#65)
Browse files Browse the repository at this point in the history
* Adding typed config for the KPL
  • Loading branch information
etspaceman authored and markglh committed Jul 13, 2018
1 parent 3bd04e5 commit 3e9bb6c
Show file tree
Hide file tree
Showing 5 changed files with 397 additions and 4 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
sudo: false
sudo: required
language: scala
services:
- docker
scala:
- 2.11.11
- 2.12.5
Expand Down
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ It's worth familiarising yourself with [Sequence numbers and Sub sequence number
* [Defining a config file in the client application](#usage-defining-a-config-file-in-the-client-application)
* [Notable Consumer Configuration Values](#usage-defining-a-config-file-in-the-client-application-notable-consumer-configuration-values)
* [Notable Producer Configuration Values](#usage-defining-a-config-file-in-the-client-application-notable-producer-configuration-values)
* [Typed Configuration - Producer](#typed-configuration---producer)
* [Usage: Consumer](#usage-usage-consumer)
* [Actor Based Consumer](#actor-based-consumer)
* [Important considerations when implementing the Event Processor](#usage-usage-consumer-important-considerations-when-implementing-the-event-processor)
Expand Down Expand Up @@ -205,6 +206,37 @@ which allows us to track the progress of sent messages when they go with the nex
* `kinesis.<producer-name>.kpl.RateLimit` - Limits the maximum allowed put rate for a shard, as a percentage of the backend limits.

<a name="usage-usage-consumer"></a>

### Typed Configuration - Producer
If you don't want to depend on config files, there's a typed configuration class available: [KinesisProducerConfig](src/main/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerConfig.scala)

You can construct it in a few ways:

```scala
// With default values
val defaultProducerConfig = KinesisProducerConfig()

// With a provided KinesisProducerConfiguration from the Java KPL library
val awsKinesisConfig: KinesisProducerConfiguration = ...
val producerConfig = KinesisProducerConfig(awsKinesisConfig)

// With a typesafe-config object
val typesafeConfig: Config = ...
val producerConfig = KinesisProducerConfig(typesafeConfig)

// With a typesafe-config object and an AWSCredentialsProvider
val typesafeConfig: Config = ...
val credentialsProvider: AWSCredentialsProvider = ...
val producerConfig = KinesisProducerConfig(typesafeConfig, credentialsProvider)
```

These can be used to create a ProducerConf and ultimately a KinesisProducer, like so:

```scala
val producerConfig: KinesisProducerConfig = ...
val producerConf: ProducerConf = ProducerConf(producerConfig, "my-stream-name", None, None)
```

## Usage: Consumer
`reactive-kinesis` provides two different ways to consume messages from Kinesis: [Actor Based Consumer](#actor-based-consumer) and [Akka Stream Source](#akka-stream-source).

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Copyright 2017 WeightWatchers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.weightwatchers.reactive.kinesis.producer

import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.regions.Regions
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration.ThreadingModel
import com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimension
import com.typesafe.config.{Config, ConfigFactory}

/** Typed config class for the KPL */
final case class KinesisProducerConfig(
additionalMetricDimensions: List[AdditionalDimension],
credentialsProvider: Option[AWSCredentialsProvider],
metricsCredentialsProvider: Option[AWSCredentialsProvider],
aggregationEnabled: Boolean,
aggregationMaxCount: Long,
aggregationMaxSize: Long,
cloudwatchEndpoint: Option[String],
cloudwatchPort: Long,
collectionMaxCount: Long,
collectionMaxSize: Long,
connectTimeout: Long,
credentialsRefreshDelay: Long,
enableCoreDumps: Boolean,
failIfThrottled: Boolean,
kinesisEndpoint: Option[String],
kinesisPort: Long,
logLevel: String,
maxConnections: Long,
metricsGranularity: String,
metricsLevel: String,
metricsNamespace: String,
metricsUploadDelay: Long,
minConnections: Long,
nativeExecutable: Option[String],
rateLimit: Long,
recordMaxBufferedTime: Long,
recordTtl: Long,
region: Option[Regions],
requestTimeout: Long,
tempDirectory: Option[String],
verifyCertificate: Boolean,
threadingModel: ThreadingModel,
threadPoolSize: Int
) {

def toAwsConfig: KinesisProducerConfiguration = {
val initial = new KinesisProducerConfiguration()
.setAggregationEnabled(aggregationEnabled)
.setAggregationMaxCount(aggregationMaxCount)
.setAggregationMaxSize(aggregationMaxSize)
.setCloudwatchPort(cloudwatchPort)
.setCollectionMaxCount(collectionMaxCount)
.setCollectionMaxSize(collectionMaxSize)
.setConnectTimeout(connectTimeout)
.setCredentialsRefreshDelay(credentialsRefreshDelay)
.setEnableCoreDumps(enableCoreDumps)
.setFailIfThrottled(failIfThrottled)
.setKinesisPort(kinesisPort)
.setLogLevel(logLevel)
.setMaxConnections(maxConnections)
.setMetricsGranularity(metricsGranularity)
.setMetricsLevel(metricsLevel)
.setMetricsNamespace(metricsNamespace)
.setMetricsUploadDelay(metricsUploadDelay)
.setMinConnections(minConnections)
.setRateLimit(rateLimit)
.setRecordMaxBufferedTime(recordMaxBufferedTime)
.setRecordTtl(recordTtl)
.setRequestTimeout(requestTimeout)
.setVerifyCertificate(verifyCertificate)
.setThreadingModel(threadingModel)
.setThreadPoolSize(threadPoolSize)

KinesisProducerConfig.setAdditionalDimensions(initial, additionalMetricDimensions)

// This is ugly
val wCredProv = credentialsProvider.fold(initial)(initial.setCredentialsProvider)
val wMetricCredProv =
metricsCredentialsProvider.fold(wCredProv)(wCredProv.setMetricsCredentialsProvider)
val wCWEP = cloudwatchEndpoint.fold(wMetricCredProv)(wMetricCredProv.setCloudwatchEndpoint)
val wKinesisEP = kinesisEndpoint.fold(wCWEP)(wCWEP.setKinesisEndpoint)
val wNativeExec = nativeExecutable.fold(wKinesisEP)(wKinesisEP.setNativeExecutable)
val wRegion = region.fold(wNativeExec)(reg => wNativeExec.setRegion(reg.getName))
val wTempDir = tempDirectory.fold(wRegion)(wRegion.setTempDirectory)

wTempDir
}
}

object KinesisProducerConfig {
def apply(): KinesisProducerConfig = default
def apply(config: KinesisProducerConfiguration): KinesisProducerConfig = fromAwsConfig(config)
def apply(config: Config): KinesisProducerConfig =
fromAwsConfig(ProducerConf.buildKPLConfig(config, None))
def apply(config: Config, credentialsProvider: AWSCredentialsProvider): KinesisProducerConfig =
fromAwsConfig(ProducerConf.buildKPLConfig(config, Some(credentialsProvider)))

// Sets default values as if no typesafe configuration was passed. This ensures that the default
// KinesisProducerConfiguration is used
def default: KinesisProducerConfig = {
val defaultConfig = ProducerConf.buildKPLConfig(ConfigFactory.empty(), None)
fromAwsConfig(defaultConfig)
}

private def fromAwsConfig(config: KinesisProducerConfiguration): KinesisProducerConfig =
KinesisProducerConfig(
additionalMetricDimensions = List(), // No way to retrieve this from a KinesisProducerConfiguration
credentialsProvider = Some(config.getCredentialsProvider),
metricsCredentialsProvider = Some(config.getMetricsCredentialsProvider),
aggregationEnabled = config.isAggregationEnabled,
aggregationMaxCount = config.getAggregationMaxCount,
aggregationMaxSize = config.getAggregationMaxSize,
cloudwatchEndpoint = Some(config.getCloudwatchEndpoint),
cloudwatchPort = config.getCloudwatchPort,
collectionMaxCount = config.getCollectionMaxCount,
collectionMaxSize = config.getCollectionMaxSize,
connectTimeout = config.getConnectTimeout,
credentialsRefreshDelay = config.getCredentialsRefreshDelay,
enableCoreDumps = config.isEnableCoreDumps,
failIfThrottled = config.isFailIfThrottled,
kinesisEndpoint = Some(config.getKinesisEndpoint),
kinesisPort = config.getKinesisPort,
logLevel = config.getLogLevel,
maxConnections = config.getMaxConnections,
metricsGranularity = config.getMetricsGranularity,
metricsLevel = config.getMetricsLevel,
metricsNamespace = config.getMetricsNamespace,
metricsUploadDelay = config.getMetricsUploadDelay,
minConnections = config.getMinConnections,
nativeExecutable = Some(config.getNativeExecutable),
rateLimit = config.getRateLimit,
recordMaxBufferedTime = config.getRecordMaxBufferedTime,
recordTtl = config.getRecordTtl,
region = config.getRegion match {
case x if x.isEmpty => None
case x => Some(Regions.fromName(x))
},
requestTimeout = config.getRequestTimeout,
tempDirectory = Some(config.getTempDirectory),
verifyCertificate = config.isVerifyCertificate,
threadingModel = config.getThreadingModel,
threadPoolSize = config.getThreadPoolSize
)

private def setAdditionalDimensions(
conf: KinesisProducerConfiguration,
dimensions: List[AdditionalDimension]
) = dimensions.foldLeft(conf) { (conf, dimension) =>
conf.addAdditionalMetricsDimension(dimension.getKey,
dimension.getValue,
dimension.getGranularity)
conf
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,24 @@ object ProducerConf {
parseThrottlingConfig(producerConfig))
}

private def buildKPLConfig(kplConfig: Config,
credentialsProvider: Option[AWSCredentialsProvider]) = {
/**
* Simple typed apply method
*
* @param kinesisConfig The top level Kinesis Configuration, containing the specified producer
* @param streamName The name of the Kinesis stream to consume
* @param dispatcher Optional config path for the akka dispatcher
* @param throttlingConf Optional configuration which defines whether and how often to throttle
* @return A [[ProducerConf]] case class used for constructing the [[KinesisProducerActor]]
*/
def apply(kinesisConfig: KinesisProducerConfig,
streamName: String,
dispatcher: Option[String],
throttlingConf: Option[ThrottlingConf]): ProducerConf = {

new ProducerConf(streamName, kinesisConfig.toAwsConfig, dispatcher, throttlingConf)
}

def buildKPLConfig(kplConfig: Config, credentialsProvider: Option[AWSCredentialsProvider]) = {
// We directly load our properties into the KPL as a Java `Properties` object
// See http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html
import TypesafeConfigExtensions._
Expand Down
Loading

0 comments on commit 3e9bb6c

Please sign in to comment.