From 460651a87c6f6fe0ebb574ffea750374cf4f894d Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Thu, 21 Dec 2017 16:37:36 +0100 Subject: [PATCH] Fix string->ByteBuffer conversion after rebase. --- .../kinesis/stream/KinesisSourceGraphSpec.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphSpec.scala b/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphSpec.scala index 6ec754c..9a0c347 100644 --- a/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphSpec.scala +++ b/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphSpec.scala @@ -16,6 +16,8 @@ package com.weightwatchers.reactive.kinesis.stream +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import java.util.Date import akka.actor.Status.Failure @@ -61,7 +63,7 @@ class KinesisSourceGraphSpec } val all = source.take(4).runWith(Sink.seq).futureValue all should have size 4 - all.map(_.payload.payload) shouldBe Seq("1", "2", "3", "4") //correct order is maintained + all.map(_.payload.payloadAsString()) shouldBe Seq("1", "2", "3", "4") //correct order is maintained } "allows mapped and async mapped stage events" in new Fixture { @@ -70,7 +72,7 @@ class KinesisSourceGraphSpec ref ! processEvent("312") } val all = source - .map(_.map(_.payload.toLong)) // CommittableEvent ConsumerEvent => Long + .map(_.map(_.payloadAsString().toLong)) // CommittableEvent ConsumerEvent => Long .mapAsync(1)(_.mapAsync(long => Future(new Date(long)))) // CommittableEvent Long => Date .take(2) .runWith(Sink.seq) @@ -156,7 +158,9 @@ class KinesisSourceGraphSpec val seqNr = 0.until(Int.MaxValue).iterator.map(_.toLong) def processEvent(payload: String): ProcessEvent = ProcessEvent( - ConsumerEvent(CompoundSequenceNumber("fixed", seqNr.next()), payload, DateTime.now()) + ConsumerEvent(CompoundSequenceNumber("fixed", seqNr.next()), + ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)), + DateTime.now()) ) }