Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent interrupting/recreating the expiration fiber too often #96

Merged
merged 3 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/scala/com/devsisters/shardcake/package.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.devsisters

package object shardcake {
type ShardId = Int
type ShardId = Int
type EpochMillis = Long
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import com.devsisters.shardcake.errors.EntityNotManagedByThisPod
import com.devsisters.shardcake._
import zio.{ Config => _, _ }

import java.util.concurrent.TimeUnit

private[shardcake] trait EntityManager[-Req] {
def send(
entityId: String,
Expand All @@ -27,7 +29,7 @@ private[shardcake] object EntityManager {
entityMaxIdleTime: Option[Duration]
): URIO[R, EntityManager[Req]] =
for {
entities <- Ref.Synchronized.make[Map[String, (Either[Queue[Req], Signal], Fiber[Nothing, Unit])]](Map())
entities <- Ref.Synchronized.make[Map[String, (Either[Queue[Req], Signal], EpochMillis)]](Map())
env <- ZIO.environment[R]
} yield new EntityManagerLive[Req](
recipientType,
Expand All @@ -39,37 +41,52 @@ private[shardcake] object EntityManager {
entityMaxIdleTime
)

private def currentTimeInMilliseconds: UIO[EpochMillis] =
Clock.currentTime(TimeUnit.MILLISECONDS)

class EntityManagerLive[Req](
recipientType: RecipientType[Req],
behavior: (String, Queue[Req]) => Task[Nothing],
terminateMessage: Signal => Option[Req],
entities: Ref.Synchronized[Map[String, (Either[Queue[Req], Signal], Fiber[Nothing, Unit])]],
entities: Ref.Synchronized[Map[String, (Either[Queue[Req], Signal], EpochMillis)]],
sharding: Sharding,
config: Config,
entityMaxIdleTime: Option[Duration]
) extends EntityManager[Req] {
private def startExpirationFiber(entityId: String): UIO[Fiber[Nothing, Unit]] =
private def startExpirationFiber(entityId: String): UIO[Fiber[Nothing, Unit]] = {
val maxIdleTime = entityMaxIdleTime getOrElse config.entityMaxIdleTime

def sleep(duration: Duration): UIO[Unit] = for {
_ <- Clock.sleep(duration)
cdt <- currentTimeInMilliseconds
map <- entities.get
lastReceivedAt = map.get(entityId).map { case (_, lastReceivedAt) => lastReceivedAt }.getOrElse(0L)
remaining = maxIdleTime minus Duration.fromMillis(cdt - lastReceivedAt)
_ <- sleep(remaining).when(remaining > Duration.Zero)
} yield ()

(for {
_ <- Clock.sleep(entityMaxIdleTime getOrElse config.entityMaxIdleTime)
_ <- sleep(maxIdleTime)
_ <- terminateEntity(entityId).forkDaemon.unit // fork daemon otherwise it will interrupt itself
} yield ()).interruptible.forkDaemon
}

private def terminateEntity(entityId: String): UIO[Unit] =
entities.updateZIO(map =>
map.get(entityId) match {
case Some((Left(queue), interruptionFiber)) =>
case Some((Left(queue), lastReceivedAt)) =>
Promise
.make[Nothing, Unit]
.flatMap { p =>
terminateMessage(p) match {
case Some(msg) =>
// if a queue is found, offer the termination message, and set the queue to None so that no new message is enqueued
queue.offer(msg).exit.as(map.updated(entityId, (Right(p), interruptionFiber)))
queue.offer(msg).exit.as(map.updated(entityId, (Right(p), lastReceivedAt)))
case None =>
queue.shutdown.as(map - entityId)
}
}
case _ =>
case _ =>
// if no queue is found, do nothing
ZIO.succeed(map)
}
Expand All @@ -93,14 +110,13 @@ private[shardcake] object EntityManager {
// find the queue for that entity, or create it if needed
queue <- entities.modifyZIO(map =>
map.get(entityId) match {
case Some((queue @ Left(_), expirationFiber)) =>
case Some((queue @ Left(_), _)) =>
// queue exists, delay the interruption fiber and return the queue
expirationFiber.interrupt *>
startExpirationFiber(entityId).map(fiber => (queue, map.updated(entityId, (queue, fiber))))
case Some((p @ Right(_), _)) =>
currentTimeInMilliseconds.map(cdt => (queue, map.updated(entityId, (queue, cdt))))
case Some((p @ Right(_), _)) =>
// the queue is shutting down, stash and retry
ZIO.succeed((p, map))
case None =>
case None =>
sharding.isShuttingDown.flatMap {
case true =>
// don't start any fiber while sharding is shutting down
Expand All @@ -117,8 +133,9 @@ private[shardcake] object EntityManager {
entities.update(_ - entityId) *> queue.shutdown *> expirationFiber.interrupt
)
.forkDaemon
cdt <- currentTimeInMilliseconds
leftQueue = Left(queue)
} yield (leftQueue, map.updated(entityId, (leftQueue, expirationFiber)))
} yield (leftQueue, map.updated(entityId, (leftQueue, cdt)))
}
}
)
Expand Down Expand Up @@ -146,7 +163,7 @@ private[shardcake] object EntityManager {
entities.getAndSet(Map()).flatMap(terminateEntities)

private def terminateEntities(
entitiesToTerminate: Map[String, (Either[Queue[Req], Signal], Fiber[Nothing, Unit])]
entitiesToTerminate: Map[String, (Either[Queue[Req], Signal], EpochMillis)]
): UIO[Unit] =
for {
// send termination message to all entities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,22 @@ object ShardingSpec extends ZIOSpecDefault {
} yield assertTrue(c0 == 1, c1 == 0)
}
},
test("Entity termination with extension") {
ZIO.scoped {
for {
_ <- Sharding.registerEntity(Counter, behavior, entityMaxIdleTime = Some(3.seconds))
_ <- Sharding.registerScoped
counter <- Sharding.messenger(Counter)
_ <- counter.sendDiscard("c3")(IncrementCounter)
c0 <- counter.send("c3")(GetCounter.apply)
_ <- Clock.sleep(2 seconds)
_ <- counter.sendDiscard("c3")(IncrementCounter)
c1 <- counter.send("c3")(GetCounter.apply)
_ <- Clock.sleep(4 seconds)
c2 <- counter.send("c3")(GetCounter.apply) // counter should be restarted
} yield assertTrue(c0 == 1, c1 == 2, c2 == 0)
}
},
test("Cluster singleton") {
ZIO.scoped {
for {
Expand Down
Loading