From 01f4ae535ee90ff66e746feadf54c33c14f073da Mon Sep 17 00:00:00 2001 From: meship-starkware Date: Tue, 16 Jul 2024 10:51:07 +0300 Subject: [PATCH] fix(concurrency): stopping the program if one of the threads panics --- .../src/blockifier/transaction_executor.rs | 13 +++++++++++-- crates/blockifier/src/concurrency/scheduler.rs | 6 +++++- crates/blockifier/src/concurrency/worker_logic.rs | 2 +- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/crates/blockifier/src/blockifier/transaction_executor.rs b/crates/blockifier/src/blockifier/transaction_executor.rs index 7e2941fc79..e864981584 100644 --- a/crates/blockifier/src/blockifier/transaction_executor.rs +++ b/crates/blockifier/src/blockifier/transaction_executor.rs @@ -1,6 +1,8 @@ #[cfg(feature = "concurrency")] use std::collections::{HashMap, HashSet}; #[cfg(feature = "concurrency")] +use std::panic::{self, catch_unwind, AssertUnwindSafe}; +#[cfg(feature = "concurrency")] use std::sync::Arc; #[cfg(feature = "concurrency")] use std::sync::Mutex; @@ -233,10 +235,17 @@ impl TransactionExecutor { // TODO(barak, 01/07/2024): Consider using tokio and spawn tasks that will be served by some // upper level tokio thread pool (Runtime in tokio terminology). std::thread::scope(|s| { - for _ in 0..self.config.concurrency_config.n_workers { + for i in 0..self.config.concurrency_config.n_workers { let worker_executor = Arc::clone(&worker_executor); s.spawn(move || { - worker_executor.run(); + let result = catch_unwind(AssertUnwindSafe(|| { + worker_executor.run(); + })); + if let Err(err) = result { + println!("Thread {} caught a panic, propagating it.", i); + worker_executor.scheduler.halt(); + panic::resume_unwind(err); + } }); } }); diff --git a/crates/blockifier/src/concurrency/scheduler.rs b/crates/blockifier/src/concurrency/scheduler.rs index 172918b365..b8301d2e59 100644 --- a/crates/blockifier/src/concurrency/scheduler.rs +++ b/crates/blockifier/src/concurrency/scheduler.rs @@ -52,7 +52,7 @@ impl<'a> TransactionCommitter<'a> { assert!(*self.commit_index_guard > 0, "Commit index underflow."); *self.commit_index_guard -= 1; - self.scheduler.done_marker.store(true, Ordering::Release); + self.scheduler.halt(); } } @@ -161,6 +161,10 @@ impl Scheduler { *self.commit_index.lock().unwrap() } + pub fn halt(&self) { + self.done_marker.store(true, Ordering::Release); + } + fn lock_tx_status(&self, tx_index: TxIndex) -> MutexGuard<'_, TransactionStatus> { lock_mutex_in_array(&self.tx_statuses, tx_index) } diff --git a/crates/blockifier/src/concurrency/worker_logic.rs b/crates/blockifier/src/concurrency/worker_logic.rs index 60d5b19e71..de6753287e 100644 --- a/crates/blockifier/src/concurrency/worker_logic.rs +++ b/crates/blockifier/src/concurrency/worker_logic.rs @@ -118,7 +118,7 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> { fn execute(&self, tx_index: TxIndex) { self.execute_tx(tx_index); - self.scheduler.finish_execution(tx_index) + self.scheduler.finish_execution(tx_index); } fn execute_tx(&self, tx_index: TxIndex) {