Skip to content

Commit

Permalink
Merge pull request #118 from SwissBorg/more-unit-tests
Browse files Browse the repository at this point in the history
Cover the eventsByTag offset bugfix with a test case
  • Loading branch information
mkubala authored Oct 20, 2020
2 parents 04b15a5 + b4691ec commit 918315d
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,9 @@ class PostgresReadJournal(config: Config, configPath: String)(implicit val syste
* event to be persisted in the journal. */
from
case (Nil, maxOrdering) if maxOrdering < from =>
/* In case of either `maxOrdering` is staled or journal didn't reach the offset - we should wait
* for either maxOrdering to be discovered or journal to reach the requested offset */
/* In case of either `maxOrdering` is not yet discovered or journal didn't reach the offset - we should
* wait for either maximum ordering value to be eventually discovered or journal to reach
* the requested offset */
from
case (Nil, maxOrdering) =>
/* If no events matched the tag between `from` and `to` (`from + batchSize`) and `maxOrdering` then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,31 +120,6 @@ abstract class EventsByTagTest(val schemaType: SchemaType)
}
}

it should "find all events by tag starting from an offset" in withActorSystem { implicit system =>
val testJournalSize = 200
val journalOps = new ScalaPostgresReadJournalOperations(system)
withTestActors(replyToMessages = true) { (actor1, _, _) =>
for {
n <- (1 to testJournalSize).inclusive
} (actor1 ? withTags(n, "number")).futureValue

journalOps.withEventsByTag()("number", Sequence(testJournalSize / 2)) { tp =>
tp.request(Int.MaxValue)
for {
n <- ((testJournalSize / 2) + 1 to testJournalSize).inclusive
} tp.expectNext(EventEnvelope(Sequence(n), "my-1", n, n))

tp.cancel()
}

journalOps.withEventsByTag()("number", Sequence(testJournalSize)) { tp =>
tp.request(Int.MaxValue)
tp.expectNoMessage(NoMsgTime)
tp.cancel()
}
}
}

it should "deliver EventEnvelopes non-zero timestamps" in withActorSystem { implicit system =>

val journalOps = new ScalaPostgresReadJournalOperations(system)
Expand Down Expand Up @@ -276,6 +251,53 @@ abstract class EventsByTagTest(val schemaType: SchemaType)
}
}

it should "find newly stored event only when offset == maximum ordering" in withActorSystem { implicit system =>
val journalOps = new ScalaPostgresReadJournalOperations(system)
withTestActors(replyToMessages = true) { (actor1, actor2, actor3) =>
(actor1 ? withTags(1, "number")).futureValue
(actor2 ? withTags(2, "number")).futureValue
(actor3 ? withTags(3, "number")).futureValue

eventually {
journalOps.countJournal.futureValue shouldBe 3
}

journalOps.withEventsByTag()("number", Sequence(3)) { tp =>
tp.request(Int.MaxValue)
tp.expectNoMessage(1.second)

actor1 ? withTags(1, "number")
tp.expectNext(EventEnvelope(Sequence(4), "my-1", 2, 1))
tp.expectNoMessage(1.second)

tp.cancel()
}
}
}

it should "not find any events when offset >> maximum ordering" in withActorSystem { implicit system =>
val journalOps = new ScalaPostgresReadJournalOperations(system)
withTestActors(replyToMessages = true) { (actor1, actor2, actor3) =>
(actor1 ? withTags(1, "number")).futureValue
(actor2 ? withTags(2, "number")).futureValue
(actor3 ? withTags(3, "number")).futureValue

eventually {
journalOps.countJournal.futureValue shouldBe 3
}

journalOps.withEventsByTag()("number", Sequence(500)) { tp =>
tp.request(Int.MaxValue)
tp.expectNoMessage(1.second)

actor1 ? withTags(1, "number")
tp.expectNoMessage(1.second)

tp.cancel()
}
}
}

it should "persist and find tagged event for one tag" in withActorSystem { implicit system =>
val journalOps = new JavaDslPostgresReadJournalOperations(system)
withTestActors() { (actor1, actor2, actor3) =>
Expand Down Expand Up @@ -442,8 +464,8 @@ abstract class EventsByTagTest(val schemaType: SchemaType)
}
}

class NestedPartitionsScalaEventsByTagTest extends EventsByTagTest(NestedPartitions) //with BaseDbCleaner
class NestedPartitionsScalaEventsByTagTest extends EventsByTagTest(NestedPartitions)

class PartitionedScalaEventsByTagTest extends EventsByTagTest(Partitioned) //with BaseDbCleaner
class PartitionedScalaEventsByTagTest extends EventsByTagTest(Partitioned)

class PlainScalaEventsByTagTest extends EventsByTagTest(Plain) //with BaseDbCleaner
class PlainScalaEventsByTagTest extends EventsByTagTest(Plain)

0 comments on commit 918315d

Please sign in to comment.