-
Notifications
You must be signed in to change notification settings - Fork 17
/
KPLTests.scala
60 lines (53 loc) · 1.86 KB
/
KPLTests.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package kinesis.mock
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import java.nio.ByteBuffer
import cats.effect.{IO, Resource, SyncIO}
import cats.syntax.all._
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.producer._
import kinesis.mock.instances.arbitrary._
import kinesis.mock.syntax.javaFuture._
import kinesis.mock.syntax.scalacheck._
/** Functional test that publishes records through the Kinesis Producer Library
  * (KPL), with the producer configured to talk to endpoints on `localhost`.
  */
class KPLTests extends AwsFunctionalTests {
  // Implicit executor kept in scope for the rest of the suite; presumably
  // consumed by the `javaFuture` syntax behind `.toIO` -- confirm against
  // kinesis.mock.syntax.javaFuture.
  implicit val E: ExecutionContextExecutor = ExecutionContext.global

  /** Fixture that extends the resources yielded by `resource` with a
    * [[KinesisProducer]] whose Kinesis, CloudWatch and STS endpoints all
    * point at `localhost`.
    *
    * On release the producer is flushed and then destroyed. `destroy()` is
    * attached with `guarantee`, so it still runs when `flushSync()` throws;
    * chaining the two with `*>` would skip it in that case and leak the
    * producer.
    */
  val kplFixture: SyncIO[FunFixture[KPLResources]] = ResourceFunFixture(
    resource.flatMap { resources =>
      Resource
        .make(
          IO(
            new KinesisProducer(
              new KinesisProducerConfiguration()
                .setCredentialsProvider(AwsCreds.LocalCreds)
                .setRegion(
                  RegionUtils.getRegion(resources.awsRegion.entryName).getName
                )
                .setKinesisEndpoint("localhost")
                .setKinesisPort(4567L) // KPL only supports TLS
                .setCloudwatchEndpoint("localhost")
                .setCloudwatchPort(4566L)
                .setStsEndpoint("localhost")
                .setStsPort(4566L)
                // Certificate verification is switched off for the local endpoint.
                .setVerifyCertificate(false)
            )
          )
        )(kpl =>
          // flushSync() blocks the calling thread, so it is run via IO.blocking.
          IO.blocking(kpl.flushSync()).guarantee(IO(kpl.destroy()))
        )
        .map(kpl => KPLResources(resources, kpl))
    }
  )

  kplFixture.test("it should produce records") { resources =>
    for {
      dataRecords <- IO(dataGen.take(20).toVector)
      // Submit all records in parallel, each under a fresh random partition key.
      res <- dataRecords.parTraverse(data =>
        resources.kpl
          .addUserRecord(
            new UserRecord(
              resources.functionalTestResources.streamName.streamName,
              Utils.randomUUIDString,
              ByteBuffer.wrap(data)
            )
          )
          .toIO
      )
    } yield assert(
      res.forall(_.isSuccessful()),
      s"${res.count(!_.isSuccessful())} of ${res.size} records were not produced successfully"
    )
  }
}