diff --git a/src/system/system.rs b/src/system/system.rs index d00f019c..f7b736b1 100644 --- a/src/system/system.rs +++ b/src/system/system.rs @@ -317,6 +317,22 @@ impl ActorSystem { .create_actor(props, name, &self.sys_root(), self) } + /// Returns a future which is completed when all + /// actors have successfully stopped. + /// + /// Note! In the current implementation the future will not complete if + /// root actor is already terminated. + pub fn when_terminated(&self) -> Terminated { + let (tx, rx) = oneshot::channel::<()>(); + let tx = Arc::new(Mutex::new(Some(tx))); + + let props = Props::new_args(WhenTerminatedActor::new, tx); + + self.tmp_actor_of(props).unwrap(); + + rx + } + /// Shutdown the actor system /// /// Attempts a graceful shutdown of the system and all actors. @@ -325,13 +341,20 @@ impl ActorSystem { /// Does not block. Returns a future which is completed when all /// actors have successfully stopped. pub fn shutdown(&self) -> Shutdown { - let (tx, rx) = oneshot::channel::<()>(); - let tx = Arc::new(Mutex::new(Some(tx))); + let receiver = self.when_terminated(); - let props = Props::new_args(ShutdownActor::new, tx); - self.tmp_actor_of(props).unwrap(); + // todo this is prone to failing since there is no + // confirmation that ShutdownActor has subscribed to + // the ActorTerminated events yet. + // It may be that the user root actor is Sterminated + // before the subscription is complete. - rx + // std::thread::sleep_ms(1000); + // send stop to all /user children + + self.stop(self.user_root()); + + receiver } } @@ -611,19 +634,20 @@ pub struct SysChannels { } pub type Shutdown = oneshot::Receiver<()>; +pub type Terminated = oneshot::Receiver<()>; #[derive(Clone)] -struct ShutdownActor { +struct WhenTerminatedActor { tx: Arc>>>, } -impl ShutdownActor { +impl WhenTerminatedActor { fn new(tx: Arc>>>) -> Self { - ShutdownActor { tx } + WhenTerminatedActor { tx } } } -impl Actor for ShutdownActor { +impl Actor for WhenTerminatedActor { type Msg = SystemEvent; fn pre_start(&mut self, ctx: &Context) { @@ -632,16 +656,6 @@ impl Actor for ShutdownActor { actor: Box::new(ctx.myself.clone()), }; ctx.system.sys_events().tell(sub, None); - - // todo this is prone to failing since there is no - // confirmation that ShutdownActor has subscribed to - // the ActorTerminated events yet. - // It may be that the user root actor is Sterminated - // before the subscription is complete. - - // std::thread::sleep_ms(1000); - // send stop to all /user children - ctx.system.stop(ctx.system.user_root()); } fn sys_recv( @@ -660,7 +674,7 @@ impl Actor for ShutdownActor { fn recv(&mut self, _: &Context, _: Self::Msg, _: Option) {} } -impl Receive for ShutdownActor { +impl Receive for WhenTerminatedActor { type Msg = SystemEvent; fn receive( diff --git a/tests/system.rs b/tests/system.rs index b5abdc3c..93fdf1a5 100644 --- a/tests/system.rs +++ b/tests/system.rs @@ -38,6 +38,21 @@ impl Actor for ShutdownTest { fn recv(&mut self, _: &Context, _: Self::Msg, _: Sender) {} } +#[test] +#[allow(dead_code)] +fn system_when_shutdown() { + let sys = ActorSystem::new().unwrap(); + + let t = sys.when_terminated(); + + std::thread::spawn(move || { + let s = sys.shutdown(); + block_on(s).unwrap(); + }); + + block_on(t).unwrap(); +} + #[test] #[allow(dead_code)] fn system_shutdown() {