-
Notifications
You must be signed in to change notification settings - Fork 13
Fix #25 by providing an Akka Stream Source graph stage for Kinesis. #36
Conversation
…Kinesis. This graph stage provides an akka source graph based on the actor model provided by the reactive kinesis consumer.
Thanks @aquamatthias! Regarding the localstack stuff - I'm in the process of adding that and hooking it up to travis in #34 I've got a lot of travelling to do this week but I'll dig into this properly over the next couple of weeks 👌 |
This is really cool! |
Ok it's been a month - so sorry @aquamatthias - now things have settled down it's high up on my list!! |
I've finished the integration tests finally - so they run in travis now against localstack. This is next... |
Created a branch off this to rebase master and refactor the integration tests. My intention is to find a middle ground between what I did and what you did. There are pros and cons of each! https://github.com/WW-Digital/reactive-kinesis/tree/feature/aquamatthias/akka-streams Right now it's building and running in travis - but not quite finished yet. Not spent much time looking at the main code yet / reviewing... Seen this fail a couple of times in the travis builds:
May be my refactoring - I'll keep prodding :) |
@markglh Sounds good. Let me know if I there is something I can help with. |
Thanks @aquamatthias - you should be able to merge my branch into yours without conflicts 👌 |
This graph stage provides an akka source graph based on the actor model provided by the reactive kinesis consumer.
If a consumer is configured, an akka stream Source can be obtained via a simple
Kinesis.source("consumer-name")
.Every message that flows through the stream needs to be committed explicitly.
An integration test is added that relies on localstack running on the same node (I did not add the localstack startup logic as part of this PR).
Steps to run the test:
export AWS_CBOR_DISABLE=true
docker-compose -f localstack.yml up
) in default configuration (Kinesis on port 4568, Dynamo on port 4569)sbt it:test
It was not as easy to come up with a good integration test, since a Kinesis Source usually never finishes. To have a finite test, I use
Flow.take
which ends the stream after a defined amount of entries. The interplay of different readers/shard consumers could be different between runs, but the test expectation should be met.