Skip to content

Commit

Permalink
Add graceful shutdown to worker
Browse files Browse the repository at this point in the history
  • Loading branch information
julianbraha authored and patrick-bcw committed Jun 21, 2024
1 parent 2789b5c commit 3dd6b16
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 5 deletions.
1 change: 1 addition & 0 deletions worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ tracing = { workspace = true }
tracing-subscriber = { workspace = true }
clap = { workspace = true }
tokio = { workspace = true }
uuid = "1.8.0"

# Local dependencies
ops = { path = "../ops" }
Expand Down
42 changes: 37 additions & 5 deletions worker/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
use std::sync::Arc;

use anyhow::Result;
use clap::Parser;
use common::prover_state::cli::CliProverStateConfig;
use dotenvy::dotenv;
use log::{error, info};
use ops::register;
use paladin::runtime::WorkerRuntime;
use paladin::runtime::{WorkerIpc, WorkerRuntime};
use tokio::select;
use tokio::signal::unix::{signal, SignalKind};
use tokio::task;
use uuid::Uuid;

mod init;

Expand All @@ -21,18 +27,44 @@ async fn main() -> Result<()> {
dotenv().ok();
init::tracing();

let mut sigterm =
signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal handler");

let args = Cli::parse();

args.prover_state_config
.into_prover_state_manager()
.initialize()?;

let runtime = WorkerRuntime::from_config(&args.paladin, register()).await?;
let runtime = Arc::new(WorkerRuntime::from_config(&args.paladin, register()).await?);
const IPC_ROUTING_KEY: Uuid = Uuid::max(); // copied over from paladin-core's WorkerRuntime code in src/runtime/mod.rs
let cancel_message: WorkerIpc = WorkerIpc::ExecutionError {
routing_key: IPC_ROUTING_KEY,
};
let runtime_clone = runtime.clone();
let sigterm_task = task::spawn(async move {
let runtime_canceler = runtime_clone.get_ipc_sender().await.unwrap();
sigterm.recv().await;
info!("Received SIGTERM, terminating...");
runtime_canceler.publish(&cancel_message).await.unwrap();
});
let runtime_task = task::spawn(async move {
match runtime.main_loop().await {
Ok(()) => info!("Worker main loop ended..."),
Err(err) => error!("Error occured with the runtime: {}", err),
}
});

match runtime.main_loop().await {
Ok(()) => info!("Worker main loop ended..."),
Err(err) => error!("Error occured with the runtime: {}", err),
info!("starting the main loop");
select! {
_ = sigterm_task => {
info!("Graceful shutdown attempted...");
},
_ = runtime_task => {
info!("Runtime ended without SIGTERM...");
}
}
info!("Graceful shutdown worked!");

Ok(())
}

0 comments on commit 3dd6b16

Please sign in to comment.