Skip to content

Commit

Permalink
Add explicit cancellation
Browse files Browse the repository at this point in the history
When calling `wait_for_arm`, we must first cancel any already running
acquisitions, wait for the system to be idle, and then can arm it again.
  • Loading branch information
sk1p committed Sep 17, 2024
1 parent da69578 commit 58d8e3a
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 13 deletions.
7 changes: 7 additions & 0 deletions common/src/background_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ pub enum ReceiverMsg<M: FrameMeta, P: AcquisitionConfig> {
/// that were received.
Finished { frame_stack: FrameStackHandle<M> },

/// The acquisition was cancelled, as requested
/// by `ControlMsg::CancelAcquisition`
Cancelled,

/// A non-recoverable error occurred, the underlying connection
/// to the detector system should re-connect.
FatalError {
Expand All @@ -46,6 +50,9 @@ pub enum ControlMsg<CM: Debug> {
/// Start listening for any acquisitions starting
StartAcquisitionPassive,

/// Cancel the currently running acquisition, if any
CancelAcquisition,

/// Detector-specific control message
SpecializedControlMsg { msg: CM },
}
Expand Down
32 changes: 32 additions & 0 deletions common/src/generic_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ pub enum ConnectionError {
#[error("operation timed out")]
Timeout,

#[error("operation cancelled")]
Cancelled,

#[error("background thread failed to start: {0}")]
SpawnFailed(#[from] BackgroundThreadSpawnError),

Expand Down Expand Up @@ -204,6 +207,10 @@ where
ReceiverMsg::FatalError { error } => {
log::warn!("adjust_status: fatal error: {error:?}; going back to idle state");
self.status = ConnectionStatus::Idle;
}
ReceiverMsg::Cancelled => {
log::warn!("adjust_status: acquisition cancelled");
self.status = ConnectionStatus::Idle;
} // other => {
// trace!("adjust_status: other message: {other:?}");
// }
Expand Down Expand Up @@ -247,6 +254,12 @@ where
where
E: std::error::Error + 'static + Send + Sync,
{
// if an acquisition is already running, cancel and wait for idle status:
if self.is_running() {
self.cancel()?;
self.wait_for_status(ConnectionStatus::Idle, timeout, &periodic_callback)?;
}

if let Some(timeout) = timeout {
self.wait_for_arm_inner(timeout, periodic_callback)
} else {
Expand Down Expand Up @@ -313,6 +326,11 @@ where
"ReceiverMsg::Finished in wait_for_arm".to_owned(),
));
}
ReceiverMsg::Cancelled => {
return Err(ConnectionError::UnexpectedMessage(
"ReceiverMsg::Cancelled in wait_for_arm".to_owned(),
));
}
}
}
}
Expand Down Expand Up @@ -406,6 +424,9 @@ where
} => {
trace!("wait_for_status: received ReceiverMsg::AcquisitionStart");
}
ReceiverMsg::Cancelled => {
trace!("wait_for_status: received ReceiverMsg::Cancelled");
}
}
if self.status == desired_status {
debug!("wait_for_status: successfully got status {desired_status:?}");
Expand Down Expand Up @@ -492,6 +513,9 @@ where
self.stats.count_stats_item(&frame_stack);
return Ok(Some(frame_stack));
}
ReceiverMsg::Cancelled => {
return Err(ConnectionError::Cancelled);
}
}
}
}
Expand Down Expand Up @@ -529,6 +553,14 @@ where
self.get_status() == ConnectionStatus::Running
}

pub fn cancel(&mut self) -> Result<(), ConnectionError> {
self.bg_thread
.channel_to_thread()
.send(ControlMsg::CancelAcquisition)
.map_err(|_| ConnectionError::Disconnected)?;
Ok(())
}

pub fn log_shm_stats(&self) {
let shm = &self.shm;
let free = shm.num_slots_free();
Expand Down
9 changes: 9 additions & 0 deletions common/src/py_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,15 @@ macro_rules! impl_py_connection {
Ok(conn_impl.is_running())
}

pub fn cancel(&mut self) -> PyResult<()> {
let conn_impl = self.get_conn_mut()?;
conn_impl.cancel().map_err(|e| {
PyConnectionError::new_err(format!("cancellation failed: {e}"))
})?;

Ok(())
}

pub fn start_passive(
&mut self,
timeout: Option<f32>,
Expand Down
17 changes: 14 additions & 3 deletions libertem_asi_mpx3/src/background_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,10 @@ enum AcquisitionError {
#[error("other end has disconnected")]
Disconnected,

#[error("acquisition cancelled")]
#[error("background thread stopped")]
StopThread,

#[error("acquisition cancelled by user")]
Cancelled,

#[error("shm buffer is full")]
Expand Down Expand Up @@ -315,10 +318,11 @@ fn check_for_control(control_channel: &Receiver<ASIMpxControlMsg>) -> Result<(),
msg: "received StartAcquisitionPassive while an acquisition was already running"
.to_string(),
}),
Ok(ControlMsg::StopThread) => Err(AcquisitionError::Cancelled),
Ok(ControlMsg::StopThread) => Err(AcquisitionError::StopThread),
Ok(ControlMsg::SpecializedControlMsg { msg: _ }) => {
panic!("unsupported SpecializedControlMsg")
}
Ok(ControlMsg::CancelAcquisition) => Err(AcquisitionError::Cancelled),
Err(TryRecvError::Disconnected) => Err(AcquisitionError::Disconnected),
Err(TryRecvError::Empty) => Ok(()),
}
Expand Down Expand Up @@ -548,6 +552,9 @@ fn background_thread(
// control: main threads tells us to quit
let control = to_thread_r.recv_timeout(Duration::from_millis(100));
match control {
Ok(ControlMsg::CancelAcquisition) => {
warn!("background_thread: ControlMsg::CancelAcquisition without running acquisition");
}
Ok(ControlMsg::StartAcquisitionPassive) => {
match passive_acquisition(
to_thread_r,
Expand All @@ -558,7 +565,11 @@ fn background_thread(
&mut shm,
) {
Ok(_) => {}
e @ Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => {
Err(AcquisitionError::Cancelled) => {
from_thread_s.send(ReceiverMsg::Cancelled).unwrap();
continue 'outer;
}
e @ Err(AcquisitionError::Disconnected | AcquisitionError::StopThread) => {
info!("background_thread: terminating: {e:?}");
return Ok(());
}
Expand Down
27 changes: 23 additions & 4 deletions libertem_dectris/src/background_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ enum AcquisitionError {
SeriesMismatch,
FrameIdMismatch { expected_id: u64, got_id: u64 },
SerdeError { recvd_msg: String, msg: String },
StopThread,
Cancelled,
ZmqError { err: zmq::Error },
BufferFull,
Expand All @@ -91,7 +92,7 @@ impl Display for AcquisitionError {
AcquisitionError::ZmqError { err } => {
write!(f, "zmq error {err}")
}
AcquisitionError::Cancelled => {
AcquisitionError::StopThread => {
write!(f, "acquisition cancelled")
}
AcquisitionError::SerdeError { recvd_msg, msg } => {
Expand All @@ -118,6 +119,9 @@ impl Display for AcquisitionError {
AcquisitionError::ConfigurationError { msg } => {
write!(f, "configuration error: {msg}")
}
AcquisitionError::Cancelled => {
write!(f, "acquisition cancelled")
}
}
}
}
Expand Down Expand Up @@ -157,12 +161,17 @@ fn check_for_control(
Ok(ControlMsg::StopThread) => {
debug!("check_for_control: StopThread received");

Err(AcquisitionError::StopThread)
}
Ok(ControlMsg::CancelAcquisition) => {
debug!("check_for_control: CancelAcquisition");

Err(AcquisitionError::Cancelled)
}
Err(TryRecvError::Disconnected) => {
debug!("check_for_control: Disconnected");

Err(AcquisitionError::Cancelled)
Err(AcquisitionError::StopThread)
}
Err(TryRecvError::Empty) => Ok(()),
}
Expand Down Expand Up @@ -507,8 +516,12 @@ fn background_thread(
&mut shm,
) {
Ok(_) => {}
Err(AcquisitionError::Cancelled) => {
from_thread_s.send(ReceiverMsg::Cancelled).unwrap();
continue 'outer;
}
Err(
msg @ (AcquisitionError::Disconnected | AcquisitionError::Cancelled),
msg @ (AcquisitionError::Disconnected | AcquisitionError::StopThread),
) => {
debug!("background_thread: passive_acquisition returned {msg:?}");
break 'outer;
Expand Down Expand Up @@ -572,7 +585,10 @@ fn background_thread(
&mut shm,
) {
Ok(_) => {}
Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => {
Err(AcquisitionError::Cancelled) => {
from_thread_s.send(ReceiverMsg::Cancelled).unwrap();
}
Err(AcquisitionError::Disconnected | AcquisitionError::StopThread) => {
break 'outer;
}
Err(e) => {
Expand All @@ -594,6 +610,9 @@ fn background_thread(
warn!("background_thread: control channel has disconnected");
break 'outer;
}
Ok(ControlMsg::CancelAcquisition) => {
warn!("background_thread: ignoring ControlMsg::CancelAcquisition outside of running acquisition")
}
Err(RecvTimeoutError::Timeout) => (), // no message, nothing to do
}
}
Expand Down
26 changes: 20 additions & 6 deletions libertem_qd_mpx/src/background_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ pub enum AcquisitionError {
#[error("channel disconnected")]
Disconnected,

#[error("acquisition was cancelled by the user")]
#[error("background thread stopped")]
ThreadStopped,

#[error("acquisition cancelled by user")]
Cancelled,

#[error("receiver state error: {msg}")]
Expand Down Expand Up @@ -131,10 +134,11 @@ fn check_for_control(control_channel: &Receiver<QdControlMsg>) -> Result<(), Acq
msg: "received StartAcquisitionPassive while an acquisition was already running"
.to_string(),
}),
Ok(ControlMsg::StopThread) => Err(AcquisitionError::Cancelled),
Ok(ControlMsg::StopThread) => Err(AcquisitionError::ThreadStopped),
Ok(ControlMsg::SpecializedControlMsg { msg: _ }) => {
panic!("unsupported SpecializedControlMsg")
}
Ok(ControlMsg::CancelAcquisition) => Err(AcquisitionError::Cancelled),
Err(TryRecvError::Disconnected) => Err(AcquisitionError::Disconnected),
Err(TryRecvError::Empty) => Ok(()),
}
Expand Down Expand Up @@ -641,7 +645,14 @@ fn background_thread(
Ok(ControlMsg::StartAcquisitionPassive) => {
match passive_acquisition(to_thread_r, from_thread_s, config, &mut shm) {
Ok(_) => {}
e @ Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => {
Err(AcquisitionError::Cancelled) => {
info!("acquisition cancelled by user");
from_thread_s.send(ReceiverMsg::Cancelled).unwrap();
continue 'outer;
}
e @ Err(
AcquisitionError::Disconnected | AcquisitionError::ThreadStopped,
) => {
info!("background_thread: terminating: {e:?}");
return Ok(());
}
Expand All @@ -654,6 +665,11 @@ fn background_thread(
}
}
}
Ok(ControlMsg::CancelAcquisition) => {
warn!(
"background_thread: got a CancelAcquisition message in main loop; ignoring"
);
}
Ok(ControlMsg::StopThread) => {
debug!("background_thread: got a StopThread message");
break 'outer;
Expand Down Expand Up @@ -764,9 +780,7 @@ mod test {
time::timeout,
};

use crate::base_types::{
QdAcquisitionConfig, QdAcquisitionHeader, QdDetectorConnConfig, RecoveryStrategy,
};
use crate::base_types::{QdAcquisitionConfig, QdDetectorConnConfig, RecoveryStrategy};

use super::QdBackgroundThread;

Expand Down

0 comments on commit 58d8e3a

Please sign in to comment.