diff --git a/Cargo.lock b/Cargo.lock index 4baefb12..d95687f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1658,7 +1658,7 @@ checksum = "99227334921fae1a979cf0bfdfcc6b3e5ce376ef57e16fb6fb3ea2ed6095f80c" [[package]] name = "libertem-asi-tpx3" -version = "0.2.3" +version = "0.2.5" dependencies = [ "bincode", "crossbeam", @@ -1676,7 +1676,7 @@ dependencies = [ [[package]] name = "libertem-dectris" -version = "0.2.3" +version = "0.2.5" dependencies = [ "bincode", "bs_sys", diff --git a/libertem_asi_tpx3/Cargo.toml b/libertem_asi_tpx3/Cargo.toml index 4ea5a255..38ab6cb6 100644 --- a/libertem_asi_tpx3/Cargo.toml +++ b/libertem_asi_tpx3/Cargo.toml @@ -2,7 +2,7 @@ name = "libertem-asi-tpx3" authors = ["Alexander Clausen "] license = "MIT" -version = "0.2.4" +version = "0.2.5" edition = "2021" readme = "README.md" diff --git a/libertem_dectris/Cargo.toml b/libertem_dectris/Cargo.toml index f6fbfb7d..b090227a 100644 --- a/libertem_dectris/Cargo.toml +++ b/libertem_dectris/Cargo.toml @@ -2,7 +2,7 @@ name = "libertem-dectris" authors = ["Alexander Clausen "] license = "MIT" -version = "0.2.4" +version = "0.2.5" edition = "2021" readme = "README.md" diff --git a/libertem_dectris/src/common.rs b/libertem_dectris/src/common.rs index ccce46d4..98ff4811 100644 --- a/libertem_dectris/src/common.rs +++ b/libertem_dectris/src/common.rs @@ -11,8 +11,9 @@ use zmq::{Context, Message, Socket, SocketEvent}; #[derive(Serialize, Deserialize, Debug, Clone)] #[pyclass] -pub struct DSeriesOnly { +pub struct DSeriesAndType { pub series: u64, + pub htype: String, } #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/libertem_dectris/src/receiver.rs b/libertem_dectris/src/receiver.rs index f0a8ac80..922644f7 100644 --- a/libertem_dectris/src/receiver.rs +++ b/libertem_dectris/src/receiver.rs @@ -7,12 +7,12 @@ use std::{ use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, SendError, Sender, TryRecvError}; use ipc_test::{SHMHandle, SharedSlabAllocator}; -use log::{debug, info, warn}; +use log::{debug, info, warn, error}; use zmq::{Message, Socket}; use crate::{ common::{ - setup_monitor, DConfig, DHeader, DImage, DImageD, DSeriesEnd, DSeriesOnly, DetectorConfig, + setup_monitor, DConfig, DHeader, DImage, DImageD, DSeriesEnd, DSeriesAndType, DetectorConfig, }, frame_stack::{FrameStackForWriting, FrameStackHandle}, }; @@ -386,11 +386,11 @@ fn drain_if_mismatch( control_channel: &Receiver, ) -> Result<(), AcquisitionError> { loop { - let series_res: Result = serde_json::from_str(msg.as_str().unwrap()); + let series_res: Result = serde_json::from_str(msg.as_str().unwrap()); if let Ok(recvd_series) = series_res { // everything is ok, we can go ahead: - if recvd_series.series == series { + if recvd_series.series == series && recvd_series.htype == "dheader-1.0" { return Ok(()); } } @@ -424,94 +424,110 @@ fn background_thread( frame_stack_size: usize, mut shm: SharedSlabAllocator, ) -> Result<(), AcquisitionError> { - let ctx = zmq::Context::new(); - let socket = ctx.socket(zmq::PULL).unwrap(); - socket.set_rcvtimeo(1000).unwrap(); - socket.connect(&uri).unwrap(); - socket.set_rcvhwm(4 * 1024).unwrap(); - - setup_monitor(ctx, "DectrisReceiver".to_string(), &socket); - - loop { - // control: main threads tells us to quit - let control = to_thread_r.recv_timeout(Duration::from_millis(100)); - match control { - Ok(ControlMsg::StartAcquisitionPassive) => { - match passive_acquisition( - to_thread_r, - from_thread_s, - &socket, - frame_stack_size, - &mut shm, - ) { - Ok(_) => {} - Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => { - return Ok(()); - } - e => { - return e; + 'outer: loop { + let ctx = zmq::Context::new(); + let socket = ctx.socket(zmq::PULL).unwrap(); + socket.set_rcvtimeo(1000).unwrap(); + socket.connect(&uri).unwrap(); + socket.set_rcvhwm(4 * 1024).unwrap(); + + setup_monitor(ctx, "DectrisReceiver".to_string(), &socket); + + loop { + // control: main threads tells us to quit + let control = to_thread_r.recv_timeout(Duration::from_millis(100)); + match control { + Ok(ControlMsg::StartAcquisitionPassive) => { + match passive_acquisition( + to_thread_r, + from_thread_s, + &socket, + frame_stack_size, + &mut shm, + ) { + Ok(_) => {} + Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => { + return Ok(()); + } + Err(e) => { + let msg = format!("passive_acquisition error: {}", e); + from_thread_s + .send(ResultMsg::Error { + msg, + }) + .unwrap(); + error!("background_thread: error: {}; re-connecting", e); + continue 'outer; + } } } - } - Ok(ControlMsg::StartAcquisition { series }) => { - let mut msg: Message = Message::new(); - recv_part(&mut msg, &socket, to_thread_r)?; - - drain_if_mismatch(&mut msg, &socket, series, to_thread_r)?; - - let dheader_res: Result = serde_json::from_str(msg.as_str().unwrap()); - let dheader: DHeader = match dheader_res { - Ok(header) => header, - Err(err) => { - from_thread_s - .send(ResultMsg::SerdeError { - msg: err.to_string(), - recvd_msg: msg - .as_str() - .map_or_else(|| "".to_string(), |m| m.to_string()), - }) - .unwrap(); - log::error!( - "background_thread: serialization error: {}", - err.to_string() - ); - break; - } - }; - debug!("dheader: {dheader:?}"); - - // second message: the header itself - recv_part(&mut msg, &socket, to_thread_r)?; - let detector_config: DetectorConfig = - serde_json::from_str(msg.as_str().unwrap()).unwrap(); - - match acquisition( - detector_config, - to_thread_r, - from_thread_s, - &socket, - series, - frame_stack_size, - &mut shm, - ) { - Ok(_) => {} - Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => { - return Ok(()); - } - e => { - return e; + Ok(ControlMsg::StartAcquisition { series }) => { + let mut msg: Message = Message::new(); + recv_part(&mut msg, &socket, to_thread_r)?; + + drain_if_mismatch(&mut msg, &socket, series, to_thread_r)?; + + let dheader_res: Result = serde_json::from_str(msg.as_str().unwrap()); + let dheader: DHeader = match dheader_res { + Ok(header) => header, + Err(err) => { + from_thread_s + .send(ResultMsg::SerdeError { + msg: err.to_string(), + recvd_msg: msg + .as_str() + .map_or_else(|| "".to_string(), |m| m.to_string()), + }) + .unwrap(); + log::error!( + "background_thread: serialization error: {}", + err.to_string() + ); + break; + } + }; + debug!("dheader: {dheader:?}"); + + // second message: the header itself + recv_part(&mut msg, &socket, to_thread_r)?; + let detector_config: DetectorConfig = + serde_json::from_str(msg.as_str().unwrap()).unwrap(); + + match acquisition( + detector_config, + to_thread_r, + from_thread_s, + &socket, + series, + frame_stack_size, + &mut shm, + ) { + Ok(_) => {} + Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => { + return Ok(()); + } + Err(e) => { + let msg = format!("acquisition error: {}", e); + from_thread_s + .send(ResultMsg::Error { + msg, + }) + .unwrap(); + error!("background_thread: error: {}; re-connecting", e); + continue 'outer; + } } } + Ok(ControlMsg::StopThread) => { + debug!("background_thread: got a StopThread message"); + break 'outer; + } + Err(RecvTimeoutError::Disconnected) => { + debug!("background_thread: control channel has disconnected"); + break 'outer; + } + Err(RecvTimeoutError::Timeout) => (), // no message, nothing to do } - Ok(ControlMsg::StopThread) => { - debug!("background_thread: got a StopThread message"); - break; - } - Err(RecvTimeoutError::Disconnected) => { - debug!("background_thread: control channel has disconnected"); - break; - } - Err(RecvTimeoutError::Timeout) => (), // no message, nothing to do } } debug!("background_thread: is done");