Skip to content

Commit

Permalink
Added backend updates notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
hnaderi committed Jun 8, 2022
1 parent 760a652 commit 506f4cf
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ abstract class PersistenceSuite(
.compile
.toList
.assertEquals(List(4, 5, 6))

_ <- s.updates.journal.head.compile.lastOrError.assertEquals(())
_ <- s.updates.outbox.head.compile.lastOrError.assertEquals(())
} yield ()
}

Expand All @@ -79,6 +82,8 @@ abstract class PersistenceSuite(
.compile
.toList
.assertEquals(List(4, 5, 6))

_ <- s.updates.outbox.head.compile.lastOrError.assertEquals(())
} yield ()
}

Expand Down
5 changes: 3 additions & 2 deletions modules/skunk/src/main/scala/SkunkBackend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,11 @@ object SkunkBackend {
(jQ, nQ, cQ) = qs
given ModelTC[S, E, R] = model
s <- snapshot
updates <- Resource.eval(Notifications[F])
_outbox = SkunkOutboxReader(pool, nQ)
_journal = SkunkJournalReader(pool, jQ)
_repo = RepositoryReader(_journal, s)
skRepo = SkunkRepository(pool, jQ, nQ, cQ, _repo)
skRepo = SkunkRepository(pool, jQ, nQ, cQ, _repo, updates)
compiler <-
if cached then
Resource
Expand All @@ -164,6 +165,6 @@ object SkunkBackend {
else Resource.pure(skRepo)
h = CommandHandler.withRetry(compiler, maxRetry, retryInitialDelay)

} yield Backend(h, _outbox, _journal, _repo)
} yield Backend(h, _outbox, _journal, _repo, updates)
}
}
30 changes: 17 additions & 13 deletions modules/skunk/src/main/scala/SkunkRepository.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ private final class SkunkRepository[F[_], S, E, R, N](
journal: Queries.Journal[E],
outbox: Queries.Outbox[N],
cmds: Queries.Commands,
repository: RepositoryReader[F, S, E, R]
repository: RepositoryReader[F, S, E, R],
updates: NotificationsPublisher[F]
)(using F: Sync[F], clock: Clock[F])
extends Repository[F, S, E, R, N] {

Expand Down Expand Up @@ -94,20 +95,23 @@ private final class SkunkRepository[F[_], S, E, R, N](
.adaptErr { case SqlState.UniqueViolation(ex) =>
BackendError.VersionConflict
}
.flatMap(_ => updates.notifyJournal >> updates.notifyOutbox)

def notify(
ctx: RequestContext[?, ?],
notifications: NonEmptyChain[N]
): F[Unit] = trx.use { s =>
for {
now <- currentTime
ns = notifications.toList.map(
(_, ctx.command.address, now, ctx.command.metadata)
)
_ <- s
.prepare(outbox.insertAll(ns))
.use(_.execute(ns))
.assertInserted(ns.size)
} yield ()
}
): F[Unit] = trx
.use { s =>
for {
now <- currentTime
ns = notifications.toList.map(
(_, ctx.command.address, now, ctx.command.metadata)
)
_ <- s
.prepare(outbox.insertAll(ns))
.use(_.execute(ns))
.assertInserted(ns.size)
} yield ()
}
.flatMap(_ => updates.notifyOutbox)
}
3 changes: 2 additions & 1 deletion modules/sql-backend/src/main/scala/Backend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ final class Backend[F[_], S, E, R, N](
val compile: CommandHandler[F, S, E, R, N],
val outbox: OutboxReader[F, N],
val journal: JournalReader[F, E],
val repository: RepositoryReader[F, S, E, R]
val repository: RepositoryReader[F, S, E, R],
val updates: NotificationsConsumer[F]
)
48 changes: 48 additions & 0 deletions modules/sql-backend/src/main/scala/Notifications.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2021 Hossein Naderi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package edomata.backend

import cats.effect.Concurrent
import cats.effect.std.Queue
import cats.implicits.*
import fs2.Stream

trait NotificationsConsumer[F[_]] {
def outbox: Stream[F, Unit]
def journal: Stream[F, Unit]
}

trait NotificationsPublisher[F[_]] {
def notifyOutbox: F[Unit]
def notifyJournal: F[Unit]
}

trait Notifications[F[_]]
extends NotificationsConsumer[F],
NotificationsPublisher[F]

object Notifications {
def apply[F[_]: Concurrent]: F[Notifications[F]] = for {
o <- Queue.circularBuffer[F, Unit](1)
j <- Queue.circularBuffer[F, Unit](1)
} yield new {
def outbox: Stream[F, Unit] = Stream.fromQueueUnterminated(o, 1)
def journal: Stream[F, Unit] = Stream.fromQueueUnterminated(j, 1)
def notifyOutbox: F[Unit] = o.offer(())
def notifyJournal: F[Unit] = j.offer(())
}
}
78 changes: 78 additions & 0 deletions modules/sql-backend/src/test/scala/NotificationsSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2021 Hossein Naderi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package edomata.backend

import cats.effect.IO
import cats.effect.testkit.TestControl
import cats.implicits.*
import fs2.Chunk
import fs2.Stream
import munit.CatsEffectSuite

import scala.concurrent.duration.*

class NotificationsSuite extends CatsEffectSuite {
test("Must notify outbox listeners") {
TestControl.executeEmbed(
for {
ns <- Notifications[IO]
_ <- ns.notifyOutbox
_ <- assertNotified(ns.outbox)
} yield ()
)
}

test("Must notify outbox listeners once") {
TestControl.executeEmbed(
for {
ns <- Notifications[IO]
_ <- ns.notifyOutbox
_ <- ns.notifyOutbox
_ <- ns.notifyOutbox
_ <- assertNotified(ns.outbox)
} yield ()
)
}

test("Must notify journal listeners") {
TestControl.executeEmbed(
for {
ns <- Notifications[IO]
_ <- ns.notifyJournal
_ <- assertNotified(ns.journal)
} yield ()
)
}

test("Must notify journal listeners once") {
TestControl.executeEmbed(
for {
ns <- Notifications[IO]
_ <- ns.notifyJournal
_ <- ns.notifyJournal
_ <- ns.notifyJournal
_ <- assertNotified(ns.journal)
} yield ()
)
}
private def assertNotified(s: Stream[IO, Unit]) =
s.groupWithin(10, 10.hours)
.head
.compile
.lastOrError
.assertEquals(Chunk(()))
}

0 comments on commit 506f4cf

Please sign in to comment.