Skip to content

Commit

Permalink
Prevent interrupting/recreating the expiration fiber too often (#96)
Browse files Browse the repository at this point in the history
* Renew expiration fiber instead of recreating it

* fix

* Add unit test
  • Loading branch information
nox213 authored Nov 1, 2023
1 parent 4b4b795 commit 911a90f
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 15 deletions.
3 changes: 2 additions & 1 deletion core/src/main/scala/com/devsisters/shardcake/package.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.devsisters

package object shardcake {
type ShardId = Int
type ShardId = Int
type EpochMillis = Long
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import com.devsisters.shardcake.errors.EntityNotManagedByThisPod
import com.devsisters.shardcake._
import zio.{ Config => _, _ }

import java.util.concurrent.TimeUnit

private[shardcake] trait EntityManager[-Req] {
def send(
entityId: String,
Expand All @@ -27,7 +29,7 @@ private[shardcake] object EntityManager {
entityMaxIdleTime: Option[Duration]
): URIO[R, EntityManager[Req]] =
for {
entities <- Ref.Synchronized.make[Map[String, (Either[Queue[Req], Signal], Fiber[Nothing, Unit])]](Map())
entities <- Ref.Synchronized.make[Map[String, (Either[Queue[Req], Signal], EpochMillis)]](Map())
env <- ZIO.environment[R]
} yield new EntityManagerLive[Req](
recipientType,
Expand All @@ -39,37 +41,52 @@ private[shardcake] object EntityManager {
entityMaxIdleTime
)

private def currentTimeInMilliseconds: UIO[EpochMillis] =
Clock.currentTime(TimeUnit.MILLISECONDS)

class EntityManagerLive[Req](
recipientType: RecipientType[Req],
behavior: (String, Queue[Req]) => Task[Nothing],
terminateMessage: Signal => Option[Req],
entities: Ref.Synchronized[Map[String, (Either[Queue[Req], Signal], Fiber[Nothing, Unit])]],
entities: Ref.Synchronized[Map[String, (Either[Queue[Req], Signal], EpochMillis)]],
sharding: Sharding,
config: Config,
entityMaxIdleTime: Option[Duration]
) extends EntityManager[Req] {
private def startExpirationFiber(entityId: String): UIO[Fiber[Nothing, Unit]] =
private def startExpirationFiber(entityId: String): UIO[Fiber[Nothing, Unit]] = {
val maxIdleTime = entityMaxIdleTime getOrElse config.entityMaxIdleTime

def sleep(duration: Duration): UIO[Unit] = for {
_ <- Clock.sleep(duration)
cdt <- currentTimeInMilliseconds
map <- entities.get
lastReceivedAt = map.get(entityId).map { case (_, lastReceivedAt) => lastReceivedAt }.getOrElse(0L)
remaining = maxIdleTime minus Duration.fromMillis(cdt - lastReceivedAt)
_ <- sleep(remaining).when(remaining > Duration.Zero)
} yield ()

(for {
_ <- Clock.sleep(entityMaxIdleTime getOrElse config.entityMaxIdleTime)
_ <- sleep(maxIdleTime)
_ <- terminateEntity(entityId).forkDaemon.unit // fork daemon otherwise it will interrupt itself
} yield ()).interruptible.forkDaemon
}

private def terminateEntity(entityId: String): UIO[Unit] =
entities.updateZIO(map =>
map.get(entityId) match {
case Some((Left(queue), interruptionFiber)) =>
case Some((Left(queue), lastReceivedAt)) =>
Promise
.make[Nothing, Unit]
.flatMap { p =>
terminateMessage(p) match {
case Some(msg) =>
// if a queue is found, offer the termination message, and set the queue to None so that no new message is enqueued
queue.offer(msg).exit.as(map.updated(entityId, (Right(p), interruptionFiber)))
queue.offer(msg).exit.as(map.updated(entityId, (Right(p), lastReceivedAt)))
case None =>
queue.shutdown.as(map - entityId)
}
}
case _ =>
case _ =>
// if no queue is found, do nothing
ZIO.succeed(map)
}
Expand All @@ -93,14 +110,13 @@ private[shardcake] object EntityManager {
// find the queue for that entity, or create it if needed
queue <- entities.modifyZIO(map =>
map.get(entityId) match {
case Some((queue @ Left(_), expirationFiber)) =>
case Some((queue @ Left(_), _)) =>
// queue exists, delay the interruption fiber and return the queue
expirationFiber.interrupt *>
startExpirationFiber(entityId).map(fiber => (queue, map.updated(entityId, (queue, fiber))))
case Some((p @ Right(_), _)) =>
currentTimeInMilliseconds.map(cdt => (queue, map.updated(entityId, (queue, cdt))))
case Some((p @ Right(_), _)) =>
// the queue is shutting down, stash and retry
ZIO.succeed((p, map))
case None =>
case None =>
sharding.isShuttingDown.flatMap {
case true =>
// don't start any fiber while sharding is shutting down
Expand All @@ -117,8 +133,9 @@ private[shardcake] object EntityManager {
entities.update(_ - entityId) *> queue.shutdown *> expirationFiber.interrupt
)
.forkDaemon
cdt <- currentTimeInMilliseconds
leftQueue = Left(queue)
} yield (leftQueue, map.updated(entityId, (leftQueue, expirationFiber)))
} yield (leftQueue, map.updated(entityId, (leftQueue, cdt)))
}
}
)
Expand Down Expand Up @@ -146,7 +163,7 @@ private[shardcake] object EntityManager {
entities.getAndSet(Map()).flatMap(terminateEntities)

private def terminateEntities(
entitiesToTerminate: Map[String, (Either[Queue[Req], Signal], Fiber[Nothing, Unit])]
entitiesToTerminate: Map[String, (Either[Queue[Req], Signal], EpochMillis)]
): UIO[Unit] =
for {
// send termination message to all entities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,22 @@ object ShardingSpec extends ZIOSpecDefault {
} yield assertTrue(c0 == 1, c1 == 0)
}
},
test("Entity termination with extension") {
ZIO.scoped {
for {
_ <- Sharding.registerEntity(Counter, behavior, entityMaxIdleTime = Some(3.seconds))
_ <- Sharding.registerScoped
counter <- Sharding.messenger(Counter)
_ <- counter.sendDiscard("c3")(IncrementCounter)
c0 <- counter.send("c3")(GetCounter.apply)
_ <- Clock.sleep(2 seconds)
_ <- counter.sendDiscard("c3")(IncrementCounter)
c1 <- counter.send("c3")(GetCounter.apply)
_ <- Clock.sleep(4 seconds)
c2 <- counter.send("c3")(GetCounter.apply) // counter should be restarted
} yield assertTrue(c0 == 1, c1 == 2, c2 == 0)
}
},
test("Cluster singleton") {
ZIO.scoped {
for {
Expand Down

0 comments on commit 911a90f

Please sign in to comment.