diff --git a/.travis.yml b/.travis.yml
index fce0567..d3ac60d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -2,7 +2,7 @@ sudo: false
language: scala
scala:
- 2.11.11
-- 2.12.2
+- 2.12.4
jdk:
- oraclejdk8
cache:
@@ -10,8 +10,13 @@ cache:
- "$HOME/.m2/repository"
- "$HOME/.sbt"
- "$HOME/.ivy2"
+before_script:
+- docker-compose -f localstack/docker-compose.yml pull
+- docker-compose -f localstack/docker-compose.yml up -d
+- docker ps -a
script:
- sbt ++$TRAVIS_SCALA_VERSION clean coverage scalafmtTest test
+- sbt ++$TRAVIS_SCALA_VERSION it:test
# Tagged releases
- if [ $TRAVIS_TEST_RESULT -eq 0 -a "$TRAVIS_PULL_REQUEST" = "false" ] && [[ "$TRAVIS_TAG"
=~ ^v[0-9]+\.[0-9]+(\.[0-9]+)?(-M[0-9]+)?$ ]]; then echo "** Publishing Release
@@ -23,6 +28,8 @@ script:
= "master" ]; then echo "** Publishing Snapshot from master **" && tar xf secrets.tar
&& mkdir ~/.bintray && cp publishing/.credentials ~/.bintray/ && cp publishing/.artifactory
~/.bintray/ && sbt ++$TRAVIS_SCALA_VERSION publish; fi
+after_script:
+- docker-compose -f localstack/docker-compose.yml down
after_success:
- sbt ++$TRAVIS_SCALA_VERSION coverageReport coveralls
before_cache:
@@ -30,7 +37,12 @@ before_cache:
- find $HOME/.sbt -name "*.lock" -print -delete
env:
global:
+ - DOCKER_COMPOSE_VERSION=1.17.1
- secure: KffThTeOg0ELt+iw5ZNtDW6x4/1iKIj1v91Yq6earF+YQquusROqt/MtdmyXRG9ho0c/xnB82BJHrBsvac6SwsPUZ/cRvQ0KML3bWyCMCL2B/SurhjxbahAWdgQGj67KO1n72q24cqyRMb9wCCOwy4p/7qomBYM+m8YRLc6YOxxj6DKriIiqOq8jFmt/nktpCUhXVURYvFxHtHQvLutaWWICwGZX1ibyloCmC3wWPMNOFc/1FBqLXNQTUTKuYOOC46TJ5janON1E2obDKZyGf/Q/ZEuNymcJYKi6/7XWzN24Ep194PLGj29J44Zpp9P36hnIiET8dQ1s2Dafo+alubqFEUaP4fV8XyYu5ZtTt5A0wxKEe/u7N4SdxzUZF/zBszCLRL2uJhQlfC7af6keCpIVoKLDi+9b6zpSVLDZmE1UmFuu8LLoMY2s2xRt6vHXTSiyPrIJaIFyaryxjhH+KmKNE0GW1jzVkPc6NpxjnVtF2E7LYogHWHzJLoMpBtTQoAeJlKvtbEmWrZXO9TGPFzu0fb+x/H0Ocre2QDSnjKobtMZs6S8zvD+STfmrZ8ZtZ0H0HNbGJW/rPkSZFLunOPXWxSZzNoWNrzg9LCoPtdkVsa7BYiAY9fM9xNX8SOx/kRZtw52el71jK8mDCV/lQjzjBwyo3AZl/jOx3cbWrAM=
- secure: KClWy/cCpMSKyJmVNOt8Dmwfatgx/9KsffLcL/BlXUYsBkB3jwdr9SzFok1ArGd02FzrFQU739/wnSKpmsW1fEO64tlh+qJwW76+gFlqWmOjc6r7ENH2u6LGLfRYeD4r76CiEV9mnU5GVMUPJIfbxlP0zB7ukzSGZwbkJ4WBqlxmurRr70jWz2F3ytJM0r31rpqc+ypHi58vj0NPBO7bR6XI4kquW9cDpCCgj4CVVshpsxUB1N4RPzcGsxUPucigH71lJ0sZjnEjSB+E95/Ovbf7PuPYnY0N8S6hLEgTyh7xAcxNpMsFgd1GInb92hrXLujUZU5N2BeX7jnYVa90q55AZd+IQqmJU+/4tBBrgWvppMIiTbbU5pnDoNqHNWmfB4wtbxwNW4y4tQH+/wSVPhgGYOWfsj3nLNnyfcjRAClQzEWCoAbx7Tkgj1OmvrOqwwmiPz3dFMInawQ6RkisqwgjT/sAhNGa2i26Tpj5Sd8XAjxU61MAzsr8gpVdA6TJcSdHkHC2zZVn+64Cc9WHJ9I6WbRD0fR3pFeTbQv+0SOUWV1owqtdGLl6z0gdI5rLZIYO7SMeRPjTYf/5NV/WxF2LcJu06LHYpzURaGQHp6T5NH2eEZof1Dt5tqa2eQ7KfZyui3rz3WB0AQlJSPK9yL78BvZzNlCNWbv5wLiW6hQ=
before_install:
+- sudo rm /usr/local/bin/docker-compose
+- curl -L https://github.com/docker/compose/releases/download/${DOCKER_COMPOSE_VERSION}/docker-compose-`uname -s`-`uname -m` > docker-compose
+- chmod +x docker-compose
+- sudo mv docker-compose /usr/local/bin
- 'if [ "$TRAVIS_PULL_REQUEST" = "false" ]; then openssl aes-256-cbc -K $encrypted_09ef3e55c311_key -iv $encrypted_09ef3e55c311_iv -in secrets.tar.enc -out secrets.tar -d; fi'
diff --git a/README.md b/README.md
index 70ec53d..5da3fa6 100644
--- a/README.md
+++ b/README.md
@@ -27,6 +27,7 @@ It's worth familiarising yourself with [Sequence numbers and Sub sequence number
* [FAQ](#faq)
* [Contributor Guide](#contributor-guide)
* [Code Formatting](#contributor-guide-code-formatting)
+ * [Integration Tests](#contributor-guide-integration-tests)
* [Tag Requirements](#contributor-guide-tag-requirements)
* [Version information](#contributor-guide-tag-requirements-version-information)
* [Valid Release Tag Examples:](#contributor-guide-tag-requirements-valid-release-tag-examples)
@@ -521,6 +522,12 @@ This project uses [scalafmt](http://scalameta.org/scalafmt/) and will automatica
Please run `sbt scalafmt` before committing and pushing changes.
+
+## Integration Tests
+As part of the Travis build, integration tests will run against a Kinesis localstack instance. You can run these locally as follows:
+* `docker-compose -f localstack/docker-compose.yml up`
+* `sbt it:test`
+
## Tag Requirements
Uses tags and [sbt-git](https://github.com/sbt/sbt-git) to determine the current version.
diff --git a/build.sbt b/build.sbt
index 8d97c81..e550baf 100644
--- a/build.sbt
+++ b/build.sbt
@@ -68,10 +68,10 @@ lazy val library =
"org.scalactic" %% "scalactic" % Version.scalaTest % Compile)
val testing = Seq(
- "org.scalatest" %% "scalatest" % Version.scalaTest % Test,
- "org.scalacheck" %% "scalacheck" % Version.scalaCheck % Test,
- "com.typesafe.akka" %% "akka-testkit" % Version.akka % Test,
- "org.mockito" % "mockito-core" % "2.7.15" % Test,
+ "org.scalatest" %% "scalatest" % Version.scalaTest % "it,test",
+ "org.scalacheck" %% "scalacheck" % Version.scalaCheck % "it,test",
+ "com.typesafe.akka" %% "akka-testkit" % Version.akka % "it,test",
+ "org.mockito" % "mockito-core" % "2.7.15" % "it,test",
"io.kamon" %% "kamon-core" % Version.kamon % Test,
"io.kamon" %% "kamon-akka-2.4" % Version.kamon % Test,
"io.kamon" %% "kamon-statsd" % Version.kamon % Test,
@@ -131,7 +131,7 @@ lazy val commonSettings =
"-Ywarn-infer-any", // Warn when a type argument is inferred to be `Any`.
"-Ywarn-nullary-override", // Warn when non-nullary `def f()' overrides nullary `def f'.
"-Ywarn-nullary-unit", // Warn when nullary methods return Unit.
- "-Ywarn-numeric-widen" // Warn when numerics are widened.
+ "-Ywarn-numeric-widen" // Warn when numerics are widened.
),
scalacOptions in (Compile, doc) ++= Seq(
"-no-link-warnings" // Suppresses problems with Scaladoc @throws links
@@ -146,7 +146,11 @@ lazy val commonSettings =
val project = Project.extract(state).currentRef.project
s"[$project]> "
},
- parallelExecution in Test := false
+ parallelExecution in Test := false,
+ parallelExecution in IntegrationTest := false,
+ fork in IntegrationTest := true,
+ javaOptions in IntegrationTest += "-Dcom.amazonaws.sdk.disableCertChecking=true",
+ envVars in IntegrationTest += ("AWS_CBOR_DISABLE" -> "true")
)
/* This allows to derive an sbt version string from the git information.
diff --git a/localstack/.env b/localstack/.env
new file mode 100644
index 0000000..788a9ef
--- /dev/null
+++ b/localstack/.env
@@ -0,0 +1,23 @@
+AWS_REGION=us-east-1
+AWS_ACCESS_KEY_ID=
+AWS_SECRET_ACCESS_KEY=
+AWS_CBOR_DISABLE=true
+APPLICATION_NAME=myReallyAwesomeApplication
+KINESIS_STREAM_NAME=my-stream
+KPL_KINESIS_ENDPOINT=localhost
+KPL_KINESIS_PORT=4568
+KPL_CLOUDWATCH_ENDPOINT=localhost
+KPL_CLOUDWATCH_PORT=4582
+KPL_VERIFY_CERTIFICATE=false
+KCL_KINESIS_ENDPOINT=https://localhost:4568
+KCL_DYNAMODB_ENDPOINT=https://localhost:4569
+ADDITIONAL_JAVA_OPTS=-Dcom.amazonaws.sdk.disableCertChecking=true
+LOCALSTACK_SERVICES=kinesis,dynamodb,sqs,cloudwatch,cloudformation
+LOCALSTACK_HOSTNAME=localhost
+LOCALSTACK_USE_SSL=true
+LOCALSTACK_KINESIS_ERROR_PROBABILITY=0.0
+LOCALSTACK_DYNAMODB_ERROR_PROBABILITY=0.0
+LOCALSTACK_LAMBDA_EXECUTOR=local
+LOCALSTACK_LAMBDA_REMOTE_DOCKER=false
+LOCALSTACK_DATA_DIR=/tmp/localstack/data
+LOCALSTACK_DOCKER_IMAGE_TAG=0.7.5
diff --git a/localstack/docker-compose.yml b/localstack/docker-compose.yml
new file mode 100644
index 0000000..bac298f
--- /dev/null
+++ b/localstack/docker-compose.yml
@@ -0,0 +1,27 @@
+version: "2.3"
+
+services:
+ localstack:
+ image: markglh/initialised-localstack:0.8.3
+ volumes:
+ - ./templates:/opt/bootstrap/templates
+ #network_mode: "host"
+ environment:
+ - "SERVICES=${LOCALSTACK_SERVICES:-kinesis,dynamodb,cloudwatch,cloudformation}"
+ - "DEFAULT_REGION=${AWS_REGION:-us-east-1}"
+ - "HOSTNAME=${LOCALSTACK_HOSTNAME:-localhost}"
+ - "HOSTNAME_EXTERNAL=${LOCALSTACK_HOSTNAME_EXTERNAL:-localhost}"
+ - "USE_SSL=true"
+ #- "DATA_DIR=${LOCALSTACK_DATA_DIR:-/tmp/localstack/data}"
+ ports:
+ - "4567-4582:4567-4582"
+ - "8080:8080"
+
+ # Ensures the health check runs
+ dummy-service:
+ image: alpine:3.5
+ depends_on:
+ localstack:
+ condition: service_healthy
+
+
diff --git a/localstack/templates/cftemplate.yml b/localstack/templates/cftemplate.yml
new file mode 100644
index 0000000..3352c10
--- /dev/null
+++ b/localstack/templates/cftemplate.yml
@@ -0,0 +1,23 @@
+AWSTemplateFormatVersion: '2010-09-09'
+Description: CloudFormation Int Test Template
+Resources:
+ KinesisStream1:
+ Type: AWS::Kinesis::Stream
+ Properties:
+ Name: int-test-stream-1
+ ShardCount: 1
+ KinesisStream2:
+ Type: AWS::Kinesis::Stream
+ Properties:
+ Name: int-test-stream-2
+ ShardCount: 1
+ KinesisStream3:
+ Type: AWS::Kinesis::Stream
+ Properties:
+ Name: int-test-stream-3
+ ShardCount: 1
+ KinesisStream4:
+ Type: AWS::Kinesis::Stream
+ Properties:
+ Name: int-test-stream-4
+ ShardCount: 1
diff --git a/src/it/resources/application.conf b/src/it/resources/application.conf
index ce36919..8b13789 100644
--- a/src/it/resources/application.conf
+++ b/src/it/resources/application.conf
@@ -1,59 +1 @@
-kamon {
- metric.filters {
- akka-actor {
- includes = ["test-system/user/simple-consumer/consumer-worker", "test-system/user/simple-consumer/consumer-worker*/checkpoint-worker*"]
- excludes = ["test-system/system/**"]
- }
-
- akka-dispatcher {
- includes = ["test-system/akka.actor.default-dispatcher", "test-system/kinesis.akka.default-dispatcher"]
- }
-
- trace {
- includes = [ "**" ]
- excludes = []
- }
-
- #akka-router {
- # includes = [ "test-system/user/some-router" ]
- #}
- }
-
- statsd {
- # Hostname and port in which your StatsD is running. Remember that StatsD packets are sent using UDP and
- # setting unreachable hosts and/or not open ports wont be warned by the Kamon, your data wont go anywhere.
- hostname = "127.0.0.1"
- port = 8125
-
- # Interval between metrics data flushes to StatsD. It's value must be equal or greater than the
- # kamon.metrics.tick-interval setting.
- flush-interval = 1 second
-
- # Max packet size for UDP metrics data sent to StatsD.
- max-packet-size = 1024 bytes
-
- # Subscription patterns used to select which metrics will be pushed to StatsD. Note that first, metrics
- # collection for your desired entities must be activated under the kamon.metrics.filters settings.
- includes {
- actor = ["*"]
- trace = ["*"]
- dispatcher = ["*"]
- }
-
- simple-metric-key-generator {
- # Application prefix for all metrics pushed to StatsD. The default namespacing scheme for metrics follows
- # this pattern:
- # application.host.entity.entity-name.metric-name
- application = "kinesis-test"
- }
- }
-
- # modules can be disabled at startup using yes/no arguments.
- modules {
- kamon-log-reporter.auto-start = no
- kamon-system-metrics.auto-start = no
- kamon-statsd.auto-start = no
- kamon-akka.auto-start = no
- }
-}
diff --git a/src/it/scala/com/weightwatchers/reactive.kinesis/KinesisProducerIntegrationSpec.scala b/src/it/scala/com/weightwatchers/reactive.kinesis/KinesisProducerIntegrationSpec.scala
new file mode 100644
index 0000000..2e75c38
--- /dev/null
+++ b/src/it/scala/com/weightwatchers/reactive.kinesis/KinesisProducerIntegrationSpec.scala
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2017 WeightWatchers
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.weightwatchers.reactive.kinesis
+
+import java.io.File
+
+import com.amazonaws.services.kinesis.producer.{KinesisProducer => AWSKinesisProducer}
+import com.typesafe.config.ConfigFactory
+import com.weightwatchers.reactive.kinesis.common.{KinesisTestConsumer, TestCredentials}
+import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf
+import com.weightwatchers.reactive.kinesis.models.ProducerEvent
+import com.weightwatchers.reactive.kinesis.producer.{KinesisProducer, ProducerConf}
+import org.scalatest.concurrent.Eventually
+import org.scalatest.mockito.MockitoSugar
+import org.scalatest.time.{Millis, Seconds, Span}
+import org.scalatest.{BeforeAndAfterAll, FreeSpec, Matchers}
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
+
+//scalastyle:off magic.number
+class KinesisProducerIntegrationSpec
+ extends FreeSpec
+ with Matchers
+ with MockitoSugar
+ with BeforeAndAfterAll
+ with Eventually {
+
+ implicit val ece = scala.concurrent.ExecutionContext.global
+
+ val defaultKinesisConfig =
+ ConfigFactory.parseFile(new File("src/main/resources/reference.conf")).getConfig("kinesis")
+
+ val kinesisConfig = ConfigFactory
+ .parseString(
+ """
+ |kinesis {
+ |
+ | application-name = "ScalaProducerTestSpec"
+ |
+ | testProducer {
+ | stream-name = "int-test-stream-1"
+ |
+ | kpl {
+ | Region = us-east-1
+ |
+ | CloudwatchEndpoint = localhost
+ | CloudwatchPort = 4582
+ |
+ | KinesisEndpoint = localhost
+ | KinesisPort = 4568
+ |
+ | VerifyCertificate = false
+ | }
+ | }
+ |
+ | testConsumer {
+ | stream-name = "int-test-stream-1"
+ |
+ | kcl {
+ | AWSCredentialsProvider = "com.weightwatchers.reactive.kinesis.common.TestCredentials|foo|bar"
+ | regionName = us-east-1
+ | KinesisEndpoint = "https://localhost:4568"
+ | DynamoDBEndpoint = "https://localhost:4569"
+ |
+ |
+ | metricsLevel = None
+ | }
+ |
+ | }
+ |}
+ """.stripMargin
+ )
+ .getConfig("kinesis")
+ .withFallback(defaultKinesisConfig)
+
+ implicit override val patienceConfig: PatienceConfig =
+ PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Millis))
+
+ val consumer: KinesisTestConsumer =
+ KinesisTestConsumer.from(ConsumerConf(kinesisConfig, "testConsumer"), Some(100 millis))
+
+ override protected def afterAll(): Unit = {
+ consumer.shutdown
+ }
+
+ "The KinesisProducer" - {
+
+ "Should publish a message to a stream" in {
+
+ val producerConf =
+ ProducerConf(kinesisConfig, "testProducer", Some(TestCredentials.Credentials))
+ val producer = KinesisProducer(producerConf)
+
+ val existingRecordCount = consumer.retrieveRecords(producerConf.streamName, 10).size
+
+ val event = ProducerEvent("1234", Random.alphanumeric.take(10).mkString)
+ producer.addUserRecord(event)
+
+ eventually {
+ val records: Seq[String] = consumer.retrieveRecords(producerConf.streamName, 10)
+ records.size shouldBe (existingRecordCount + 1)
+ records should contain(
+ new String(event.payload.array(), java.nio.charset.StandardCharsets.UTF_8)
+ )
+ }
+ }
+ }
+}
+
+//scalastyle:on
diff --git a/src/it/scala/com/weightwatchers/reactive.kinesis/SimpleKinesisConsumer.scala b/src/it/scala/com/weightwatchers/reactive.kinesis/SimpleKinesisConsumer.scala
index 29cfaff..306dbcd 100644
--- a/src/it/scala/com/weightwatchers/reactive.kinesis/SimpleKinesisConsumer.scala
+++ b/src/it/scala/com/weightwatchers/reactive.kinesis/SimpleKinesisConsumer.scala
@@ -13,14 +13,12 @@ import com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker.{
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf
import com.weightwatchers.reactive.kinesis.models.CompoundSequenceNumber
-import kamon.Kamon
import org.joda.time.{DateTime, DateTimeZone, Period}
import scala.collection.mutable.ListBuffer
import com.weightwatchers.eventing.system
object RunSimpleConsumer extends App {
- Kamon.start()
val consumer = system.actorOf(SimpleKinesisConsumer.props, "simple-consumer")
}
@@ -109,7 +107,6 @@ class SimpleKinesisConsumer(kinesisConfig: Config) extends Actor with LazyLoggin
logger.error(s"\n\nFailed pit stop check @ $totalReceived!\n\n")
logger.error(s"\n\nFailed pit stop check @ $totalReceived!\n\n")
context.stop(self)
- Kamon.shutdown()
System.exit(3)
} else {
logger.warn(s"\n\n**** PIT STOP OK: $totalVerified records verified\n\n")
diff --git a/src/it/scala/com/weightwatchers/reactive.kinesis/common/KinesisTestConsumer.scala b/src/it/scala/com/weightwatchers/reactive.kinesis/common/KinesisTestConsumer.scala
new file mode 100644
index 0000000..9e05afc
--- /dev/null
+++ b/src/it/scala/com/weightwatchers/reactive.kinesis/common/KinesisTestConsumer.scala
@@ -0,0 +1,110 @@
+package com.weightwatchers.reactive.kinesis.common
+
+import java.util.Collections
+
+import com.amazonaws.ClientConfiguration
+import com.amazonaws.auth.AWSCredentialsProvider
+import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord
+import com.amazonaws.services.kinesis.model._
+import com.amazonaws.services.kinesis.{AmazonKinesisAsyncClient, _}
+import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.FiniteDuration
+
+object KinesisTestConsumer {
+
+ /**
+ * Creates an instance of the test consumer from the configuration.
+ * If the endpoint has been specified this will be used instead of deriving it from the region.
+ */
+ def from(config: ConsumerConf, requestTimeout: Option[FiniteDuration]): KinesisTestConsumer =
+ from(
+ config.kclConfiguration.getKinesisCredentialsProvider,
+ Option(config.kclConfiguration.getKinesisEndpoint),
+ Option(config.kclConfiguration.getRegionName),
+ requestTimeout,
+ config.kclConfiguration.getKinesisClientConfiguration
+ )
+
+ /**
+ * Creates an instance of the test consumer from the configuration.
+ * If the endpoint has been specified this will be used instead of deriving it from the region.
+ */
+ def from(credentialsProvider: AWSCredentialsProvider,
+ endpoint: Option[String],
+ region: Option[String],
+ requestTimeout: Option[FiniteDuration],
+ config: ClientConfiguration = new ClientConfiguration()): KinesisTestConsumer = {
+
+ requestTimeout.foreach(timeout => config.setRequestTimeout(timeout.toMillis.toInt))
+
+ val builder = AmazonKinesisAsyncClient
+ .asyncBuilder()
+ .withClientConfiguration(config)
+ .withCredentials(credentialsProvider)
+
+ endpoint.fold(builder.withRegion(region.getOrElse("us-east-1"))) { endpoint =>
+ builder.withEndpointConfiguration(
+ new EndpointConfiguration(endpoint, region.getOrElse("us-east-1"))
+ )
+ }
+
+ new KinesisTestConsumer(builder.build())
+ }
+}
+
+/**
+ * A test consumer which retrieves a batch of messages from Kinesis for validating a producer.
+ *
+ * Does not handle checkpointing.
+ *
+ * The shardIterator is created as "TRIM_HORIZON", so all messages in the stream are received,
+ *
+ * including those published before this consumer was created.
+ *
+ */
+class KinesisTestConsumer(client: AmazonKinesis) {
+
+ /**
+ * Retrieves a batch of records from Kinesis as Strings.
+ *
+ * Handles de-aggregation.
+ */
+ def retrieveRecords(streamName: String, batchSize: Int): List[String] = {
+ getShards(streamName)
+ .flatMap { shard =>
+ val getRecordsRequest = new GetRecordsRequest
+ getRecordsRequest.setShardIterator(getShardIterator(streamName, shard))
+ getRecordsRequest.setLimit(batchSize)
+ client.getRecords(getRecordsRequest).getRecords.asScala.toList
+ }
+ .flatMap { record: Record =>
+ UserRecord
+ .deaggregate(Collections.singletonList(record))
+ .asScala
+ .map { ur =>
+ new String(ur.getData.array(), java.nio.charset.StandardCharsets.UTF_8)
+ }
+ }
+ }
+
+ private def getShardIterator(streamName: String, shard: Shard) = {
+ client
+ .getShardIterator(streamName, shard.getShardId, "TRIM_HORIZON")
+ .getShardIterator
+ }
+
+ private def getShards(streamName: String) = {
+ client
+ .describeStream(streamName)
+ .getStreamDescription
+ .getShards
+ .asScala
+ .toList
+ }
+
+ def shutdown(): Unit = client.shutdown()
+
+}
diff --git a/src/it/scala/com/weightwatchers/reactive.kinesis/common/TestCredentials.scala b/src/it/scala/com/weightwatchers/reactive.kinesis/common/TestCredentials.scala
new file mode 100644
index 0000000..8164a75
--- /dev/null
+++ b/src/it/scala/com/weightwatchers/reactive.kinesis/common/TestCredentials.scala
@@ -0,0 +1,20 @@
+package com.weightwatchers.reactive.kinesis.common
+
+import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider}
+
+class TestCredentials(accessKey: String, secretKey: String)
+ extends AWSCredentials
+ with AWSCredentialsProvider {
+
+ override def getAWSAccessKeyId: String = accessKey
+
+ override def getAWSSecretKey: String = secretKey
+
+ override def getCredentials: AWSCredentials = this
+
+ override def refresh: Unit = {}
+}
+
+object TestCredentials {
+ val Credentials = new TestCredentials("foo", "bar")
+}