Skip to content
This repository has been archived by the owner on Oct 23, 2023. It is now read-only.

Commit

Permalink
Fix string->ByteBuffer conversion after rebase.
Browse files Browse the repository at this point in the history
  • Loading branch information
aquamatthias committed Dec 21, 2017
1 parent 8009c8d commit 460651a
Showing 1 changed file with 7 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.weightwatchers.reactive.kinesis.stream

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.Date

import akka.actor.Status.Failure
Expand Down Expand Up @@ -61,7 +63,7 @@ class KinesisSourceGraphSpec
}
val all = source.take(4).runWith(Sink.seq).futureValue
all should have size 4
all.map(_.payload.payload) shouldBe Seq("1", "2", "3", "4") //correct order is maintained
all.map(_.payload.payloadAsString()) shouldBe Seq("1", "2", "3", "4") //correct order is maintained
}

"allows mapped and async mapped stage events" in new Fixture {
Expand All @@ -70,7 +72,7 @@ class KinesisSourceGraphSpec
ref ! processEvent("312")
}
val all = source
.map(_.map(_.payload.toLong)) // CommittableEvent ConsumerEvent => Long
.map(_.map(_.payloadAsString().toLong)) // CommittableEvent ConsumerEvent => Long
.mapAsync(1)(_.mapAsync(long => Future(new Date(long)))) // CommittableEvent Long => Date
.take(2)
.runWith(Sink.seq)
Expand Down Expand Up @@ -156,7 +158,9 @@ class KinesisSourceGraphSpec
val seqNr = 0.until(Int.MaxValue).iterator.map(_.toLong)
def processEvent(payload: String): ProcessEvent =
ProcessEvent(
ConsumerEvent(CompoundSequenceNumber("fixed", seqNr.next()), payload, DateTime.now())
ConsumerEvent(CompoundSequenceNumber("fixed", seqNr.next()),
ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)),
DateTime.now())
)
}

Expand Down

0 comments on commit 460651a

Please sign in to comment.