Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Fix #25 by providing an Akka Stream Source graph stage for Kinesis. #39

Merged

Conversation

aquamatthias
Copy link
Contributor

Initial PR #36 has been closed in favour of automated integration testing setup.

This graph stage provides an akka source graph based on the actor model provided by the reactive kinesis consumer.

If a consumer is configured, an akka stream Source can be obtained via a simple Kinesis.source("consumer-name").
Every message that flows through the stream needs to be committed explicitly.

An integration test is added that relies on localstack running on the same node (I did not add the localstack startup logic as part of this PR).

Steps to run the test:

Set this env var, since the localstack does not support CBOR: export AWS_CBOR_DISABLE=true
Start localstack (I use the docker-compose with: docker-compose -f localstack.yml up) in default configuration (Kinesis on port 4568, Dynamo on port 4569)
sbt it:test
It was not as easy to come up with a good integration test, since a Kinesis Source usually never finishes. To have a finite test, I use Flow.take which ends the stream after a defined amount of entries. The interplay of different readers/shard consumers could be different between runs, but the test expectation should be met.

@markglh tests and integration tests have passed several successive test runs on my machine.

  • There seems to be a race condition in KCL when the lease table is created/accessed, which made the tests brittle. The test setup creates the lease table now proactively to remedy this shortcoming.
  • KinesisProducerIntegrationSpec was broken and is ignored to minimise the change
  • The test kinesis configuration settings have been relaxed. The original settings are good for local development but could lead to problems on not so well equipped or loaded machines (like CI server)

Let me know what you think.

@markglh
Copy link
Contributor

markglh commented Dec 11, 2017

Thanks @aquamatthias !

Was the Producer Spec failing locally or in CI? That one relies on the initialized localstack image for the setup.

@aquamatthias
Copy link
Contributor Author

@markglh locally. I just started withdocker-compose up And started it:test. What is missing?

@markglh
Copy link
Contributor

markglh commented Dec 11, 2017

Yeh that should be it really, if you bash into the container (docker exec -it container_id /bin/bash) then run awslocal kinesis list-streams what do you see?

These should exist:
https://github.com/WW-Digital/reactive-kinesis/blob/master/localstack/templates/cftemplate.yml

If not, I've seen this before where the localstack container has been cached so it doesn't initialise, in that case try deleting the cached container (no need to delete the image).

@markglh
Copy link
Contributor

markglh commented Dec 11, 2017

I'll also update that test to use your method of creating what it needs as part of the test.

@aquamatthias
Copy link
Contributor Author

Hey @markglh. This is what I see in my local docker setup, after starting docker-comppse up:

➔ docker exec -it 8f18d6e353fb /bin/bash
bash-4.3# awslocal kinesis list-streams
{
    "StreamNames": []
}

This seems to be the problem with the failing test. It failed before I did any change. Something you can look into?

  1. Regarding the travis build: 201.1 fails, but sbt has finished successfully. https://travis-ci.org/WW-Digital/reactive-kinesis/jobs/314856826 . What is missing?

  2. I still see timeouts in the CI build: Timed out processing batch. I will increase the settings.

@markglh
Copy link
Contributor

markglh commented Dec 11, 2017

That's weird with the travis build failing!

Locally, did you try clearing the containers?

docker stop $(docker ps -a -q)
docker rm $(docker ps -a -q)

To elaborate: this version of localstack uses the healthcheck to init the container. However it seems that if the container already exists (default localstack) that it's considered healthy and doesn't run the custom healthcheck. It's not an issue in CI because it's always clean.
Swapping over to create the streams if they don't exist will fix this in all cases - which I'll do, but clearing the container should work.

@aquamatthias
Copy link
Contributor Author

Done as suggested. I get some strange logs:

localstack_1     | 2017-12-11T18:08:54:ERROR:localstack.services.generic_proxy: Error forwarding request: HTTPConnectionPool(host='127.0.0.1', port=4559): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f6e2352f490>: Failed to establish a new connection: [Errno 111] Connection refused',)) Traceback (most recent call last):
localstack_1     |   File "/opt/code/localstack/localstack/services/generic_proxy.py", line 201, in forward
localstack_1     |     headers=forward_headers)
localstack_1     |   File "/opt/code/localstack/.venv/lib/python2.7/site-packages/requests/api.py", line 112, in post
localstack_1     |     return request('post', url, data=data, json=json, **kwargs)
localstack_1     |   File "/opt/code/localstack/.venv/lib/python2.7/site-packages/requests/api.py", line 58, in request
localstack_1     |     return session.request(method=method, url=url, **kwargs)
localstack_1     |   File "/opt/code/localstack/.venv/lib/python2.7/site-packages/requests/sessions.py", line 508, in request
localstack_1     |     resp = self.send(prep, **send_kwargs)
localstack_1     |   File "/opt/code/localstack/.venv/lib/python2.7/site-packages/requests/sessions.py", line 618, in send
localstack_1     |     r = adapter.send(request, **kwargs)
localstack_1     |   File "/opt/code/localstack/.venv/lib/python2.7/site-packages/requests/adapters.py", line 508, in send
localstack_1     |     raise ConnectionError(e, request=request)
localstack_1     | ConnectionError: HTTPConnectionPool(host='127.0.0.1', port=4559): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f6e2352f490>: Failed to establish a new connection: [Errno 111] Connection refused',))
localstack_1     | 

but I see the streams locally:

bash-4.3# awslocal kinesis list-streams
{
    "StreamNames": [
        "KinesisSourceSpec", 
        "int-test-stream-1", 
        "int-test-stream-2", 
        "int-test-stream-3", 
        "int-test-stream-4"
    ]
}

Reenabled the test, which runs green now.

Next step: try to see if we can make Travis happy.

@aquamatthias
Copy link
Contributor Author

@markglh The sbt side of things is green as far as I can see:

[success] Total time: 90 s, completed Dec 11, 2017 6:19:10 PM
[success] Total time: 101 s, completed Dec 11, 2017 6:19:30 PM

But the build is not successful:

$ docker-compose -f localstack/docker-compose.yml down
Done. Your build exited with 1.

Something you can look into?

@markglh
Copy link
Contributor

markglh commented Dec 11, 2017

Thanks @aquamatthias - Some progress at least :D

I will look into the Travis weirdness!

@markglh
Copy link
Contributor

markglh commented Dec 17, 2017

@aquamatthias
Sorted the travis issues - it was just formatting, but the structure of the build made it confusing so I've tweaked it.

I've applied the changes and refactored the integration test setup and producer spec to all create the streams on the fly.

I'll have a proper run through the Stream Source implementation tomorrow hopefully so we can finally get this merged in!

The branch lives here: https://github.com/WW-Digital/reactive-kinesis/tree/feature/akka-streams-review - feel free to just rebase that into yours. Any concerns let me know.

I'm not sure whether my tweaks affected anything - but I have seen that same test fail again in Travis. I'll also look into it. Haven't managed to reproduce locally yet.

@coveralls
Copy link

Coverage Status

Coverage increased (+0.1%) to 86.923% when pulling 6ed6fa1 on aquamatthias:feature/aquamatthias/akka-streams into 2b25839 on WW-Digital:master.

1 similar comment
@coveralls
Copy link

Coverage Status

Coverage increased (+0.1%) to 86.923% when pulling 6ed6fa1 on aquamatthias:feature/aquamatthias/akka-streams into 2b25839 on WW-Digital:master.

@aquamatthias
Copy link
Contributor Author

@markglh I cherry picked your commits on top of this branch - thanks for the improvements.
Travis seems to be happy. Waiting for your review.

* The KinesisEvent is passed through the stream.
* Every event has to be committed explicitly.
*/
sealed trait KinesisEvent {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the name of this. What about CommittableEvent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes CommittableEvent sounds good to me. Changed it that way.
In my current project we need a way to read and handle the message and than in a later stage commit this event. In order to support such cases I changed the definition to CommittableEvent[T] where T is emitted as ConsumerEvent. Together with map and mapAsync it is possible to change the payload and still have the event context. WDYT?

Copy link
Contributor

@markglh markglh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just the one minor code change mentioned above.

Otherwise, would you mind adding a new section to the readme here:
https://github.com/WW-Digital/reactive-kinesis/blame/master/README.md#L298
With a h3 heading Akka Stream Source Implementation - which gives a short overview of the implementation with a code example like the other sections in the readme.

Then add another heading here:
https://github.com/WW-Digital/reactive-kinesis/blame/master/README.md#L208
Actor Based Implementation - to separate out the two implementations.

I'm trying to keep the library as approachable as possible by having detailed docs for newcomers.

After that we're all good! Thanks again for all your work :D

I would ideally like a unit test for KinesisSourceGraph - but it's not trivial and we have it covered in the integration test - so I'm happy to defer that.

@aquamatthias aquamatthias force-pushed the feature/aquamatthias/akka-streams branch from 6ed6fa1 to 460651a Compare December 21, 2017 15:39
@coveralls
Copy link

Coverage Status

Coverage increased (+1.01%) to 88.101% when pulling 460651a on aquamatthias:feature/aquamatthias/akka-streams into b4e85eb on WW-Digital:master.

@coveralls
Copy link

Coverage Status

Coverage increased (+2.3%) to 89.421% when pulling e3f58e3 on aquamatthias:feature/aquamatthias/akka-streams into b4e85eb on WW-Digital:master.

1 similar comment
@coveralls
Copy link

Coverage Status

Coverage increased (+2.3%) to 89.421% when pulling e3f58e3 on aquamatthias:feature/aquamatthias/akka-streams into b4e85eb on WW-Digital:master.

@aquamatthias
Copy link
Contributor Author

@markglh Thanks for good review comments.
I added a section in the Readme: https://github.com/aquamatthias/reactive-kinesis/tree/feature/aquamatthias/akka-streams#akka-stream-source.
Since I added a unit test for the source graph stage, coveralls seems to be happy too.
Let me know what you think.

@aquamatthias aquamatthias force-pushed the feature/aquamatthias/akka-streams branch from 4bf9b84 to 357efd3 Compare December 21, 2017 16:47
@coveralls
Copy link

coveralls commented Dec 21, 2017

Coverage Status

Coverage increased (+1.3%) to 88.413% when pulling 357efd3 on aquamatthias:feature/aquamatthias/akka-streams into b4e85eb on WW-Digital:master.

@markglh
Copy link
Contributor

markglh commented Dec 21, 2017

That's great @aquamatthias - the changes make sense too, thanks again for this!!

@markglh markglh merged commit 765c1c8 into WW-Digital:master Dec 21, 2017
@aquamatthias aquamatthias deleted the feature/aquamatthias/akka-streams branch December 22, 2017 07:26
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Development

Successfully merging this pull request may close these issues.

3 participants