Skip to content

Commit

Permalink
Prevent a potential race condition that can cause self assignments to…
Browse files Browse the repository at this point in the history
… be replaced
  • Loading branch information
ghostdogpr committed Oct 30, 2024
1 parent c0c00bf commit ca5f6f0
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions entities/src/main/scala/com/devsisters/shardcake/Sharding.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,14 @@ class Sharding private (

private def updateAssignments(
assignmentsOpt: Map[ShardId, Option[PodAddress]],
fromShardManager: Boolean
replaceAllAssignments: Boolean
): UIO[Unit] = {
val assignments = assignmentsOpt.flatMap { case (k, v) => v.map(k -> _) }
ZIO.logDebug("Received new shard assignments") *>
Metrics.shards
.set(assignmentsOpt.count { case (_, podOpt) => podOpt.contains(address) })
.when(fromShardManager) *>
(if (fromShardManager) shardAssignments.set(assignments)
.when(replaceAllAssignments) *>
(if (replaceAllAssignments) shardAssignments.set(assignments)
else
shardAssignments.update(map =>
// we keep self assignments (we don't override them with the new assignments
Expand All @@ -166,8 +166,8 @@ class Sharding private (
) ++
// then, get assignments changes from Redis
storage.assignmentsStream.map(_ -> false)
_ <- assignmentStream.mapZIO { case (assignmentsOpt, fromShardManager) =>
updateAssignments(assignmentsOpt, fromShardManager) *> latch.succeed(()).when(fromShardManager)
_ <- assignmentStream.mapZIO { case (assignmentsOpt, replaceAllAssignments) =>
updateAssignments(assignmentsOpt, replaceAllAssignments) *> latch.succeed(()).when(replaceAllAssignments)
}.runDrain
.retry(Schedule.fixed(config.refreshAssignmentsRetryInterval))
.interruptible
Expand Down Expand Up @@ -254,7 +254,7 @@ class Sharding private (
(shardManager.notifyUnhealthyPod(pod) *>
// just in case we missed the update from the pubsub, refresh assignments
shardManager.getAssignments
.flatMap[Any, Throwable, Unit](updateAssignments(_, fromShardManager = true))).forkDaemon
.flatMap[Any, Throwable, Unit](updateAssignments(_, replaceAllAssignments = false))).forkDaemon
)
}

Expand Down

0 comments on commit ca5f6f0

Please sign in to comment.