Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Add Configs, tests, related PR#15 #18

Merged
merged 5 commits into from
Aug 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,4 @@ lazy val headerSettings =
headers := Map("scala" -> Apache2_0("2017", "WeightWatchers"))
)

coverageExcludedPackages := "reference.conf"
83 changes: 77 additions & 6 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,39 @@ kinesis {
# Maximum (inclusive): 300000
ConnectTimeout = 6000

# Use a custom Kinesis and CloudWatch endpoint.

# How often to refresh credentials (in milliseconds).
# During a refresh, credentials are retrieved from any SDK credentials providers attached to
# the wrapper and pushed to the core.
#
# Default: 5000
# Minimum: 1
# Maximum (inclusive): 300000
# CredentialsRefreshDelay =

# Use a custom CloudWatch endpoint.
# Note this does not accept protocols or paths, only host names or ip addresses. There is no
# way to disable TLS. The KPL always connects with TLS.
# Expected pattern: ^([A-Za-z0-9-\\.]+)?$
# CloudwatchEndpoint =

# Server port to connect to for CloudWatch.
# Default: 443
# Minimum: 1
# Maximum (inclusive): 65535
# CloudwatchPort =

# This has no effect on Windows.
# If set to true, the KPL native process will attempt to raise its own core file size soft
# limit to 128MB, or the hard limit, whichever is lower. If the soft limit is already at or
# above the target amount, it is not changed.
# Note that even if the limit is successfully raised (or already sufficient), it does not
# guarantee that core files will be written on a crash, since that is dependent on operation
# system settings that's beyond the control of individual processes.
# Default: false
EnableCoreDumps = false

# Use a custom Kinesis endpoint.
#
# Mostly for testing use. Note this does not accept protocols or paths, only
# host names or ip addresses. There is no way to disable TLS. The KPL always
Expand All @@ -129,6 +161,13 @@ kinesis {
# Expected pattern: ^([A-Za-z0-9-\\.]+)?$
# KinesisEndpoint =

# Server port to connect to. Only useful with KinesisEndpoint.
#
# Default: 443
# Minimum: 1
# Maximum (inclusive): 65535
KinesisPort = 443

# If true, throttled puts are not retried. The records that got throttled
# will be failed immediately upon receiving the throttling error. This is
# useful if you want to react immediately to any throttling without waiting
Expand Down Expand Up @@ -236,12 +275,10 @@ kinesis {
# Maximum (inclusive): 16
MinConnections = 1

# Server port to connect to. Only useful with KinesisEndpoint.
#Path to the native KPL binary. Only use this setting if you want to use a custom build of
#the native code.
#
# Default: 443
# Minimum: 1
# Maximum (inclusive): 65535
KinesisPort = 443
#NativeExecutable=

# Limits the maximum allowed put rate for a shard, as a percentage of the
# backend limits.
Expand Down Expand Up @@ -328,11 +365,28 @@ kinesis {
# Maximum (inclusive): 600000
RequestTimeout = 6000

# Temp directory into which to extract the native binaries. The KPL requires write
# permissions in this directory.
#
# If not specified, defaults to /tmp in Unix. (Windows TBD)
# TempDirectory

# Verify the endpoint's certificate. Do not disable unless using
# custom_endpoint for testing. Never disable this in production.
#
# Default: true
VerifyCertificate = true

# Sets the threading model that the native process will use.
# Enum:
# ThreadingModel.PER_REQUEST: Tells the native process to create a thread for each request.
# ThreadingModel.POOLED: Tells the native process to use a thread pool. The size of the pool can be controlled by ThreadPoolSize
# Default = ThreadingModel.PER_REQUEST
# ThreadingModel =

# Sets the maximum number of threads that the native process' thread pool will be configured with.
# Default: 0
# ThreadPoolSize =
}

}
Expand Down Expand Up @@ -475,6 +529,10 @@ kinesis {
# Default: null
#kinesisEndpoint = https://kinesis

# DynamoDB endpoint
# Default: null
#DynamoDBEndpoint =

# Don't call processRecords() on the record processor for empty record lists.
# Enables applications flush/checkpoint (if they have some data "in progress but don't get new data for while)
#
Expand Down Expand Up @@ -554,10 +612,23 @@ kinesis {
# Default: 10
#initialLeaseTableWriteCapacity = 10

# If set to true, the Worker will not sync shards and leases during initialization if there are one or more leases
# in the lease table. This assumes that the shards and leases are in-sync.
# This enables customers to choose faster startup times (e.g. during incremental deployments of an application).
#
# skipShardSyncAtStartupIfLeasesExist Should Worker skip syncing shards and leases at startup (Worker initialization).
# Default: false
#skipShardSyncAtStartupIfLeasesExist=


# Override the default user agent (application name)
#
# Default: <applicationName>
#userAgent =

# TableName name of the lease table in DynamoDB
# Default = <applicationName>
#TableName =
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ class KinesisConsumerSpec
| kcl {
| AWSCredentialsProvider = EnvironmentVariableCredentialsProvider
| regionName = us-east-1
| KinesisEndpoint = "CustomKinesisEndpoint"
| DynamoDBEndpoint = "CustomDynamoDBEndpoint"
| SkipShardSyncAtStartupIfLeasesExist = true
| TableName = "TableName"
| }
| }
|
Expand Down Expand Up @@ -132,6 +136,12 @@ class KinesisConsumerSpec
"TestSpec-test-kinesis-reliability"
)
consumerConf.kclConfiguration.getStreamName should be("test-kinesis-reliability")
consumerConf.kclConfiguration.getKinesisEndpoint should be("CustomKinesisEndpoint") //validate an override property
consumerConf.kclConfiguration.getDynamoDBEndpoint should be("CustomDynamoDBEndpoint") //validate an override property
consumerConf.kclConfiguration.getSkipShardSyncAtWorkerInitializationIfLeasesExist should be(
true
) //validate an override property
consumerConf.kclConfiguration.getTableName should be("TableName") //validate an override property

val credentialsProvider = consumerConf.kclConfiguration.getKinesisCredentialsProvider
.asInstanceOf[AWSCredentialsProviderChain]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ class KinesisProducerActorSpec
|
| kpl {
| Region = us-east-1
| KinesisEndpoint = "CustomKinesisEndpoint"
| KinesisPort = 1111
| CredentialsRefreshDelay = 5001
| CloudwatchEndpoint = "CustomCloudWatchEndpoint"
| CloudwatchPort = 2222
| EnableCoreDumps = true
| NativeExecutable = "NativeExecutable"
| TempDirectory = "TempDirectory"
| ThreadPoolSize = 1
| ThreadingModel = "ThreadingModel.POOLED"
| }
| }
|}
Expand All @@ -81,8 +91,18 @@ class KinesisProducerActorSpec
val producerConf = ProducerConf(kinesisConfig, "testProducer")

producerConf.dispatcher should be(Some("kinesis.akka.default-dispatcher"))
producerConf.kplConfig.getString("Region") should be("us-east-1") //validate an override properly
producerConf.kplConfig.getBoolean("AggregationEnabled") should be(true) //validate a default property
producerConf.kplConfig.getString("Region") should be("us-east-1") //validate an override properly
producerConf.kplConfig.getBoolean("AggregationEnabled") should be(true) //validate a default property
producerConf.kplConfig.getString("KinesisEndpoint") should be("CustomKinesisEndpoint") //validate an override property
producerConf.kplConfig.getLong("KinesisPort") should be(1111) //validate an override property
producerConf.kplConfig.getLong("CredentialsRefreshDelay") should be(5001) //validate an override property
producerConf.kplConfig.getString("CloudwatchEndpoint") should be("CustomCloudWatchEndpoint") //validate an override property
producerConf.kplConfig.getLong("CloudwatchPort") should be(2222) //validate an override property
producerConf.kplConfig.getBoolean("EnableCoreDumps") should be(true) //validate an override property
producerConf.kplConfig.getString("NativeExecutable") should be("NativeExecutable") //validate an override property
producerConf.kplConfig.getString("TempDirectory") should be("TempDirectory") //validate an override property
producerConf.kplConfig.getString("ThreadingModel") should be("ThreadingModel.POOLED") //validate an override property
producerConf.kplConfig.getInt("ThreadPoolSize") should be(1) //validate an override property
producerConf.throttlingConf.get.maxOutstandingRequests should be(50000)
producerConf.throttlingConf.get.retryDuration should be(100.millis)
producerConf.streamName should be("core-test-kinesis-producer")
Expand Down