diff --git a/common/src/background_thread.rs b/common/src/background_thread.rs index bfe82ab2..462612e0 100644 --- a/common/src/background_thread.rs +++ b/common/src/background_thread.rs @@ -21,6 +21,10 @@ pub enum ReceiverMsg { /// that were received. Finished { frame_stack: FrameStackHandle }, + /// The acquisition was cancelled, as requested + /// by `ControlMsg::CancelAcquisition` + Cancelled, + /// A non-recoverable error occurred, the underlying connection /// to the detector system should re-connect. FatalError { @@ -46,6 +50,9 @@ pub enum ControlMsg { /// Start listening for any acquisitions starting StartAcquisitionPassive, + /// Cancel the currently running acquisition, if any + CancelAcquisition, + /// Detector-specific control message SpecializedControlMsg { msg: CM }, } diff --git a/common/src/generic_connection.rs b/common/src/generic_connection.rs index bb411739..9d365cf8 100644 --- a/common/src/generic_connection.rs +++ b/common/src/generic_connection.rs @@ -59,6 +59,9 @@ pub enum ConnectionError { #[error("operation timed out")] Timeout, + #[error("operation cancelled")] + Cancelled, + #[error("background thread failed to start: {0}")] SpawnFailed(#[from] BackgroundThreadSpawnError), @@ -204,6 +207,10 @@ where ReceiverMsg::FatalError { error } => { log::warn!("adjust_status: fatal error: {error:?}; going back to idle state"); self.status = ConnectionStatus::Idle; + } + ReceiverMsg::Cancelled => { + log::warn!("adjust_status: acquisition cancelled"); + self.status = ConnectionStatus::Idle; } // other => { // trace!("adjust_status: other message: {other:?}"); // } @@ -247,6 +254,12 @@ where where E: std::error::Error + 'static + Send + Sync, { + // if an acquisition is already running, cancel and wait for idle status: + if self.is_running() { + self.cancel()?; + self.wait_for_status(ConnectionStatus::Idle, timeout, &periodic_callback)?; + } + if let Some(timeout) = timeout { self.wait_for_arm_inner(timeout, periodic_callback) } else { @@ -313,6 +326,11 @@ where "ReceiverMsg::Finished in wait_for_arm".to_owned(), )); } + ReceiverMsg::Cancelled => { + return Err(ConnectionError::UnexpectedMessage( + "ReceiverMsg::Cancelled in wait_for_arm".to_owned(), + )); + } } } } @@ -406,6 +424,9 @@ where } => { trace!("wait_for_status: received ReceiverMsg::AcquisitionStart"); } + ReceiverMsg::Cancelled => { + trace!("wait_for_status: received ReceiverMsg::Cancelled"); + } } if self.status == desired_status { debug!("wait_for_status: successfully got status {desired_status:?}"); @@ -492,6 +513,9 @@ where self.stats.count_stats_item(&frame_stack); return Ok(Some(frame_stack)); } + ReceiverMsg::Cancelled => { + return Err(ConnectionError::Cancelled); + } } } } @@ -529,6 +553,14 @@ where self.get_status() == ConnectionStatus::Running } + pub fn cancel(&mut self) -> Result<(), ConnectionError> { + self.bg_thread + .channel_to_thread() + .send(ControlMsg::CancelAcquisition) + .map_err(|_| ConnectionError::Disconnected)?; + Ok(()) + } + pub fn log_shm_stats(&self) { let shm = &self.shm; let free = shm.num_slots_free(); diff --git a/common/src/py_connection.rs b/common/src/py_connection.rs index 6af8e33a..c2fb7a37 100644 --- a/common/src/py_connection.rs +++ b/common/src/py_connection.rs @@ -196,6 +196,15 @@ macro_rules! impl_py_connection { Ok(conn_impl.is_running()) } + pub fn cancel(&mut self) -> PyResult<()> { + let conn_impl = self.get_conn_mut()?; + conn_impl.cancel().map_err(|e| { + PyConnectionError::new_err(format!("cancellation failed: {e}")) + })?; + + Ok(()) + } + pub fn start_passive( &mut self, timeout: Option, diff --git a/libertem_asi_mpx3/src/background_thread.rs b/libertem_asi_mpx3/src/background_thread.rs index 374d7e25..a76014fa 100644 --- a/libertem_asi_mpx3/src/background_thread.rs +++ b/libertem_asi_mpx3/src/background_thread.rs @@ -234,7 +234,10 @@ enum AcquisitionError { #[error("other end has disconnected")] Disconnected, - #[error("acquisition cancelled")] + #[error("background thread stopped")] + StopThread, + + #[error("acquisition cancelled by user")] Cancelled, #[error("shm buffer is full")] @@ -315,10 +318,11 @@ fn check_for_control(control_channel: &Receiver) -> Result<(), msg: "received StartAcquisitionPassive while an acquisition was already running" .to_string(), }), - Ok(ControlMsg::StopThread) => Err(AcquisitionError::Cancelled), + Ok(ControlMsg::StopThread) => Err(AcquisitionError::StopThread), Ok(ControlMsg::SpecializedControlMsg { msg: _ }) => { panic!("unsupported SpecializedControlMsg") } + Ok(ControlMsg::CancelAcquisition) => Err(AcquisitionError::Cancelled), Err(TryRecvError::Disconnected) => Err(AcquisitionError::Disconnected), Err(TryRecvError::Empty) => Ok(()), } @@ -548,6 +552,9 @@ fn background_thread( // control: main threads tells us to quit let control = to_thread_r.recv_timeout(Duration::from_millis(100)); match control { + Ok(ControlMsg::CancelAcquisition) => { + warn!("background_thread: ControlMsg::CancelAcquisition without running acquisition"); + } Ok(ControlMsg::StartAcquisitionPassive) => { match passive_acquisition( to_thread_r, @@ -558,7 +565,11 @@ fn background_thread( &mut shm, ) { Ok(_) => {} - e @ Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => { + Err(AcquisitionError::Cancelled) => { + from_thread_s.send(ReceiverMsg::Cancelled).unwrap(); + continue 'outer; + } + e @ Err(AcquisitionError::Disconnected | AcquisitionError::StopThread) => { info!("background_thread: terminating: {e:?}"); return Ok(()); } diff --git a/libertem_dectris/src/background_thread.rs b/libertem_dectris/src/background_thread.rs index 07cddf68..6925ef57 100644 --- a/libertem_dectris/src/background_thread.rs +++ b/libertem_dectris/src/background_thread.rs @@ -78,6 +78,7 @@ enum AcquisitionError { SeriesMismatch, FrameIdMismatch { expected_id: u64, got_id: u64 }, SerdeError { recvd_msg: String, msg: String }, + StopThread, Cancelled, ZmqError { err: zmq::Error }, BufferFull, @@ -91,7 +92,7 @@ impl Display for AcquisitionError { AcquisitionError::ZmqError { err } => { write!(f, "zmq error {err}") } - AcquisitionError::Cancelled => { + AcquisitionError::StopThread => { write!(f, "acquisition cancelled") } AcquisitionError::SerdeError { recvd_msg, msg } => { @@ -118,6 +119,9 @@ impl Display for AcquisitionError { AcquisitionError::ConfigurationError { msg } => { write!(f, "configuration error: {msg}") } + AcquisitionError::Cancelled => { + write!(f, "acquisition cancelled") + } } } } @@ -157,12 +161,17 @@ fn check_for_control( Ok(ControlMsg::StopThread) => { debug!("check_for_control: StopThread received"); + Err(AcquisitionError::StopThread) + } + Ok(ControlMsg::CancelAcquisition) => { + debug!("check_for_control: CancelAcquisition"); + Err(AcquisitionError::Cancelled) } Err(TryRecvError::Disconnected) => { debug!("check_for_control: Disconnected"); - Err(AcquisitionError::Cancelled) + Err(AcquisitionError::StopThread) } Err(TryRecvError::Empty) => Ok(()), } @@ -507,8 +516,12 @@ fn background_thread( &mut shm, ) { Ok(_) => {} + Err(AcquisitionError::Cancelled) => { + from_thread_s.send(ReceiverMsg::Cancelled).unwrap(); + continue 'outer; + } Err( - msg @ (AcquisitionError::Disconnected | AcquisitionError::Cancelled), + msg @ (AcquisitionError::Disconnected | AcquisitionError::StopThread), ) => { debug!("background_thread: passive_acquisition returned {msg:?}"); break 'outer; @@ -572,7 +585,10 @@ fn background_thread( &mut shm, ) { Ok(_) => {} - Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => { + Err(AcquisitionError::Cancelled) => { + from_thread_s.send(ReceiverMsg::Cancelled).unwrap(); + } + Err(AcquisitionError::Disconnected | AcquisitionError::StopThread) => { break 'outer; } Err(e) => { @@ -594,6 +610,9 @@ fn background_thread( warn!("background_thread: control channel has disconnected"); break 'outer; } + Ok(ControlMsg::CancelAcquisition) => { + warn!("background_thread: ignoring ControlMsg::CancelAcquisition outside of running acquisition") + } Err(RecvTimeoutError::Timeout) => (), // no message, nothing to do } } diff --git a/libertem_qd_mpx/src/background_thread.rs b/libertem_qd_mpx/src/background_thread.rs index f8e4c834..eedecf05 100644 --- a/libertem_qd_mpx/src/background_thread.rs +++ b/libertem_qd_mpx/src/background_thread.rs @@ -30,7 +30,10 @@ pub enum AcquisitionError { #[error("channel disconnected")] Disconnected, - #[error("acquisition was cancelled by the user")] + #[error("background thread stopped")] + ThreadStopped, + + #[error("acquisition cancelled by user")] Cancelled, #[error("receiver state error: {msg}")] @@ -131,10 +134,11 @@ fn check_for_control(control_channel: &Receiver) -> Result<(), Acq msg: "received StartAcquisitionPassive while an acquisition was already running" .to_string(), }), - Ok(ControlMsg::StopThread) => Err(AcquisitionError::Cancelled), + Ok(ControlMsg::StopThread) => Err(AcquisitionError::ThreadStopped), Ok(ControlMsg::SpecializedControlMsg { msg: _ }) => { panic!("unsupported SpecializedControlMsg") } + Ok(ControlMsg::CancelAcquisition) => Err(AcquisitionError::Cancelled), Err(TryRecvError::Disconnected) => Err(AcquisitionError::Disconnected), Err(TryRecvError::Empty) => Ok(()), } @@ -641,7 +645,14 @@ fn background_thread( Ok(ControlMsg::StartAcquisitionPassive) => { match passive_acquisition(to_thread_r, from_thread_s, config, &mut shm) { Ok(_) => {} - e @ Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => { + Err(AcquisitionError::Cancelled) => { + info!("acquisition cancelled by user"); + from_thread_s.send(ReceiverMsg::Cancelled).unwrap(); + continue 'outer; + } + e @ Err( + AcquisitionError::Disconnected | AcquisitionError::ThreadStopped, + ) => { info!("background_thread: terminating: {e:?}"); return Ok(()); } @@ -654,6 +665,11 @@ fn background_thread( } } } + Ok(ControlMsg::CancelAcquisition) => { + warn!( + "background_thread: got a CancelAcquisition message in main loop; ignoring" + ); + } Ok(ControlMsg::StopThread) => { debug!("background_thread: got a StopThread message"); break 'outer; @@ -764,9 +780,7 @@ mod test { time::timeout, }; - use crate::base_types::{ - QdAcquisitionConfig, QdAcquisitionHeader, QdDetectorConnConfig, RecoveryStrategy, - }; + use crate::base_types::{QdAcquisitionConfig, QdDetectorConnConfig, RecoveryStrategy}; use super::QdBackgroundThread;