From 7680fe15fe9b5fb162f440b287a7fe446ae1854d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 6 Dec 2023 15:12:39 +0100 Subject: [PATCH] test: Harden SlickProjectionSpec (#1085) * make it more similar to R2dbcProjectionSpec --- .../projection/jdbc/JdbcProjectionSpec.scala | 2 +- .../r2dbc/R2dbcProjectionSpec.scala | 2 +- .../slick/SlickProjectionSpec.scala | 89 ++++++++++++++----- 3 files changed, 67 insertions(+), 26 deletions(-) diff --git a/akka-projection-jdbc/src/it/scala/akka/projection/jdbc/JdbcProjectionSpec.scala b/akka-projection-jdbc/src/it/scala/akka/projection/jdbc/JdbcProjectionSpec.scala index 535aded8f..14a158c21 100644 --- a/akka-projection-jdbc/src/it/scala/akka/projection/jdbc/JdbcProjectionSpec.scala +++ b/akka-projection-jdbc/src/it/scala/akka/projection/jdbc/JdbcProjectionSpec.scala @@ -262,7 +262,7 @@ class JdbcProjectionSpec private def projectedValueShouldIncludeNTimes(expected: String, nTimes: Int)(implicit entityId: String) = { withClue(s"checking projected value contains [$expected] $nTimes times: ") { val text = withRepo(_.findById(entityId)).futureValue.value.text - text.split("\\|").count(_ == expected) shouldBe nTimes + text.split('|').count(_ == expected) shouldBe nTimes } } diff --git a/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/R2dbcProjectionSpec.scala b/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/R2dbcProjectionSpec.scala index c0841a420..6ef7cb13f 100644 --- a/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/R2dbcProjectionSpec.scala +++ b/akka-projection-r2dbc/src/it/scala/akka/projection/r2dbc/R2dbcProjectionSpec.scala @@ -229,7 +229,7 @@ class R2dbcProjectionSpec private def projectedValueShouldIncludeNTimes(expected: String, nTimes: Int)(implicit entityId: String) = { withClue(s"checking projected value contains [$expected] $nTimes times: ") { val text = withRepo(_.findById(entityId)).futureValue.get.text - text.split("\\|").count(_ == expected) shouldBe nTimes + text.split('|').count(_ == expected) shouldBe nTimes } } diff --git a/akka-projection-slick/src/test/scala/akka/projection/slick/SlickProjectionSpec.scala b/akka-projection-slick/src/test/scala/akka/projection/slick/SlickProjectionSpec.scala index 36e9102cf..f5816b815 100644 --- a/akka-projection-slick/src/test/scala/akka/projection/slick/SlickProjectionSpec.scala +++ b/akka-projection-slick/src/test/scala/akka/projection/slick/SlickProjectionSpec.scala @@ -875,19 +875,36 @@ class SlickProjectionSpec offsetOpt shouldBe empty } - withClue("check: projection failed with stream failure") { + withClue("check: projection received 3") { projectionTestKit.runWithTestSink(slickProjectionFailing) { sinkProbe => - sinkProbe.request(1000) - eventuallyExpectError(sinkProbe).getMessage should startWith(concatHandlerFail4Msg) + sinkProbe.request(3) + eventually { + val concatStr = dbConfig.db.run(repository.findById(entityId)).futureValue.value + concatStr.text shouldBe "abc|def|ghi" + offsetStore.readOffset[Long](projectionId).futureValue.value shouldBe 3L + } } } - withClue("check: projection is consumed up to third") { - val concatStr = dbConfig.db.run(repository.findById(entityId)).futureValue.value - concatStr.text shouldBe "abc|def|ghi" - } - withClue(s"check: last seen offset is 3L") { - val offset = offsetStore.readOffset[Long](projectionId).futureValue.value - offset shouldBe 3L + + // run again up to failure point + val slickProjectionFailing2 = + SlickProjection + .atLeastOnce( + projectionId = projectionId, + sourceProvider = sourceProvider(entityId), + databaseConfig = dbConfig, + () => bogusEventHandler) + .withSaveOffset(1, Duration.Zero) + withClue("check: projection failed with stream failure") { + projectionTestKit.runWithTestSink(slickProjectionFailing2) { sinkProbe => + sinkProbe.request(3) + eventuallyExpectError(sinkProbe).getMessage should startWith(concatHandlerFail4Msg) + eventually { + val concatStr = dbConfig.db.run(repository.findById(entityId)).futureValue.value + concatStr.text shouldBe "abc|def|ghi" + offsetStore.readOffset[Long](projectionId).futureValue.value shouldBe 3L + } + } } // re-run projection without failing function @@ -911,8 +928,10 @@ class SlickProjectionSpec } withClue("check: all offsets were seen") { - val offset = offsetStore.readOffset[Long](projectionId).futureValue.value - offset shouldBe 6L + eventually { + val offset = offsetStore.readOffset[Long](projectionId).futureValue.value + offset shouldBe 6L + } } } @@ -935,19 +954,39 @@ class SlickProjectionSpec offsetOpt shouldBe empty } - withClue("check: projection failed with stream failure") { + withClue("check: projection received 3") { projectionTestKit.runWithTestSink(slickProjectionFailing) { sinkProbe => - sinkProbe.request(1000) - eventuallyExpectError(sinkProbe).getMessage should startWith(concatHandlerFail4Msg) + sinkProbe.request(3) + eventually { + val concatStr = dbConfig.db.run(repository.findById(entityId)).futureValue.value + concatStr.text shouldBe "abc|def|ghi" + } } } - withClue("check: projection is consumed up to third") { - val concatStr = dbConfig.db.run(repository.findById(entityId)).futureValue.value - concatStr.text shouldBe "abc|def|ghi" - } - withClue(s"check: last seen offset is 2L") { - val offset = offsetStore.readOffset[Long](projectionId).futureValue.value - offset shouldBe 2L + + // run again up to failure point + val slickProjectionFailing2 = + SlickProjection + .atLeastOnce( + projectionId = projectionId, + sourceProvider = sourceProvider(entityId), + databaseConfig = dbConfig, + () => bogusEventHandler) + .withSaveOffset(2, 1.minute) + withClue("check: projection failed with stream failure") { + projectionTestKit.runWithTestSink(slickProjectionFailing2) { sinkProbe => + sinkProbe.request(3) + eventuallyExpectError(sinkProbe).getMessage should startWith(concatHandlerFail4Msg) + eventually { + // because failures, we may consume 'e1' and 'e2' more then once + // we check that it at least starts with 'e1|e2|e3' + val concatStr = dbConfig.db.run(repository.findById(entityId)).futureValue.value + concatStr.text should startWith("abc|def|ghi") + + // note elem 3 is processed twice + concatStr.text.split('|').count(_ == "ghi") shouldBe 2 + } + } } // re-run projection without failing function @@ -968,8 +1007,10 @@ class SlickProjectionSpec projectionTestKit.run(slickProjection) { withClue("checking: all values were concatenated") { val concatStr = dbConfig.db.run(repository.findById(entityId)).futureValue.value - // note that 3rd is duplicated - concatStr.text shouldBe "abc|def|ghi|ghi|jkl|mno|pqr" + concatStr.text should startWith("abc|def|ghi") + concatStr.text should endWith("ghi|jkl|mno|pqr") + // note elem 3 is should have been seen three times + concatStr.text.split('|').count(_ == "ghi") shouldBe 3 } }