Releases: WW-Digital/reactive-kinesis
Fix checkpointing, bump KCL & switch to openjdk
Bump SDK Versions
Upgrade amazon-kinesis-client to 1.9.3
See: #74
Bump Versions
This updates:
- sbt 1.1.1 -> 1.2.8
- scala 2.12.5 -> 2.12.8
- jackson 2.9.4 -> 2.9.8
- akka 2.5.11 -> 2.5.19 (broke, reverted)
- amazon-kinesis-client 1.8.10 -> 1.9.3
- amazon-kinesis-producer 0.12.8 -> 0.12.11
- scala-logging 3.8.0 -> 3.9.0
Note: KCL has two new fields:
- `listShardsBackoffTimeInMillis`
- `maxListShardsRetryAttempts`
Add KPL Typed Config
Add additional factory methods to Kinesis to create Akka Source
Upgrade Dependencies
Updates all packages to their newest versions. Builds with SBT 1.1.1.
One exception: I could not fully update the KCL version to 1.9.0, because the localstack dependency, kinesalite, does not support the ListShards Kinesis API that the newest KCL depends on. The upgrade resulted in failed integration tests.
Thanks to @etspaceman
Add Akka Stream Producer Sink
Adds a `KinesisSinkGraphStage` that uses an underlying `KinesisProducerActor` to do the heavy lifting. The stream manages back pressure by allowing only a fixed number of outstanding messages. A materialized value indicates when the stream has finished (or failed).
See: https://github.com/WW-Digital/reactive-kinesis#akka-stream-sink
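The back pressure idea described above (only a fixed number of outstanding messages, plus a value that signals completion or failure) can be illustrated with a minimal standard-library sketch. This is not the `KinesisSinkGraphStage` implementation: `BoundedSend`, `sendAll`, and `maxOutstanding` are hypothetical names, and unlike a real graph stage this version blocks the calling thread while it waits for a free slot.

```scala
import java.util.concurrent.Semaphore
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

// Hypothetical illustration only, not part of reactive-kinesis.
object BoundedSend {

  /** Sends every message while keeping at most `maxOutstanding` sends unconfirmed.
    * The returned Future plays the role of the materialized value:
    * it completes when all sends have finished, or fails if any send failed.
    */
  def sendAll[A](messages: Seq[A], maxOutstanding: Int)(send: A => Future[Unit])(
      implicit ec: ExecutionContext): Future[Unit] = {
    val permits = new Semaphore(maxOutstanding)
    val inFlight = messages.toVector.map { message =>
      // Back pressure: wait here until an earlier send has been confirmed.
      permits.acquire()
      // Turn a synchronous exception into a failed Future so the permit is still released.
      val result = try send(message) catch { case NonFatal(e) => Future.failed[Unit](e) }
      result.onComplete(_ => permits.release())
      result
    }
    Future.sequence(inFlight).map(_ => ())
  }
}
```

In the real sink the upstream is simply not pulled while the limit is reached, so no thread is blocked; the semaphore here only stands in for that behaviour.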
Remove unnecessary dependencies
Small release to remove some unused dependencies.
Add Akka Stream Source, Upgrade Libraries
Changes
Consumer
- Update KCL to 1.8.8 (#44)
  - Support timeouts and retries for `getRecords` calls.
    - Applications can now use an async retry for `getRecords` calls to Kinesis. In addition to setting the `retryGetRecordsInSeconds` timeout, the application must also provide a `maxGetRecordsThreadPool` size for concurrent requests.
  - Support configuring the graceful shutdown timeout for MultiLang Clients using `shutdownGraceMillis`. This adds support for setting the timeout that the Java process will wait for the MultiLang client to complete graceful shutdown.
  - Add Record pre-fetching
    - This will retrieve and queue additional records from Kinesis while the application is processing existing records.
    - Prefetching can be enabled by setting `dataFetchingStrategy` to `PREFETCH_CACHED`. Once enabled, an additional fetching thread will be started to retrieve records from Kinesis. Retrieved records will be held in a queue until the application is ready to process them.
  - Logging of long running tasks can be enabled by setting the `logWarningForTaskAfterMillis` configuration property.
- Add an Akka Streams `Source` that can be used with streams. It is possible to create a `Source` from a `ConsumerConf` or directly from the consumer name that is defined in the configuration. Every message that is emitted to the stream is of type `CommittableEvent[ConsumerEvent]` and has to be committed explicitly downstream with a call to `event.commit()`. It is possible to map to a different type of `CommittableEvent` via the `map` and `mapAsync` functionality. A `KinesisConsumer` is used internally for the `Kinesis.source`. All rules described for the `KinesisConsumer` also apply to the stream `Source`. (#39)
- Update `ConsumerEvent` to use ByteBuffer instead of String. (#41)
- Fix an issue where the ConsumerWorker and its related CheckpointWorker weren't stopped on lease loss. In case of a lost lease, KCL now calls the shutdown method of the record processor. (#40)
Producer
- Update KPL to 0.12.8 (#44)
Migration guide
`ConsumerEvent` now returns a `ByteBuffer` instead of a `String`. Use `payloadAsString` to maintain previous functionality.
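For code that handles the raw `ByteBuffer` itself, a conversion along the following lines may be useful. This is a sketch using only the JDK, not the library's `payloadAsString` implementation; `PayloadDecoding` and `decode` are hypothetical names, and UTF-8 is an assumed default encoding.

```scala
import java.nio.ByteBuffer
import java.nio.charset.{Charset, StandardCharsets}

// Hypothetical helper, not part of reactive-kinesis.
object PayloadDecoding {

  /** Decodes the remaining bytes of `payload` into a String.
    * Reading through a read-only view leaves the original buffer's
    * position untouched, so the payload can be read again afterwards.
    */
  def decode(payload: ByteBuffer, charset: Charset = StandardCharsets.UTF_8): String =
    charset.decode(payload.asReadOnlyBuffer()).toString
}
```

Decoding through `asReadOnlyBuffer()` is a deliberate choice: decoding the buffer directly would advance its position, and a second read of the same event would then see an empty payload.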
Massive thanks to @aquamatthias for the contributions
Reworked Producer Configuration, added ThreadingModel support
PR: #30
See below for the new producer instantiation. You now use `ProducerConf` for the Scala implementation too.
New method:
val kinesisConfig: Config = ConfigFactory.load().getConfig("kinesis")
val kpl = KinesisProducer(ProducerConf(kinesisConfig, "some-producer"))
Note that the config is the `kinesis` block, not the `kpl` block as before.
Old method:
val kinesisConfig: Config = ConfigFactory.load().getConfig("kinesis")
val producerConfig: Config = kinesisConfig.getConfig("some-producer")
val streamName: String = producerConfig.getString("stream-name")
val kpl = KinesisProducerKPL(kinesisConfig.getConfig("kpl"), streamName)
- Small breaking change: removed the trait in the Producer and renamed `KinesisProducerKPL` to `KinesisProducer`.
- Added `ThreadPoolSize` and `ThreadingModel` KPL properties, see awslabs/amazon-kinesis-producer#100
- Refactored how the properties are loaded to share commonality between the actor and Scala implementations. This simplifies the Scala implementation and makes it more robust. It also allows more control over the underlying KPL properties for anyone who does not want to use the Typesafe config approach.