Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Commit

Permalink
Missing properties for configuring underlying KPL & KCL PR#18 Issue#16
Browse files Browse the repository at this point in the history
Missing properties for configuring underlying KPL & KCL
  • Loading branch information
fernando-torterolo authored and markglh committed Aug 18, 2017
1 parent 8e2cea7 commit 4f84e22
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 8 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,4 @@ lazy val headerSettings =
headers := Map("scala" -> Apache2_0("2017", "WeightWatchers"))
)

coverageExcludedPackages := "reference.conf"
83 changes: 77 additions & 6 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,39 @@ kinesis {
# Maximum (inclusive): 300000
ConnectTimeout = 6000

# Use a custom Kinesis and CloudWatch endpoint.

# How often to refresh credentials (in milliseconds).
# During a refresh, credentials are retrieved from any SDK credentials providers attached to
# the wrapper and pushed to the core.
#
# Default: 5000
# Minimum: 1
# Maximum (inclusive): 300000
# CredentialsRefreshDelay =

# Use a custom CloudWatch endpoint.
# Note this does not accept protocols or paths, only host names or ip addresses. There is no
# way to disable TLS. The KPL always connects with TLS.
# Expected pattern: ^([A-Za-z0-9-\\.]+)?$
# CloudwatchEndpoint =

# Server port to connect to for CloudWatch.
# Default: 443
# Minimum: 1
# Maximum (inclusive): 65535
# CloudwatchPort =

# This has no effect on Windows.
# If set to true, the KPL native process will attempt to raise its own core file size soft
# limit to 128MB, or the hard limit, whichever is lower. If the soft limit is already at or
# above the target amount, it is not changed.
# Note that even if the limit is successfully raised (or already sufficient), it does not
# guarantee that core files will be written on a crash, since that is dependent on operation
# system settings that's beyond the control of individual processes.
# Default: false
EnableCoreDumps = false

# Use a custom Kinesis endpoint.
#
# Mostly for testing use. Note this does not accept protocols or paths, only
# host names or ip addresses. There is no way to disable TLS. The KPL always
Expand All @@ -129,6 +161,13 @@ kinesis {
# Expected pattern: ^([A-Za-z0-9-\\.]+)?$
# KinesisEndpoint =

# Server port to connect to. Only useful with KinesisEndpoint.
#
# Default: 443
# Minimum: 1
# Maximum (inclusive): 65535
KinesisPort = 443

# If true, throttled puts are not retried. The records that got throttled
# will be failed immediately upon receiving the throttling error. This is
# useful if you want to react immediately to any throttling without waiting
Expand Down Expand Up @@ -236,12 +275,10 @@ kinesis {
# Maximum (inclusive): 16
MinConnections = 1

# Server port to connect to. Only useful with KinesisEndpoint.
#Path to the native KPL binary. Only use this setting if you want to use a custom build of
#the native code.
#
# Default: 443
# Minimum: 1
# Maximum (inclusive): 65535
KinesisPort = 443
#NativeExecutable=

# Limits the maximum allowed put rate for a shard, as a percentage of the
# backend limits.
Expand Down Expand Up @@ -328,11 +365,28 @@ kinesis {
# Maximum (inclusive): 600000
RequestTimeout = 6000

# Temp directory into which to extract the native binaries. The KPL requires write
# permissions in this directory.
#
# If not specified, defaults to /tmp in Unix. (Windows TBD)
# TempDirectory

# Verify the endpoint's certificate. Do not disable unless using
# custom_endpoint for testing. Never disable this in production.
#
# Default: true
VerifyCertificate = true

# Sets the threading model that the native process will use.
# Enum:
# ThreadingModel.PER_REQUEST: Tells the native process to create a thread for each request.
# ThreadingModel.POOLED: Tells the native process to use a thread pool. The size of the pool can be controlled by ThreadPoolSize
# Default = ThreadingModel.PER_REQUEST
# ThreadingModel =

# Sets the maximum number of threads that the native process' thread pool will be configured with.
# Default: 0
# ThreadPoolSize =
}

}
Expand Down Expand Up @@ -475,6 +529,10 @@ kinesis {
# Default: null
#kinesisEndpoint = https://kinesis

# DynamoDB endpoint
# Default: null
#DynamoDBEndpoint =

# Don't call processRecords() on the record processor for empty record lists.
# Enables applications flush/checkpoint (if they have some data "in progress but don't get new data for while)
#
Expand Down Expand Up @@ -554,10 +612,23 @@ kinesis {
# Default: 10
#initialLeaseTableWriteCapacity = 10

# If set to true, the Worker will not sync shards and leases during initialization if there are one or more leases
# in the lease table. This assumes that the shards and leases are in-sync.
# This enables customers to choose faster startup times (e.g. during incremental deployments of an application).
#
# skipShardSyncAtStartupIfLeasesExist Should Worker skip syncing shards and leases at startup (Worker initialization).
# Default: false
#skipShardSyncAtStartupIfLeasesExist=


# Override the default user agent (application name)
#
# Default: <applicationName>
#userAgent =

# TableName name of the lease table in DynamoDB
# Default = <applicationName>
#TableName =
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ class KinesisConsumerSpec
| kcl {
| AWSCredentialsProvider = EnvironmentVariableCredentialsProvider
| regionName = us-east-1
| KinesisEndpoint = "CustomKinesisEndpoint"
| DynamoDBEndpoint = "CustomDynamoDBEndpoint"
| SkipShardSyncAtStartupIfLeasesExist = true
| TableName = "TableName"
| }
| }
|
Expand Down Expand Up @@ -132,6 +136,12 @@ class KinesisConsumerSpec
"TestSpec-test-kinesis-reliability"
)
consumerConf.kclConfiguration.getStreamName should be("test-kinesis-reliability")
consumerConf.kclConfiguration.getKinesisEndpoint should be("CustomKinesisEndpoint") //validate an override property
consumerConf.kclConfiguration.getDynamoDBEndpoint should be("CustomDynamoDBEndpoint") //validate an override property
consumerConf.kclConfiguration.getSkipShardSyncAtWorkerInitializationIfLeasesExist should be(
true
) //validate an override property
consumerConf.kclConfiguration.getTableName should be("TableName") //validate an override property

val credentialsProvider = consumerConf.kclConfiguration.getKinesisCredentialsProvider
.asInstanceOf[AWSCredentialsProviderChain]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ class KinesisProducerActorSpec
|
| kpl {
| Region = us-east-1
| KinesisEndpoint = "CustomKinesisEndpoint"
| KinesisPort = 1111
| CredentialsRefreshDelay = 5001
| CloudwatchEndpoint = "CustomCloudWatchEndpoint"
| CloudwatchPort = 2222
| EnableCoreDumps = true
| NativeExecutable = "NativeExecutable"
| TempDirectory = "TempDirectory"
| ThreadPoolSize = 1
| ThreadingModel = "ThreadingModel.POOLED"
| }
| }
|}
Expand All @@ -81,8 +91,18 @@ class KinesisProducerActorSpec
val producerConf = ProducerConf(kinesisConfig, "testProducer")

producerConf.dispatcher should be(Some("kinesis.akka.default-dispatcher"))
producerConf.kplConfig.getString("Region") should be("us-east-1") //validate an override properly
producerConf.kplConfig.getBoolean("AggregationEnabled") should be(true) //validate a default property
producerConf.kplConfig.getString("Region") should be("us-east-1") //validate an override properly
producerConf.kplConfig.getBoolean("AggregationEnabled") should be(true) //validate a default property
producerConf.kplConfig.getString("KinesisEndpoint") should be("CustomKinesisEndpoint") //validate an override property
producerConf.kplConfig.getLong("KinesisPort") should be(1111) //validate an override property
producerConf.kplConfig.getLong("CredentialsRefreshDelay") should be(5001) //validate an override property
producerConf.kplConfig.getString("CloudwatchEndpoint") should be("CustomCloudWatchEndpoint") //validate an override property
producerConf.kplConfig.getLong("CloudwatchPort") should be(2222) //validate an override property
producerConf.kplConfig.getBoolean("EnableCoreDumps") should be(true) //validate an override property
producerConf.kplConfig.getString("NativeExecutable") should be("NativeExecutable") //validate an override property
producerConf.kplConfig.getString("TempDirectory") should be("TempDirectory") //validate an override property
producerConf.kplConfig.getString("ThreadingModel") should be("ThreadingModel.POOLED") //validate an override property
producerConf.kplConfig.getInt("ThreadPoolSize") should be(1) //validate an override property
producerConf.throttlingConf.get.maxOutstandingRequests should be(50000)
producerConf.throttlingConf.get.retryDuration should be(100.millis)
producerConf.streamName should be("core-test-kinesis-producer")
Expand Down

0 comments on commit 4f84e22

Please sign in to comment.