This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Feature/integration testing #34

Merged: 40 commits on Dec 3, 2017
40 commits:
d30fe19
Upgrading KPL/KCL libraries, replacing deprecated shutdown calls
Aug 30, 2017
a0a69ea
auto format from compile
Aug 30, 2017
a7179f0
Merge branch 'master' of github.com:WW-Digital/reactive-kinesis into …
markglh Sep 7, 2017
8c1211a
restructure the ProducerConf into its own file and spec
markglh Sep 7, 2017
1c9fa2d
further refactoring of Producer to use ProducerConf
markglh Sep 7, 2017
06f2cba
tests pass - ish
markglh Sep 7, 2017
7098a97
improve test exception for manager
markglh Sep 7, 2017
316ca57
Fix Intermittent failing test - #10
markglh Sep 7, 2017
90fa3f5
refactored producer to remove pointless trait, updated readme and specs
markglh Sep 8, 2017
32fe708
Merge branch 'master' into feature/Issue28-TestKPLProperties
markglh Sep 8, 2017
777f375
removed unused config
markglh Sep 8, 2017
a758a85
fix scalafmt
markglh Sep 8, 2017
efae23a
address potters comments
markglh Sep 8, 2017
2997479
address formatting issues
markglh Sep 8, 2017
9f60a4c
added ConsumerConfig test plus missing consumer fields
markglh Sep 8, 2017
6fb2fa5
improved thread reference docs
markglh Sep 9, 2017
d77b0d1
removed comment
markglh Sep 9, 2017
1ed73af
added docker compose localstack environment
markglh Sep 10, 2017
a9362c0
fixed readme
markglh Sep 10, 2017
0ec6bcc
more readme tweaks
markglh Sep 10, 2017
7257f8f
readme fixes
markglh Sep 10, 2017
b19ad1a
readme fixes
markglh Sep 10, 2017
500c521
added integration test for producer, pending CI infra
markglh Sep 11, 2017
9cac2e3
Merge branch 'master' of github.com:WW-Digital/reactive-kinesis into …
markglh Sep 11, 2017
d4daa54
fix formatting
markglh Sep 11, 2017
f302f9f
moved localstack stuff to new image
markglh Sep 13, 2017
f433b3a
fix previous unintended change
markglh Sep 13, 2017
a104e12
formatting
markglh Sep 13, 2017
42e6c5c
removed localstack readme
markglh Dec 2, 2017
8606d96
reworked integration test setup
markglh Dec 2, 2017
bc4ea0d
fix formatting
markglh Dec 2, 2017
a7d59d0
added travis docker-compose
markglh Dec 2, 2017
d0c7335
tweaked travis compose
markglh Dec 2, 2017
8a31827
upgrade docker compose
markglh Dec 2, 2017
2b89d2b
upgrade docker compose
markglh Dec 2, 2017
7473195
fixed travis yaml
markglh Dec 2, 2017
04d9384
run docker compose as daemon
markglh Dec 2, 2017
f8e8166
added integration tests to build
markglh Dec 2, 2017
fb393b2
remove debug, update scala, improve readme
markglh Dec 3, 2017
47275c6
Merge branch 'master' into feature/IntegrationTesting
markglh Dec 3, 2017
14 changes: 13 additions & 1 deletion .travis.yml
@@ -2,16 +2,21 @@ sudo: false
language: scala
scala:
- 2.11.11
- 2.12.2
- 2.12.4
jdk:
- oraclejdk8
cache:
directories:
- "$HOME/.m2/repository"
- "$HOME/.sbt"
- "$HOME/.ivy2"
before_script:
- docker-compose -f localstack/docker-compose.yml pull
- docker-compose -f localstack/docker-compose.yml up -d
- docker ps -a
script:
- sbt ++$TRAVIS_SCALA_VERSION clean coverage scalafmtTest test
- sbt ++$TRAVIS_SCALA_VERSION it:test
# Tagged releases
- if [ $TRAVIS_TEST_RESULT -eq 0 -a "$TRAVIS_PULL_REQUEST" = "false" ] && [[ "$TRAVIS_TAG"
=~ ^v[0-9]+\.[0-9]+(\.[0-9]+)?(-M[0-9]+)?$ ]]; then echo "** Publishing Release
@@ -23,14 +28,21 @@ script:
= "master" ]; then echo "** Publishing Snapshot from master **" && tar xf secrets.tar
&& mkdir ~/.bintray && cp publishing/.credentials ~/.bintray/ && cp publishing/.artifactory
~/.bintray/ && sbt ++$TRAVIS_SCALA_VERSION publish; fi
after_script:
- docker-compose -f localstack/docker-compose.yml down
after_success:
- sbt ++$TRAVIS_SCALA_VERSION coverageReport coveralls
before_cache:
- find $HOME/.ivy2/cache -name "ivydata-*.properties" -print -delete
- find $HOME/.sbt -name "*.lock" -print -delete
env:
global:
- DOCKER_COMPOSE_VERSION=1.17.1
- secure: KffThTeOg0ELt+iw5ZNtDW6x4/1iKIj1v91Yq6earF+YQquusROqt/MtdmyXRG9ho0c/xnB82BJHrBsvac6SwsPUZ/cRvQ0KML3bWyCMCL2B/SurhjxbahAWdgQGj67KO1n72q24cqyRMb9wCCOwy4p/7qomBYM+m8YRLc6YOxxj6DKriIiqOq8jFmt/nktpCUhXVURYvFxHtHQvLutaWWICwGZX1ibyloCmC3wWPMNOFc/1FBqLXNQTUTKuYOOC46TJ5janON1E2obDKZyGf/Q/ZEuNymcJYKi6/7XWzN24Ep194PLGj29J44Zpp9P36hnIiET8dQ1s2Dafo+alubqFEUaP4fV8XyYu5ZtTt5A0wxKEe/u7N4SdxzUZF/zBszCLRL2uJhQlfC7af6keCpIVoKLDi+9b6zpSVLDZmE1UmFuu8LLoMY2s2xRt6vHXTSiyPrIJaIFyaryxjhH+KmKNE0GW1jzVkPc6NpxjnVtF2E7LYogHWHzJLoMpBtTQoAeJlKvtbEmWrZXO9TGPFzu0fb+x/H0Ocre2QDSnjKobtMZs6S8zvD+STfmrZ8ZtZ0H0HNbGJW/rPkSZFLunOPXWxSZzNoWNrzg9LCoPtdkVsa7BYiAY9fM9xNX8SOx/kRZtw52el71jK8mDCV/lQjzjBwyo3AZl/jOx3cbWrAM=
- secure: KClWy/cCpMSKyJmVNOt8Dmwfatgx/9KsffLcL/BlXUYsBkB3jwdr9SzFok1ArGd02FzrFQU739/wnSKpmsW1fEO64tlh+qJwW76+gFlqWmOjc6r7ENH2u6LGLfRYeD4r76CiEV9mnU5GVMUPJIfbxlP0zB7ukzSGZwbkJ4WBqlxmurRr70jWz2F3ytJM0r31rpqc+ypHi58vj0NPBO7bR6XI4kquW9cDpCCgj4CVVshpsxUB1N4RPzcGsxUPucigH71lJ0sZjnEjSB+E95/Ovbf7PuPYnY0N8S6hLEgTyh7xAcxNpMsFgd1GInb92hrXLujUZU5N2BeX7jnYVa90q55AZd+IQqmJU+/4tBBrgWvppMIiTbbU5pnDoNqHNWmfB4wtbxwNW4y4tQH+/wSVPhgGYOWfsj3nLNnyfcjRAClQzEWCoAbx7Tkgj1OmvrOqwwmiPz3dFMInawQ6RkisqwgjT/sAhNGa2i26Tpj5Sd8XAjxU61MAzsr8gpVdA6TJcSdHkHC2zZVn+64Cc9WHJ9I6WbRD0fR3pFeTbQv+0SOUWV1owqtdGLl6z0gdI5rLZIYO7SMeRPjTYf/5NV/WxF2LcJu06LHYpzURaGQHp6T5NH2eEZof1Dt5tqa2eQ7KfZyui3rz3WB0AQlJSPK9yL78BvZzNlCNWbv5wLiW6hQ=
before_install:
- sudo rm /usr/local/bin/docker-compose
- curl -L https://github.com/docker/compose/releases/download/${DOCKER_COMPOSE_VERSION}/docker-compose-`uname -s`-`uname -m` > docker-compose
- chmod +x docker-compose
- sudo mv docker-compose /usr/local/bin
- 'if [ "$TRAVIS_PULL_REQUEST" = "false" ]; then openssl aes-256-cbc -K $encrypted_09ef3e55c311_key -iv $encrypted_09ef3e55c311_iv -in secrets.tar.enc -out secrets.tar -d; fi'
7 changes: 7 additions & 0 deletions README.md
@@ -27,6 +27,7 @@ It's worth familiarising yourself with [Sequence numbers and Sub sequence number
* [FAQ](#faq)
* [Contributor Guide](#contributor-guide)
* [Code Formatting](#contributor-guide-code-formatting)
* [Integration Tests](#contributor-guide-integration-tests)
* [Tag Requirements](#contributor-guide-tag-requirements)
* [Version information](#contributor-guide-tag-requirements-version-information)
* [Valid Release Tag Examples:](#contributor-guide-tag-requirements-valid-release-tag-examples)
@@ -511,6 +512,12 @@ This project uses [scalafmt](http://scalameta.org/scalafmt/) and will automatica

Please run `sbt scalafmt` before committing and pushing changes.

<a name="contributor-guide-integration-tests"></a>
## Integration tests
As part of the Travis build, the integration tests run against a local Kinesis (localstack) instance. You can run them locally as follows:
* `docker-compose -f localstack/docker-compose.yml up`
* `sbt it:test`

<a name="contributor-guide-tag-requirements"></a>
## Tag Requirements
Uses tags and [sbt-git](https://github.com/sbt/sbt-git) to determine the current version.
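The `sbt it:test` task referenced in the README changes above relies on sbt's predefined `IntegrationTest` configuration. The wiring that enables it is not part of the diff shown here, so the following is only a minimal sketch of what that part of `build.sbt` typically looks like (the project name is a placeholder); the IntegrationTest-scoped settings this PR actually adds appear in the `build.sbt` changes below.

```scala
// Minimal sketch, not taken from this PR: enabling sbt's built-in IntegrationTest
// configuration so that `it:test` compiles and runs sources under src/it.
lazy val exampleProject = (project in file("."))
  .configs(IntegrationTest)          // add the "it" configuration to the project
  .settings(Defaults.itSettings)     // standard source/task wiring for src/it
  .settings(
    parallelExecution in IntegrationTest := false, // mirrors the settings added below
    fork in IntegrationTest := true
  )
```

With wiring of that shape in place, sources under `src/it/scala` and resources under `src/it/resources` (such as the `application.conf` touched further down) are compiled and run only by `it:test`.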
16 changes: 10 additions & 6 deletions build.sbt
@@ -68,10 +68,10 @@ lazy val library =
"org.scalactic" %% "scalactic" % Version.scalaTest % Compile)

val testing = Seq(
"org.scalatest" %% "scalatest" % Version.scalaTest % Test,
"org.scalacheck" %% "scalacheck" % Version.scalaCheck % Test,
"com.typesafe.akka" %% "akka-testkit" % Version.akka % Test,
"org.mockito" % "mockito-core" % "2.7.15" % Test,
"org.scalatest" %% "scalatest" % Version.scalaTest % "it,test",
"org.scalacheck" %% "scalacheck" % Version.scalaCheck % "it,test",
"com.typesafe.akka" %% "akka-testkit" % Version.akka % "it,test",
"org.mockito" % "mockito-core" % "2.7.15" % "it,test",
"io.kamon" %% "kamon-core" % Version.kamon % Test,
"io.kamon" %% "kamon-akka-2.4" % Version.kamon % Test,
"io.kamon" %% "kamon-statsd" % Version.kamon % Test,
@@ -131,7 +131,7 @@ lazy val commonSettings =
"-Ywarn-infer-any", // Warn when a type argument is inferred to be `Any`.
"-Ywarn-nullary-override", // Warn when non-nullary `def f()' overrides nullary `def f'.
"-Ywarn-nullary-unit", // Warn when nullary methods return Unit.
"-Ywarn-numeric-widen" // Warn when numerics are widened.
"-Ywarn-numeric-widen" // Warn when numerics are widened.
),
scalacOptions in (Compile, doc) ++= Seq(
"-no-link-warnings" // Suppresses problems with Scaladoc @throws links
val project = Project.extract(state).currentRef.project
s"[$project]> "
},
parallelExecution in Test := false
parallelExecution in Test := false,
parallelExecution in IntegrationTest := false,
fork in IntegrationTest := true,
javaOptions in IntegrationTest += "-Dcom.amazonaws.sdk.disableCertChecking=true",
envVars in IntegrationTest += ("AWS_CBOR_DISABLE" -> "true")
)

/* This allows to derive an sbt version string from the git information.
23 changes: 23 additions & 0 deletions localstack/.env
@@ -0,0 +1,23 @@
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=<AWS_ACCESS_KEY_ID>
AWS_SECRET_ACCESS_KEY=<AWS_SECRET_ACCESS_KEY>
AWS_CBOR_DISABLE=true
APPLICATION_NAME=myReallyAwesomeApplication
KINESIS_STREAM_NAME=my-stream
KPL_KINESIS_ENDPOINT=localhost
KPL_KINESIS_PORT=4568
KPL_CLOUDWATCH_ENDPOINT=localhost
KPL_CLOUDWATCH_PORT=4582
KPL_VERIFY_CERTIFICATE=false
KCL_KINESIS_ENDPOINT=https://localhost:4568
KCL_DYNAMODB_ENDPOINT=https://localhost:4569
ADDITIONAL_JAVA_OPTS=-Dcom.amazonaws.sdk.disableCertChecking=true
LOCALSTACK_SERVICES=kinesis,dynamodb,sqs,cloudwatch,cloudformation
LOCALSTACK_HOSTNAME=localhost
LOCALSTACK_USE_SSL=true
LOCALSTACK_KINESIS_ERROR_PROBABILITY=0.0
LOCALSTACK_DYNAMODB_ERROR_PROBABILITY=0.0
LOCALSTACK_LAMBDA_EXECUTOR=local
LOCALSTACK_LAMBDA_REMOTE_DOCKER=false
LOCALSTACK_DATA_DIR=/tmp/localstack/data
LOCALSTACK_DOCKER_IMAGE_TAG=0.7.5
27 changes: 27 additions & 0 deletions localstack/docker-compose.yml
@@ -0,0 +1,27 @@
version: "2.3"

services:
localstack:
image: markglh/initialised-localstack:0.8.3
volumes:
- ./templates:/opt/bootstrap/templates
#network_mode: "host"
environment:
- "SERVICES=${LOCALSTACK_SERVICES:-kinesis,dynamodb,cloudwatch,cloudformation}"
- "DEFAULT_REGION=${AWS_REGION:-us-east-1}"
- "HOSTNAME=${LOCALSTACK_HOSTNAME:-localhost}"
- "HOSTNAME_EXTERNAL=${LOCALSTACK_HOSTNAME_EXTERNAL:-localhost}"
- "USE_SSL=true"
#- "DATA_DIR=${LOCALSTACK_DATA_DIR:-/tmp/localstack/data}"
ports:
- "4567-4582:4567-4582"
- "8080:8080"

# Ensures the health check runs
dummy-service:
image: alpine:3.5
depends_on:
localstack:
condition: service_healthy


23 changes: 23 additions & 0 deletions localstack/templates/cftemplate.yml
@@ -0,0 +1,23 @@
AWSTemplateFormatVersion: '2010-09-09'
Description: CloudFormation Int Test Template
Resources:
KinesisStream1:
Type: AWS::Kinesis::Stream
Properties:
Name: int-test-stream-1
ShardCount: 1
KinesisStream2:
Type: AWS::Kinesis::Stream
Properties:
Name: int-test-stream-2
ShardCount: 1
KinesisStream3:
Type: AWS::Kinesis::Stream
Properties:
Name: int-test-stream-3
ShardCount: 1
KinesisStream4:
Type: AWS::Kinesis::Stream
Properties:
Name: int-test-stream-4
ShardCount: 1
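This template is mounted into the `markglh/initialised-localstack` container (see the `volumes` entry in docker-compose.yml above), which is presumably what creates the four streams before `it:test` runs. As an illustration only, assuming the AWS Java SDK v1 builder API that the KCL/KPL dependencies bring in, a throwaway check like the one below can confirm the bootstrap worked; the endpoint, region and dummy credentials match `localstack/.env`, and the object name is purely illustrative.

```scala
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder

// Illustrative only: checks that the CloudFormation bootstrap created the test streams.
object VerifyLocalstackStreams extends App {
  val kinesis = AmazonKinesisClientBuilder
    .standard()
    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("foo", "bar")))
    .withEndpointConfiguration(new EndpointConfiguration("https://localhost:4568", "us-east-1"))
    .build()

  (1 to 4).map(i => s"int-test-stream-$i").foreach { name =>
    // A bootstrapped stream should report ACTIVE once localstack has applied the template.
    val status = kinesis.describeStream(name).getStreamDescription.getStreamStatus
    println(s"$name -> $status")
  }
}
```

When running it, the same `-Dcom.amazonaws.sdk.disableCertChecking=true` flag that the build sets for `it:test` is needed, because localstack serves these endpoints over SSL with a self-signed certificate.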
58 changes: 0 additions & 58 deletions src/it/resources/application.conf
@@ -1,59 +1 @@

kamon {
metric.filters {
akka-actor {
includes = ["test-system/user/simple-consumer/consumer-worker", "test-system/user/simple-consumer/consumer-worker*/checkpoint-worker*"]
excludes = ["test-system/system/**"]
}

akka-dispatcher {
includes = ["test-system/akka.actor.default-dispatcher", "test-system/kinesis.akka.default-dispatcher"]
}

trace {
includes = [ "**" ]
excludes = []
}

#akka-router {
# includes = [ "test-system/user/some-router" ]
#}
}

statsd {
# Hostname and port in which your StatsD is running. Remember that StatsD packets are sent using UDP and
# setting unreachable hosts and/or not open ports wont be warned by the Kamon, your data wont go anywhere.
hostname = "127.0.0.1"
port = 8125

# Interval between metrics data flushes to StatsD. It's value must be equal or greater than the
# kamon.metrics.tick-interval setting.
flush-interval = 1 second

# Max packet size for UDP metrics data sent to StatsD.
max-packet-size = 1024 bytes

# Subscription patterns used to select which metrics will be pushed to StatsD. Note that first, metrics
# collection for your desired entities must be activated under the kamon.metrics.filters settings.
includes {
actor = ["*"]
trace = ["*"]
dispatcher = ["*"]
}

simple-metric-key-generator {
# Application prefix for all metrics pushed to StatsD. The default namespacing scheme for metrics follows
# this pattern:
# application.host.entity.entity-name.metric-name
application = "kinesis-test"
}
}

# modules can be disabled at startup using yes/no arguments.
modules {
kamon-log-reporter.auto-start = no
kamon-system-metrics.auto-start = no
kamon-statsd.auto-start = no
kamon-akka.auto-start = no
}
}
126 changes: 126 additions & 0 deletions KinesisProducerIntegrationSpec.scala (new file; full path not shown in this view)
@@ -0,0 +1,126 @@
/*
* Copyright 2017 WeightWatchers
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.weightwatchers.reactive.kinesis

import java.io.File

import com.amazonaws.services.kinesis.producer.{KinesisProducer => AWSKinesisProducer}
import com.typesafe.config.ConfigFactory
import com.weightwatchers.reactive.kinesis.common.{KinesisTestConsumer, TestCredentials}
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf
import com.weightwatchers.reactive.kinesis.models.ProducerEvent
import com.weightwatchers.reactive.kinesis.producer.{KinesisProducer, ProducerConf}
import org.scalatest.concurrent.Eventually
import org.scalatest.mockito.MockitoSugar
import org.scalatest.time.{Millis, Seconds, Span}
import org.scalatest.{BeforeAndAfterAll, FreeSpec, Matchers}

import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random

//scalastyle:off magic.number
class KinesisProducerIntegrationSpec
extends FreeSpec
with Matchers
with MockitoSugar
with BeforeAndAfterAll
with Eventually {

implicit val ece = scala.concurrent.ExecutionContext.global

val defaultKinesisConfig =
ConfigFactory.parseFile(new File("src/main/resources/reference.conf")).getConfig("kinesis")

val kinesisConfig = ConfigFactory
.parseString(
"""
|kinesis {
|
| application-name = "ScalaProducerTestSpec"
|
| testProducer {
| stream-name = "int-test-stream-1"
|
| kpl {
| Region = us-east-1
|
| CloudwatchEndpoint = localhost
| CloudwatchPort = 4582
|
| KinesisEndpoint = localhost
| KinesisPort = 4568
|
| VerifyCertificate = false
| }
| }
|
| testConsumer {
| stream-name = "int-test-stream-1"
|
| kcl {
| AWSCredentialsProvider = "com.weightwatchers.reactive.kinesis.common.TestCredentials|foo|bar"
| regionName = us-east-1
| KinesisEndpoint = "https://localhost:4568"
| DynamoDBEndpoint = "https://localhost:4569"
|
|
| metricsLevel = None
| }
|
| }
|}
""".stripMargin
)
.getConfig("kinesis")
.withFallback(defaultKinesisConfig)

implicit override val patienceConfig: PatienceConfig =
PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Millis))

val consumer: KinesisTestConsumer =
KinesisTestConsumer.from(ConsumerConf(kinesisConfig, "testConsumer"), Some(100 millis))

override protected def afterAll(): Unit = {
consumer.shutdown
}

"The KinesisProducer" - {

"Should publish a message to a stream" in {

val producerConf =
ProducerConf(kinesisConfig, "testProducer", Some(TestCredentials.Credentials))
val producer = KinesisProducer(producerConf)

val existingRecordCount = consumer.retrieveRecords(producerConf.streamName, 10).size

val event = ProducerEvent("1234", Random.alphanumeric.take(10).mkString)
producer.addUserRecord(event)

eventually {
val records: Seq[String] = consumer.retrieveRecords(producerConf.streamName, 10)
records.size shouldBe (existingRecordCount + 1)
records should contain(
new String(event.payload.array(), java.nio.charset.StandardCharsets.UTF_8)
)
}
}
}
}

//scalastyle:on
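The spec above depends on two helpers from `com.weightwatchers.reactive.kinesis.common` that this view does not show: `TestCredentials` and `KinesisTestConsumer`. Inferred purely from how the spec calls it, the test consumer exposes roughly the surface sketched below; the trait and parameter names are hypothetical and only stand in for the real implementation.

```scala
// Hypothetical shape of the KinesisTestConsumer helper, inferred only from its usage in
// KinesisProducerIntegrationSpec above; the real definition is not part of this diff view.
trait KinesisTestConsumerLike {

  /** Poll the given stream and return the record payloads read so far, as UTF-8 strings. */
  def retrieveRecords(streamName: String, limit: Int): Seq[String]

  /** Release the underlying Kinesis client resources once the spec completes. */
  def shutdown(): Unit
}
```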
3 changes: 0 additions & 3 deletions SimpleKinesisConsumer.scala (path not shown in this view)
@@ -13,14 +13,12 @@ import com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker.{
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf
import com.weightwatchers.reactive.kinesis.models.CompoundSequenceNumber
import kamon.Kamon
import org.joda.time.{DateTime, DateTimeZone, Period}

import scala.collection.mutable.ListBuffer
import com.weightwatchers.eventing.system

object RunSimpleConsumer extends App {
Kamon.start()
val consumer = system.actorOf(SimpleKinesisConsumer.props, "simple-consumer")
}

@@ -109,7 +107,6 @@ class SimpleKinesisConsumer(kinesisConfig: Config) extends Actor with LazyLoggin
logger.error(s"\n\nFailed pit stop check @ $totalReceived!\n\n")
logger.error(s"\n\nFailed pit stop check @ $totalReceived!\n\n")
context.stop(self)
Kamon.shutdown()
System.exit(3)
} else {
logger.warn(s"\n\n**** PIT STOP OK: $totalVerified records verified\n\n")