From 41492f7688f22e8a43c7454364b4caa4345b4910 Mon Sep 17 00:00:00 2001 From: meship-starkware Date: Tue, 16 Jul 2024 10:51:07 +0300 Subject: [PATCH] fix(concurrency): stopping the program if one of the threads panics --- .../src/blockifier/transaction_executor.rs | 23 ++++++++++++++++++- .../blockifier/src/concurrency/scheduler.rs | 6 ++++- crates/blockifier/src/concurrency/utils.rs | 17 ++++++++++++++ 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/crates/blockifier/src/blockifier/transaction_executor.rs b/crates/blockifier/src/blockifier/transaction_executor.rs index 7e2941fc79..2d7bfa857d 100644 --- a/crates/blockifier/src/blockifier/transaction_executor.rs +++ b/crates/blockifier/src/blockifier/transaction_executor.rs @@ -1,6 +1,8 @@ #[cfg(feature = "concurrency")] use std::collections::{HashMap, HashSet}; #[cfg(feature = "concurrency")] +use std::panic::{self, catch_unwind, AssertUnwindSafe}; +#[cfg(feature = "concurrency")] use std::sync::Arc; #[cfg(feature = "concurrency")] use std::sync::Mutex; @@ -218,6 +220,8 @@ impl TransactionExecutor { &mut self, chunk: &[Transaction], ) -> Vec> { + use crate::concurrency::utils::AbortIfPanic; + let block_state = self.block_state.take().expect("The block state should be `Some`."); let worker_executor = Arc::new(WorkerExecutor::initialize( @@ -236,7 +240,24 @@ impl TransactionExecutor { for _ in 0..self.config.concurrency_config.n_workers { let worker_executor = Arc::clone(&worker_executor); s.spawn(move || { - worker_executor.run(); + // Making sure that the program will abort if a panic occurred while halting the + // scheduler. + let abort_guard = AbortIfPanic; + // If a panic is not handled or the handling logic itself panics, then we abort + // the program. + if let Err(err) = catch_unwind(AssertUnwindSafe(|| { + worker_executor.run(); + })) { + // If the program panics here, the abort guard will exit the program. + // In this case, no panic message will be logged. 
Add the cargo flag + // --nocapture to log the panic message. + + worker_executor.scheduler.halt(); + abort_guard.release(); + panic::resume_unwind(err); + } + + abort_guard.release(); }); } }); diff --git a/crates/blockifier/src/concurrency/scheduler.rs b/crates/blockifier/src/concurrency/scheduler.rs index 172918b365..b8301d2e59 100644 --- a/crates/blockifier/src/concurrency/scheduler.rs +++ b/crates/blockifier/src/concurrency/scheduler.rs @@ -52,7 +52,7 @@ impl<'a> TransactionCommitter<'a> { assert!(*self.commit_index_guard > 0, "Commit index underflow."); *self.commit_index_guard -= 1; - self.scheduler.done_marker.store(true, Ordering::Release); + self.scheduler.halt(); } } @@ -161,6 +161,10 @@ impl Scheduler { *self.commit_index.lock().unwrap() } + pub fn halt(&self) { + self.done_marker.store(true, Ordering::Release); + } + fn lock_tx_status(&self, tx_index: TxIndex) -> MutexGuard<'_, TransactionStatus> { lock_mutex_in_array(&self.tx_statuses, tx_index) } diff --git a/crates/blockifier/src/concurrency/utils.rs b/crates/blockifier/src/concurrency/utils.rs index 4ca2b2eb17..897f5f8050 100644 --- a/crates/blockifier/src/concurrency/utils.rs +++ b/crates/blockifier/src/concurrency/utils.rs @@ -3,6 +3,23 @@ use std::sync::{Mutex, MutexGuard}; use crate::concurrency::TxIndex; +// This struct is used to abort the program if a panic occurred in a place where it could not be +// handled. +pub struct AbortIfPanic; + +impl Drop for AbortIfPanic { + fn drop(&mut self) { + eprintln!("detected unexpected panic; aborting"); + ::std::process::abort(); + } +} + +impl AbortIfPanic { + pub fn release(self) { + std::mem::forget(self); + } +} + pub fn lock_mutex_in_array<T: Debug>(array: &[Mutex<T>], tx_index: TxIndex) -> MutexGuard<'_, T> { array[tx_index].lock().unwrap_or_else(|error| { panic!("Cell of transaction index {} is poisoned. Data: {:?}.", tx_index, *error.get_ref())