diff --git a/.travis.yml b/.travis.yml index 8f5ed65..fc93777 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,7 @@ -sudo: false +sudo: required language: scala +services: + - docker scala: - 2.11.11 - 2.12.5 diff --git a/README.md b/README.md index 02c0008..f322a29 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ It's worth familiarising yourself with [Sequence numbers and Sub sequence number * [Defining a config file in the client application](#usage-defining-a-config-file-in-the-client-application) * [Notable Consumer Configuration Values](#usage-defining-a-config-file-in-the-client-application-notable-consumer-configuration-values) * [Notable Producer Configuration Values](#usage-defining-a-config-file-in-the-client-application-notable-producer-configuration-values) + * [Typed Configuration - Producer](#typed-configuration---producer) * [Usage: Consumer](#usage-usage-consumer) * [Actor Based Consumer](#actor-based-consumer) * [Important considerations when implementing the Event Processor](#usage-usage-consumer-important-considerations-when-implementing-the-event-processor) @@ -205,6 +206,37 @@ which allows us to track the progress of sent messages when they go with the nex * `kinesis..kpl.RateLimit` - Limits the maximum allowed put rate for a shard, as a percentage of the backend limits. + +### Typed Configuration - Producer +If you don't want to depend on config files, there's a typed configuration class available: [KinesisProducerConfig](src/main/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerConfig.scala) + +You can construct it in a few ways: + +```scala +// With default values +val defaultProducerConfig = KinesisProducerConfig() + +// With a provided KinesisProducerConfiguration from the Java KPL library +val awsKinesisConfig: KinesisProducerConfiguration = ... +val producerConfig = KinesisProducerConfig(awsKinesisConfig) + +// With a typesafe-config object +val typesafeConfig: Config = ... +val producerConfig = KinesisProducerConfig(typesafeConfig) + +// With a typesafe-config object and an AWSCredentialsProvider +val typesafeConfig: Config = ... +val credentialsProvider: AWSCredentialsProvider = ... +val producerConfig = KinesisProducerConfig(typesafeConfig, credentialsProvider) +``` + +These can be used to create a ProducerConf and ultimately a KinesisProducer, like so: + +```scala +val producerConfig: KinesisProducerConfig = ... +val producerConf: ProducerConf = ProducerConf(producerConfig, "my-stream-name", None, None) +``` + ## Usage: Consumer `reactive-kinesis` provides two different ways to consume messages from Kinesis: [Actor Based Consumer](#actor-based-consumer) and [Akka Stream Source](#akka-stream-source). diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerConfig.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerConfig.scala new file mode 100644 index 0000000..b357a14 --- /dev/null +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/producer/KinesisProducerConfig.scala @@ -0,0 +1,171 @@ +/* + * Copyright 2017 WeightWatchers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.weightwatchers.reactive.kinesis.producer + +import com.amazonaws.auth.AWSCredentialsProvider +import com.amazonaws.regions.Regions +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration.ThreadingModel +import com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimension +import com.typesafe.config.{Config, ConfigFactory} + +/** Typed config class for the KPL */ +final case class KinesisProducerConfig( + additionalMetricDimensions: List[AdditionalDimension], + credentialsProvider: Option[AWSCredentialsProvider], + metricsCredentialsProvider: Option[AWSCredentialsProvider], + aggregationEnabled: Boolean, + aggregationMaxCount: Long, + aggregationMaxSize: Long, + cloudwatchEndpoint: Option[String], + cloudwatchPort: Long, + collectionMaxCount: Long, + collectionMaxSize: Long, + connectTimeout: Long, + credentialsRefreshDelay: Long, + enableCoreDumps: Boolean, + failIfThrottled: Boolean, + kinesisEndpoint: Option[String], + kinesisPort: Long, + logLevel: String, + maxConnections: Long, + metricsGranularity: String, + metricsLevel: String, + metricsNamespace: String, + metricsUploadDelay: Long, + minConnections: Long, + nativeExecutable: Option[String], + rateLimit: Long, + recordMaxBufferedTime: Long, + recordTtl: Long, + region: Option[Regions], + requestTimeout: Long, + tempDirectory: Option[String], + verifyCertificate: Boolean, + threadingModel: ThreadingModel, + threadPoolSize: Int +) { + + def toAwsConfig: KinesisProducerConfiguration = { + val initial = new KinesisProducerConfiguration() + .setAggregationEnabled(aggregationEnabled) + .setAggregationMaxCount(aggregationMaxCount) + .setAggregationMaxSize(aggregationMaxSize) + .setCloudwatchPort(cloudwatchPort) + .setCollectionMaxCount(collectionMaxCount) + .setCollectionMaxSize(collectionMaxSize) + .setConnectTimeout(connectTimeout) + .setCredentialsRefreshDelay(credentialsRefreshDelay) + .setEnableCoreDumps(enableCoreDumps) + .setFailIfThrottled(failIfThrottled) + .setKinesisPort(kinesisPort) + .setLogLevel(logLevel) + .setMaxConnections(maxConnections) + .setMetricsGranularity(metricsGranularity) + .setMetricsLevel(metricsLevel) + .setMetricsNamespace(metricsNamespace) + .setMetricsUploadDelay(metricsUploadDelay) + .setMinConnections(minConnections) + .setRateLimit(rateLimit) + .setRecordMaxBufferedTime(recordMaxBufferedTime) + .setRecordTtl(recordTtl) + .setRequestTimeout(requestTimeout) + .setVerifyCertificate(verifyCertificate) + .setThreadingModel(threadingModel) + .setThreadPoolSize(threadPoolSize) + + KinesisProducerConfig.setAdditionalDimensions(initial, additionalMetricDimensions) + + // This is ugly + val wCredProv = credentialsProvider.fold(initial)(initial.setCredentialsProvider) + val wMetricCredProv = + metricsCredentialsProvider.fold(wCredProv)(wCredProv.setMetricsCredentialsProvider) + val wCWEP = cloudwatchEndpoint.fold(wMetricCredProv)(wMetricCredProv.setCloudwatchEndpoint) + val wKinesisEP = kinesisEndpoint.fold(wCWEP)(wCWEP.setKinesisEndpoint) + val wNativeExec = nativeExecutable.fold(wKinesisEP)(wKinesisEP.setNativeExecutable) + val wRegion = region.fold(wNativeExec)(reg => wNativeExec.setRegion(reg.getName)) + val wTempDir = tempDirectory.fold(wRegion)(wRegion.setTempDirectory) + + wTempDir + } +} + +object KinesisProducerConfig { + def apply(): KinesisProducerConfig = default + def apply(config: KinesisProducerConfiguration): KinesisProducerConfig = fromAwsConfig(config) + def apply(config: Config): KinesisProducerConfig = + fromAwsConfig(ProducerConf.buildKPLConfig(config, None)) + def apply(config: Config, credentialsProvider: AWSCredentialsProvider): KinesisProducerConfig = + fromAwsConfig(ProducerConf.buildKPLConfig(config, Some(credentialsProvider))) + + // Sets default values as if no typesafe configuration was passed. This ensures that the default + // KinesisProducerConfiguration is used + def default: KinesisProducerConfig = { + val defaultConfig = ProducerConf.buildKPLConfig(ConfigFactory.empty(), None) + fromAwsConfig(defaultConfig) + } + + private def fromAwsConfig(config: KinesisProducerConfiguration): KinesisProducerConfig = + KinesisProducerConfig( + additionalMetricDimensions = List(), // No way to retrieve this from a KinesisProducerConfiguration + credentialsProvider = Some(config.getCredentialsProvider), + metricsCredentialsProvider = Some(config.getMetricsCredentialsProvider), + aggregationEnabled = config.isAggregationEnabled, + aggregationMaxCount = config.getAggregationMaxCount, + aggregationMaxSize = config.getAggregationMaxSize, + cloudwatchEndpoint = Some(config.getCloudwatchEndpoint), + cloudwatchPort = config.getCloudwatchPort, + collectionMaxCount = config.getCollectionMaxCount, + collectionMaxSize = config.getCollectionMaxSize, + connectTimeout = config.getConnectTimeout, + credentialsRefreshDelay = config.getCredentialsRefreshDelay, + enableCoreDumps = config.isEnableCoreDumps, + failIfThrottled = config.isFailIfThrottled, + kinesisEndpoint = Some(config.getKinesisEndpoint), + kinesisPort = config.getKinesisPort, + logLevel = config.getLogLevel, + maxConnections = config.getMaxConnections, + metricsGranularity = config.getMetricsGranularity, + metricsLevel = config.getMetricsLevel, + metricsNamespace = config.getMetricsNamespace, + metricsUploadDelay = config.getMetricsUploadDelay, + minConnections = config.getMinConnections, + nativeExecutable = Some(config.getNativeExecutable), + rateLimit = config.getRateLimit, + recordMaxBufferedTime = config.getRecordMaxBufferedTime, + recordTtl = config.getRecordTtl, + region = config.getRegion match { + case x if x.isEmpty => None + case x => Some(Regions.fromName(x)) + }, + requestTimeout = config.getRequestTimeout, + tempDirectory = Some(config.getTempDirectory), + verifyCertificate = config.isVerifyCertificate, + threadingModel = config.getThreadingModel, + threadPoolSize = config.getThreadPoolSize + ) + + private def setAdditionalDimensions( + conf: KinesisProducerConfiguration, + dimensions: List[AdditionalDimension] + ) = dimensions.foldLeft(conf) { (conf, dimension) => + conf.addAdditionalMetricsDimension(dimension.getKey, + dimension.getValue, + dimension.getGranularity) + conf + } +} diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConf.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConf.scala index 50ff3cd..1100117 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConf.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConf.scala @@ -85,8 +85,24 @@ object ProducerConf { parseThrottlingConfig(producerConfig)) } - private def buildKPLConfig(kplConfig: Config, - credentialsProvider: Option[AWSCredentialsProvider]) = { + /** + * Simple typed apply method + * + * @param kinesisConfig The top level Kinesis Configuration, containing the specified producer + * @param streamName The name of the Kinesis stream to consume + * @param dispatcher Optional config path for the akka dispatcher + * @param throttlingConf Optional configuration which defines whether and how often to throttle + * @return A [[ProducerConf]] case class used for constructing the [[KinesisProducerActor]] + */ + def apply(kinesisConfig: KinesisProducerConfig, + streamName: String, + dispatcher: Option[String], + throttlingConf: Option[ThrottlingConf]): ProducerConf = { + + new ProducerConf(streamName, kinesisConfig.toAwsConfig, dispatcher, throttlingConf) + } + + def buildKPLConfig(kplConfig: Config, credentialsProvider: Option[AWSCredentialsProvider]) = { // We directly load our properties into the KPL as a Java `Properties` object // See http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html import TypesafeConfigExtensions._ diff --git a/src/test/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConfSpec.scala b/src/test/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConfSpec.scala index 76b28cb..fd417a8 100644 --- a/src/test/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConfSpec.scala +++ b/src/test/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConfSpec.scala @@ -21,12 +21,24 @@ import java.io.File import akka.actor.ActorSystem import akka.testkit.{ImplicitSender, TestKit} import akka.util.Timeout +import com.amazonaws.auth.ContainerCredentialsProvider.ECSCredentialsEndpointProvider +import com.amazonaws.auth.{ + DefaultAWSCredentialsProviderChain, + EC2ContainerCredentialsProviderWrapper, + EnvironmentVariableCredentialsProvider +} +import com.amazonaws.regions.{Region, Regions} +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration.ThreadingModel +import com.amazonaws.services.kinesis.producer.protobuf.Config.AdditionalDimension import com.typesafe.config.ConfigFactory +import org.apache.http.client.CredentialsProvider import org.scalatest.mockito.MockitoSugar import org.scalatest.{BeforeAndAfterAll, FreeSpecLike, Matchers} import scala.concurrent.Await import scala.concurrent.duration._ +import scala.util.Random //scalastyle:off magic.number class ProducerConfSpec @@ -253,9 +265,169 @@ class ProducerConfSpec } } + } + + "The KinesisProducerConfig" - { + "Should convert properly to an AWS KinesisProducerConfiguration object" in { + def randomLong = Random.nextLong().abs + def randomLongInRange(start: Int, end: Int) = start + Random.nextInt((end - start) + 1).toLong + def randomString(chars: Int) = Random.alphanumeric.take(chars).mkString("") + + val testAdditionalDimensions = List( + AdditionalDimension + .newBuilder() + .setKey("key") + .setValue("value") + .setGranularity(List("global", "stream", "shard")(Random.nextInt(3))) + .build() + ) + + val testCredentialsProvider = new EnvironmentVariableCredentialsProvider() + val testMetricsCredentialsProvider = new EC2ContainerCredentialsProviderWrapper() + val testAggregationEnabled = Random.nextBoolean() + val testAggregationMaxCount = randomLong + val testAggregationMaxSize = randomLongInRange(64, 1048576) + val testCloudwatchEndpoint = s"www.cloudwatch.com" + val testCloudwatchPort = randomLongInRange(1, 65535) + val testCollectionMaxCount = randomLongInRange(1, 500) + val testCollectionMaxSize = randomLong + val testConnectTimeout = randomLongInRange(100, 300000) + val testCredentialsRefreshDelay = randomLongInRange(1, 300000) + val testEnableCoreDumps = Random.nextBoolean() + val testFailIfThrottled = Random.nextBoolean() + val testKinesisEndpoint = s"www.kinesis.com" + val testKinesisPort = randomLongInRange(1, 65535) + val testLogLevel = List("info", "warning", "error")(Random.nextInt(3)) + val testMaxConnections = randomLongInRange(1, 256) + val testMetricsGranularity = List("global", "stream", "shard")(Random.nextInt(3)) + val testMetricsLevel = List("none", "summary", "detailed")(Random.nextInt(3)) + val testMetricsNamespace = randomString(5) + val testMetricsUploadDelay = randomLongInRange(1, 60000) + val testMinConnections = randomLongInRange(1, 16) + val testNativeExecutable = s"executable.${randomString(3)}" + val testRateLimit = randomLong + val testRecordMaxBufferedTime = randomLong + val testRecordTtl = randomLong + val testRegion = List(Regions.AP_NORTHEAST_1, + Regions.AP_SOUTH_1, + Regions.US_EAST_1, + Regions.US_WEST_1)(Random.nextInt(4)) + val testRequestTimeout = randomLongInRange(100, 600000) + val testTempDirectory = s"/${randomString(10)}" + val testVerifyCertificate = Random.nextBoolean() + val testThreadingModel = + List(ThreadingModel.PER_REQUEST, ThreadingModel.POOLED)(Random.nextInt(2)) + val testThreadPoolSize = Random.nextInt.abs + + val config = KinesisProducerConfig( + additionalMetricDimensions = testAdditionalDimensions, + credentialsProvider = Some(testCredentialsProvider), + metricsCredentialsProvider = Some(testMetricsCredentialsProvider), + aggregationEnabled = testAggregationEnabled, + aggregationMaxCount = testAggregationMaxCount, + aggregationMaxSize = testAggregationMaxSize, + cloudwatchEndpoint = Some(testCloudwatchEndpoint), + cloudwatchPort = testCloudwatchPort, + collectionMaxCount = testCollectionMaxCount, + collectionMaxSize = testCollectionMaxSize, + connectTimeout = testConnectTimeout, + credentialsRefreshDelay = testCredentialsRefreshDelay, + enableCoreDumps = testEnableCoreDumps, + failIfThrottled = testFailIfThrottled, + kinesisEndpoint = Some(testKinesisEndpoint), + kinesisPort = testKinesisPort, + logLevel = testLogLevel, + maxConnections = testMaxConnections, + metricsGranularity = testMetricsGranularity, + metricsLevel = testMetricsLevel, + metricsNamespace = testMetricsNamespace, + metricsUploadDelay = testMetricsUploadDelay, + minConnections = testMinConnections, + nativeExecutable = Some(testNativeExecutable), + rateLimit = testRateLimit, + recordMaxBufferedTime = testRecordMaxBufferedTime, + recordTtl = testRecordTtl, + region = Some(testRegion), + requestTimeout = testRequestTimeout, + tempDirectory = Some(testTempDirectory), + verifyCertificate = testVerifyCertificate, + threadingModel = testThreadingModel, + threadPoolSize = testThreadPoolSize + ).toAwsConfig + config.getCredentialsProvider should be(testCredentialsProvider) + config.getMetricsCredentialsProvider should be(testMetricsCredentialsProvider) + config.getAggregationMaxCount should be(testAggregationMaxCount) + config.getAggregationMaxSize should be(testAggregationMaxSize) + config.getCloudwatchEndpoint should be(testCloudwatchEndpoint) + config.getCloudwatchPort should be(testCloudwatchPort) + config.getCollectionMaxCount should be(testCollectionMaxCount) + config.getCollectionMaxSize should be(testCollectionMaxSize) + config.getConnectTimeout should be(testConnectTimeout) + config.getCredentialsRefreshDelay should be(testCredentialsRefreshDelay) + config.isEnableCoreDumps should be(testEnableCoreDumps) + config.isFailIfThrottled should be(testFailIfThrottled) + config.getKinesisEndpoint should be(testKinesisEndpoint) + config.getKinesisPort should be(testKinesisPort) + config.getLogLevel should be(testLogLevel) + config.getMaxConnections should be(testMaxConnections) + config.getMetricsGranularity should be(testMetricsGranularity) + config.getMetricsLevel should be(testMetricsLevel) + config.getMetricsNamespace should be(testMetricsNamespace) + config.getMetricsUploadDelay should be(testMetricsUploadDelay) + config.getMinConnections should be(testMinConnections) + config.getNativeExecutable should be(testNativeExecutable) + config.getRateLimit should be(testRateLimit) + config.getRecordMaxBufferedTime should be(testRecordMaxBufferedTime) + config.getRecordTtl should be(testRecordTtl) + config.getRegion should be(testRegion.getName) + config.getRequestTimeout should be(testRequestTimeout) + config.getTempDirectory should be(testTempDirectory) + config.isVerifyCertificate should be(testVerifyCertificate) + config.getThreadingModel should be(testThreadingModel) + config.getThreadPoolSize should be(testThreadPoolSize) + } } -} + "Default method should match provided defaults in KinesisProducerConfiguration" in { + val awsConfig = KinesisProducerConfig().toAwsConfig + val defaultAwsConfig = new KinesisProducerConfiguration() + awsConfig.getCredentialsProvider.getClass.getName should be( + defaultAwsConfig.getCredentialsProvider.getClass.getName + ) + awsConfig.getMetricsCredentialsProvider should be( + defaultAwsConfig.getMetricsCredentialsProvider + ) + awsConfig.getAggregationMaxCount should be(defaultAwsConfig.getAggregationMaxCount) + awsConfig.getAggregationMaxSize should be(defaultAwsConfig.getAggregationMaxSize) + awsConfig.getCloudwatchEndpoint should be(defaultAwsConfig.getCloudwatchEndpoint) + awsConfig.getCloudwatchPort should be(defaultAwsConfig.getCloudwatchPort) + awsConfig.getCollectionMaxCount should be(defaultAwsConfig.getCollectionMaxCount) + awsConfig.getCollectionMaxSize should be(defaultAwsConfig.getCollectionMaxSize) + awsConfig.getConnectTimeout should be(defaultAwsConfig.getConnectTimeout) + awsConfig.getCredentialsRefreshDelay should be(defaultAwsConfig.getCredentialsRefreshDelay) + awsConfig.isEnableCoreDumps should be(defaultAwsConfig.isEnableCoreDumps) + awsConfig.isFailIfThrottled should be(defaultAwsConfig.isFailIfThrottled) + awsConfig.getKinesisEndpoint should be(defaultAwsConfig.getKinesisEndpoint) + awsConfig.getKinesisPort should be(defaultAwsConfig.getKinesisPort) + awsConfig.getLogLevel should be(defaultAwsConfig.getLogLevel) + awsConfig.getMaxConnections should be(defaultAwsConfig.getMaxConnections) + awsConfig.getMetricsGranularity should be(defaultAwsConfig.getMetricsGranularity) + awsConfig.getMetricsLevel should be(defaultAwsConfig.getMetricsLevel) + awsConfig.getMetricsNamespace should be(defaultAwsConfig.getMetricsNamespace) + awsConfig.getMetricsUploadDelay should be(defaultAwsConfig.getMetricsUploadDelay) + awsConfig.getMinConnections should be(defaultAwsConfig.getMinConnections) + awsConfig.getNativeExecutable should be(defaultAwsConfig.getNativeExecutable) + awsConfig.getRateLimit should be(defaultAwsConfig.getRateLimit) + awsConfig.getRecordMaxBufferedTime should be(defaultAwsConfig.getRecordMaxBufferedTime) + awsConfig.getRecordTtl should be(defaultAwsConfig.getRecordTtl) + awsConfig.getRegion should be(defaultAwsConfig.getRegion) + awsConfig.getRequestTimeout should be(defaultAwsConfig.getRequestTimeout) + awsConfig.getTempDirectory should be(defaultAwsConfig.getTempDirectory) + awsConfig.isVerifyCertificate should be(defaultAwsConfig.isVerifyCertificate) + awsConfig.getThreadingModel should be(defaultAwsConfig.getThreadingModel) + awsConfig.getThreadPoolSize should be(defaultAwsConfig.getThreadPoolSize) + } +} //scalastyle:on