diff --git a/rocketmq-namesrv/src/bootstrap.rs b/rocketmq-namesrv/src/bootstrap.rs index 2f81444a..81d4cc51 100644 --- a/rocketmq-namesrv/src/bootstrap.rs +++ b/rocketmq-namesrv/src/bootstrap.rs @@ -49,26 +49,31 @@ pub struct Builder { } struct NameServerRuntime { - /*name_server_config: ArcMut, - tokio_client_config: Arc, - server_config: Arc, - route_info_manager: RouteInfoManager, - kvconfig_manager: KVConfigManager, - name_server_runtime: Option, - remoting_client: ArcMut,*/ name_server_runtime: Option, inner: ArcMut, + // receiver for shutdown signal + shutdown_rx: Option>, } impl NameServerBootstrap { pub async fn boot(mut self) { - /*select! { - _ = self.name_server_runtime.start() =>{ + let (shutdown_tx, shutdown_rx) = broadcast::channel(1); + self.name_server_runtime.shutdown_rx = Some(shutdown_rx); + tokio::join!( + self.name_server_runtime.start(), + wait_for_signal_inner(shutdown_tx) + ); + } +} - } - }*/ - tokio::join!(self.name_server_runtime.start(), wait_for_signal()); +async fn wait_for_signal_inner(shutdown_tx: broadcast::Sender<()>) { + tokio::select! { + _ = wait_for_signal() => { + info!("Received signal, initiating shutdown..."); + } } + // Send shutdown signal to all tasks + let _ = shutdown_tx.send(()); } impl NameServerRuntime { @@ -92,6 +97,17 @@ impl NameServerRuntime { .await; self.inner.remoting_client.start(weak_arc_mut).await; info!("Rocketmq NameServer(Rust) started"); + + tokio::select! { + _ = self.shutdown_rx.as_mut().unwrap().recv() => { + info!("Shutdown received, initiating graceful shutdown..."); + self.shutdown(); + } + } + } + + fn shutdown(&mut self) { + info!("Rocketmq NameServer(Rust) gracefully shutdown completed"); } fn init_processors( @@ -186,6 +202,7 @@ impl Builder { name_server_runtime: NameServerRuntime { name_server_runtime: Some(runtime), inner, + shutdown_rx: None, }, } }