Skip to content

Commit

Permalink
test: Harden SlickProjectionSpec (#1085)
Browse files Browse the repository at this point in the history
* make it more similar to R2dbcProjectionSpec
  • Loading branch information
patriknw authored Dec 6, 2023
1 parent 45df52f commit 7680fe1
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ class JdbcProjectionSpec
private def projectedValueShouldIncludeNTimes(expected: String, nTimes: Int)(implicit entityId: String) = {
withClue(s"checking projected value contains [$expected] $nTimes times: ") {
val text = withRepo(_.findById(entityId)).futureValue.value.text
text.split("\\|").count(_ == expected) shouldBe nTimes
text.split('|').count(_ == expected) shouldBe nTimes
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class R2dbcProjectionSpec
private def projectedValueShouldIncludeNTimes(expected: String, nTimes: Int)(implicit entityId: String) = {
withClue(s"checking projected value contains [$expected] $nTimes times: ") {
val text = withRepo(_.findById(entityId)).futureValue.get.text
text.split("\\|").count(_ == expected) shouldBe nTimes
text.split('|').count(_ == expected) shouldBe nTimes
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -875,19 +875,36 @@ class SlickProjectionSpec
offsetOpt shouldBe empty
}

withClue("check: projection failed with stream failure") {
withClue("check: projection received 3") {
projectionTestKit.runWithTestSink(slickProjectionFailing) { sinkProbe =>
sinkProbe.request(1000)
eventuallyExpectError(sinkProbe).getMessage should startWith(concatHandlerFail4Msg)
sinkProbe.request(3)
eventually {
val concatStr = dbConfig.db.run(repository.findById(entityId)).futureValue.value
concatStr.text shouldBe "abc|def|ghi"
offsetStore.readOffset[Long](projectionId).futureValue.value shouldBe 3L
}
}
}
withClue("check: projection is consumed up to third") {
val concatStr = dbConfig.db.run(repository.findById(entityId)).futureValue.value
concatStr.text shouldBe "abc|def|ghi"
}
withClue(s"check: last seen offset is 3L") {
val offset = offsetStore.readOffset[Long](projectionId).futureValue.value
offset shouldBe 3L

// run again up to failure point
val slickProjectionFailing2 =
SlickProjection
.atLeastOnce(
projectionId = projectionId,
sourceProvider = sourceProvider(entityId),
databaseConfig = dbConfig,
() => bogusEventHandler)
.withSaveOffset(1, Duration.Zero)
withClue("check: projection failed with stream failure") {
projectionTestKit.runWithTestSink(slickProjectionFailing2) { sinkProbe =>
sinkProbe.request(3)
eventuallyExpectError(sinkProbe).getMessage should startWith(concatHandlerFail4Msg)
eventually {
val concatStr = dbConfig.db.run(repository.findById(entityId)).futureValue.value
concatStr.text shouldBe "abc|def|ghi"
offsetStore.readOffset[Long](projectionId).futureValue.value shouldBe 3L
}
}
}

// re-run projection without failing function
Expand All @@ -911,8 +928,10 @@ class SlickProjectionSpec
}

withClue("check: all offsets were seen") {
val offset = offsetStore.readOffset[Long](projectionId).futureValue.value
offset shouldBe 6L
eventually {
val offset = offsetStore.readOffset[Long](projectionId).futureValue.value
offset shouldBe 6L
}
}
}

Expand All @@ -935,19 +954,39 @@ class SlickProjectionSpec
offsetOpt shouldBe empty
}

withClue("check: projection failed with stream failure") {
withClue("check: projection received 3") {
projectionTestKit.runWithTestSink(slickProjectionFailing) { sinkProbe =>
sinkProbe.request(1000)
eventuallyExpectError(sinkProbe).getMessage should startWith(concatHandlerFail4Msg)
sinkProbe.request(3)
eventually {
val concatStr = dbConfig.db.run(repository.findById(entityId)).futureValue.value
concatStr.text shouldBe "abc|def|ghi"
}
}
}
withClue("check: projection is consumed up to third") {
val concatStr = dbConfig.db.run(repository.findById(entityId)).futureValue.value
concatStr.text shouldBe "abc|def|ghi"
}
withClue(s"check: last seen offset is 2L") {
val offset = offsetStore.readOffset[Long](projectionId).futureValue.value
offset shouldBe 2L

// run again up to failure point
val slickProjectionFailing2 =
SlickProjection
.atLeastOnce(
projectionId = projectionId,
sourceProvider = sourceProvider(entityId),
databaseConfig = dbConfig,
() => bogusEventHandler)
.withSaveOffset(2, 1.minute)
withClue("check: projection failed with stream failure") {
projectionTestKit.runWithTestSink(slickProjectionFailing2) { sinkProbe =>
sinkProbe.request(3)
eventuallyExpectError(sinkProbe).getMessage should startWith(concatHandlerFail4Msg)
eventually {
// because failures, we may consume 'e1' and 'e2' more then once
// we check that it at least starts with 'e1|e2|e3'
val concatStr = dbConfig.db.run(repository.findById(entityId)).futureValue.value
concatStr.text should startWith("abc|def|ghi")

// note elem 3 is processed twice
concatStr.text.split('|').count(_ == "ghi") shouldBe 2
}
}
}

// re-run projection without failing function
Expand All @@ -968,8 +1007,10 @@ class SlickProjectionSpec
projectionTestKit.run(slickProjection) {
withClue("checking: all values were concatenated") {
val concatStr = dbConfig.db.run(repository.findById(entityId)).futureValue.value
// note that 3rd is duplicated
concatStr.text shouldBe "abc|def|ghi|ghi|jkl|mno|pqr"
concatStr.text should startWith("abc|def|ghi")
concatStr.text should endWith("ghi|jkl|mno|pqr")
// note elem 3 is should have been seen three times
concatStr.text.split('|').count(_ == "ghi") shouldBe 3
}
}

Expand Down

0 comments on commit 7680fe1

Please sign in to comment.