diff --git a/manager/src/main/scala/com/devsisters/shardcake/ShardManager.scala b/manager/src/main/scala/com/devsisters/shardcake/ShardManager.scala index 6d13b1f3..6ccc7ee9 100644 --- a/manager/src/main/scala/com/devsisters/shardcake/ShardManager.scala +++ b/manager/src/main/scala/com/devsisters/shardcake/ShardManager.scala @@ -50,7 +50,7 @@ class ShardManager( ManagerMetrics.podHealthChecked.tagged("pod_address", podAddress.toString).increment *> eventsHub.publish(ShardingEvent.PodHealthChecked(podAddress)) *> ZIO.unlessZIO(healthApi.isAlive(podAddress))( - ZIO.logWarning(s"$podAddress is not alive, unregistering") *> unregister(podAddress) + ZIO.logWarning(s"Pod $podAddress is not alive, unregistering") *> unregister(podAddress) ) } .unit @@ -196,52 +196,96 @@ object ShardManager { * A layer that starts the Shard Manager process */ val live: ZLayer[PodsHealth with Pods with Storage with ManagerConfig, Throwable, ShardManager] = - ZLayer { + ZLayer.scoped { for { - config <- ZIO.service[ManagerConfig] - stateRepository <- ZIO.service[Storage] - healthApi <- ZIO.service[PodsHealth] - podApi <- ZIO.service[Pods] - pods <- stateRepository.getPods - assignments <- stateRepository.getAssignments + config <- ZIO.service[ManagerConfig] + stateRepository <- ZIO.service[Storage] + healthApi <- ZIO.service[PodsHealth] + podApi <- ZIO.service[Pods] + pods <- stateRepository.getPods + assignments <- stateRepository.getAssignments // remove unhealthy pods on startup - filteredPods <- - ZIO.filterPar(pods.toList) { case (podAddress, _) => healthApi.isAlive(podAddress) }.map(_.toMap) - filteredAssignments = assignments.collect { - case assignment @ (_, Some(pod)) if filteredPods.contains(pod) => assignment - } - cdt <- ZIO.succeed(OffsetDateTime.now()) - initialState = ShardManagerState( - filteredPods.map { case (k, v) => k -> PodWithMetadata(v, cdt) }, - (1 to config.numberOfShards).map(_ -> None).toMap ++ filteredAssignments - ) - _ <- ManagerMetrics.pods.incrementBy(initialState.pods.size) - _ <- ZIO.foreachDiscard(initialState.shards) { case (_, podAddressOpt) => - podAddressOpt match { - case Some(podAddress) => - ManagerMetrics.assignedShards.tagged("pod_address", podAddress.toString).increment - case None => - ManagerMetrics.unassignedShards.increment - } - } - state <- Ref.Synchronized.make(initialState) - rebalanceSemaphore <- Semaphore.make(1) - eventsHub <- Hub.unbounded[ShardingEvent] - shardManager = + failedFilteredPods <- + ZIO.partitionPar(pods) { addrPod => + ZIO.ifZIO(healthApi.isAlive(addrPod._1))(ZIO.succeed(addrPod), ZIO.fail(addrPod._2)) + } + (failedPods, filtered) = failedFilteredPods + _ <- ZIO.when(failedPods.nonEmpty)( + ZIO.logInfo(s"Ignoring pods that are no longer alive ${failedPods.mkString("[", ", ", "]")}") + ) + filteredPods = filtered.toMap + failedFilteredAssignments = partitionMap(assignments) { + case assignment @ (_, Some(address)) if filteredPods.contains(address) => + Right(assignment) + case assignment => Left(assignment) + } + (failed, filteredAssignments) = failedFilteredAssignments + failedAssignments = failed.collect { case (shard, Some(addr)) => shard -> addr } + _ <- ZIO.when(failedAssignments.nonEmpty)( + ZIO.logWarning( + s"Ignoring assignments for pods that are no longer alive ${failedAssignments.mkString("[", ", ", "]")}" + ) + ) + cdt <- ZIO.succeed(OffsetDateTime.now()) + initialState = ShardManagerState( + filteredPods.map { case (k, v) => k -> PodWithMetadata(v, cdt) }, + (1 to config.numberOfShards).map(_ -> None).toMap ++ filteredAssignments + ) + _ <- + ZIO.logInfo( + s"Recovered pods ${filteredPods + .mkString("[", ", ", "]")} and assignments ${filteredAssignments.view.flatMap(_._2).mkString("[", ", ", "]")}" + ) + _ <- ManagerMetrics.pods.incrementBy(initialState.pods.size) + _ <- ZIO.foreachDiscard(initialState.shards) { case (_, podAddressOpt) => + podAddressOpt match { + case Some(podAddress) => + ManagerMetrics.assignedShards.tagged("pod_address", podAddress.toString).increment + case None => + ManagerMetrics.unassignedShards.increment + } + } + state <- Ref.Synchronized.make(initialState) + rebalanceSemaphore <- Semaphore.make(1) + eventsHub <- Hub.unbounded[ShardingEvent] + shardManager = new ShardManager(state, rebalanceSemaphore, eventsHub, healthApi, podApi, stateRepository, config) - _ <- shardManager.persistPods.forkDaemon + _ <- ZIO.addFinalizer { + shardManager.persistAssignments.catchAllCause(cause => + ZIO.logWarningCause("Failed to persist assignments on shutdown", cause) + ) *> + shardManager.persistPods.catchAllCause(cause => + ZIO.logWarningCause("Failed to persist pods on shutdown", cause) + ) + } + _ <- shardManager.persistPods.forkDaemon // rebalance immediately if there are unassigned shards - _ <- shardManager.rebalance(rebalanceImmediately = initialState.unassignedShards.nonEmpty).forkDaemon + _ <- shardManager.rebalance(rebalanceImmediately = initialState.unassignedShards.nonEmpty).forkDaemon // start a regular rebalance at the given interval - _ <- shardManager - .rebalance(rebalanceImmediately = false) - .repeat(Schedule.spaced(config.rebalanceInterval)) - .forkDaemon - _ <- shardManager.getShardingEvents.mapZIO(event => ZIO.logInfo(event.toString)).runDrain.forkDaemon - _ <- ZIO.logInfo("Shard Manager loaded") + _ <- shardManager + .rebalance(rebalanceImmediately = false) + .repeat(Schedule.spaced(config.rebalanceInterval)) + .forkDaemon + _ <- shardManager.getShardingEvents.mapZIO(event => ZIO.logInfo(event.toString)).runDrain.forkDaemon + _ <- ZIO.logInfo("Shard Manager loaded") } yield shardManager } + // reimplement Map.partitionMap because it does not exist in 2.12 + private def partitionMap[K, V, VL <: V, VR <: V](map: Map[K, V])(partition: ((K, V)) => Either[(K, VL), (K, VR)]) = { + val left = Map.newBuilder[K, VL] + val right = Map.newBuilder[K, VR] + + map.iterator.foreach { kv => + partition(kv) match { + case Left(kvl) => left += kvl + case Right(kvr) => right += kvr + } + } + + (left.result(), right.result()) + } + implicit def listOrder[A](implicit ev: Ordering[A]): Ordering[List[A]] = (xs: List[A], ys: List[A]) => { @tailrec def loop(xs: List[A], ys: List[A]): Int = xs match { diff --git a/manager/src/test/scala/com/devsisters/shardcake/ShardManagerSpec.scala b/manager/src/test/scala/com/devsisters/shardcake/ShardManagerSpec.scala index 69456ac5..00c6e43c 100644 --- a/manager/src/test/scala/com/devsisters/shardcake/ShardManagerSpec.scala +++ b/manager/src/test/scala/com/devsisters/shardcake/ShardManagerSpec.scala @@ -173,6 +173,32 @@ object ShardManagerSpec extends ZIOSpecDefault { ) } yield assert1 && assert2).provide(shardManager) + }, + test("Simulate temporary storage restart followed by manager restart") { + { + val setup = (for { + _ <- simulate((1 to 10).toList.map(i => SimulationEvent.PodRegister(Pod(PodAddress("server", i), "1")))) + _ <- TestClock.adjust(10 minutes) + // busy wait for the forked daemon fibers to do their job + _ <- ZIO.iterate(Map.empty[ShardId, Option[PodAddress]])(_.isEmpty)(_ => + ZIO.serviceWithZIO[Storage](_.getAssignments) + ) + _ <- ZIO.iterate(Map.empty[PodAddress, Pod])(_.isEmpty)(_ => ZIO.serviceWithZIO[Storage](_.getPods)) + // simulate non-persistent storage restart + _ <- ZIO.serviceWithZIO[Storage](s => s.saveAssignments(Map.empty) *> s.savePods(Map.empty)) + } yield {}).provideSome[Storage]( + ZLayer.makeSome[Storage, ShardManager](config, Pods.noop, PodsHealth.local, ShardManager.live) + ) + + val test = for { + shutdownAssignments <- ZIO.serviceWithZIO[Storage](_.getAssignments) + shutdownPods <- ZIO.serviceWithZIO[Storage](_.getPods) + } yield + // manager should have saved its state to storage when it shut down + assertTrue(shutdownAssignments.nonEmpty && shutdownPods.nonEmpty) + + setup *> test + }.provide(Storage.memory) } ) )