Skip to content

Commit

Permalink
[ISSUE #2172]🤡Implement name server graceful shutdown⚡️ (#2173)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Jan 8, 2025
1 parent 1db0079 commit d9207d4
Showing 1 changed file with 29 additions and 12 deletions.
41 changes: 29 additions & 12 deletions rocketmq-namesrv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,26 +49,31 @@ pub struct Builder {
}

struct NameServerRuntime {
/*name_server_config: ArcMut<NamesrvConfig>,
tokio_client_config: Arc<TokioClientConfig>,
server_config: Arc<ServerConfig>,
route_info_manager: RouteInfoManager,
kvconfig_manager: KVConfigManager,
name_server_runtime: Option<RocketMQRuntime>,
remoting_client: ArcMut<RocketmqDefaultClient>,*/
name_server_runtime: Option<RocketMQRuntime>,
inner: ArcMut<NameServerRuntimeInner>,
// receiver for shutdown signal
shutdown_rx: Option<tokio::sync::broadcast::Receiver<()>>,
}

impl NameServerBootstrap {
pub async fn boot(mut self) {
/*select! {
_ = self.name_server_runtime.start() =>{
let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
self.name_server_runtime.shutdown_rx = Some(shutdown_rx);
tokio::join!(
self.name_server_runtime.start(),
wait_for_signal_inner(shutdown_tx)
);
}
}

}
}*/
tokio::join!(self.name_server_runtime.start(), wait_for_signal());
async fn wait_for_signal_inner(shutdown_tx: broadcast::Sender<()>) {
tokio::select! {
_ = wait_for_signal() => {
info!("Received signal, initiating shutdown...");
}
}
// Send shutdown signal to all tasks
let _ = shutdown_tx.send(());
}

impl NameServerRuntime {
Expand All @@ -92,6 +97,17 @@ impl NameServerRuntime {
.await;
self.inner.remoting_client.start(weak_arc_mut).await;
info!("Rocketmq NameServer(Rust) started");

tokio::select! {
_ = self.shutdown_rx.as_mut().unwrap().recv() => {
info!("Shutdown received, initiating graceful shutdown...");
self.shutdown();
}
}
}

fn shutdown(&mut self) {
info!("Rocketmq NameServer(Rust) gracefully shutdown completed");
}

fn init_processors(
Expand Down Expand Up @@ -186,6 +202,7 @@ impl Builder {
name_server_runtime: NameServerRuntime {
name_server_runtime: Some(runtime),
inner,
shutdown_rx: None,
},
}
}
Expand Down

0 comments on commit d9207d4

Please sign in to comment.