From 3dd6b16c386ff89a679fbe902cca86717c14f830 Mon Sep 17 00:00:00 2001 From: Julian Braha Date: Thu, 20 Jun 2024 18:14:40 +0100 Subject: [PATCH] Add graceful shutdown to worker --- worker/Cargo.toml | 1 + worker/src/main.rs | 42 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/worker/Cargo.toml b/worker/Cargo.toml index 8ad755b0..2dda8cf6 100644 --- a/worker/Cargo.toml +++ b/worker/Cargo.toml @@ -16,6 +16,7 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true } clap = { workspace = true } tokio = { workspace = true } +uuid = "1.8.0" # Local dependencies ops = { path = "../ops" } diff --git a/worker/src/main.rs b/worker/src/main.rs index 86ce0744..1c0868de 100644 --- a/worker/src/main.rs +++ b/worker/src/main.rs @@ -1,10 +1,16 @@ +use std::sync::Arc; + use anyhow::Result; use clap::Parser; use common::prover_state::cli::CliProverStateConfig; use dotenvy::dotenv; use log::{error, info}; use ops::register; -use paladin::runtime::WorkerRuntime; +use paladin::runtime::{WorkerIpc, WorkerRuntime}; +use tokio::select; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::task; +use uuid::Uuid; mod init; @@ -21,18 +27,44 @@ async fn main() -> Result<()> { dotenv().ok(); init::tracing(); + let mut sigterm = + signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal handler"); + let args = Cli::parse(); args.prover_state_config .into_prover_state_manager() .initialize()?; - let runtime = WorkerRuntime::from_config(&args.paladin, register()).await?; + let runtime = Arc::new(WorkerRuntime::from_config(&args.paladin, register()).await?); + const IPC_ROUTING_KEY: Uuid = Uuid::max(); // copied over from paladin-core's WorkerRuntime code in src/runtime/mod.rs + let cancel_message: WorkerIpc = WorkerIpc::ExecutionError { + routing_key: IPC_ROUTING_KEY, + }; + let runtime_clone = runtime.clone(); + let sigterm_task = task::spawn(async move { + let runtime_canceler = runtime_clone.get_ipc_sender().await.unwrap(); + sigterm.recv().await; + info!("Received SIGTERM, terminating..."); + runtime_canceler.publish(&cancel_message).await.unwrap(); + }); + let runtime_task = task::spawn(async move { + match runtime.main_loop().await { + Ok(()) => info!("Worker main loop ended..."), + Err(err) => error!("Error occured with the runtime: {}", err), + } + }); - match runtime.main_loop().await { - Ok(()) => info!("Worker main loop ended..."), - Err(err) => error!("Error occured with the runtime: {}", err), + info!("starting the main loop"); + select! { + _ = sigterm_task => { + info!("Graceful shutdown attempted..."); + }, + _ = runtime_task => { + info!("Runtime ended without SIGTERM..."); + } } + info!("Graceful shutdown worked!"); Ok(()) }