diff --git a/common/src/decoder.rs b/common/src/decoder.rs index 6be88d1d..f2f7708c 100644 --- a/common/src/decoder.rs +++ b/common/src/decoder.rs @@ -1,4 +1,4 @@ -use std::{any::type_name, fmt::Debug}; +use std::{any::type_name, fmt::Debug, mem::size_of}; use ipc_test::SharedSlabAllocator; use ndarray::ArrayViewMut3; @@ -90,37 +90,44 @@ where } } -pub fn decode_ints_be<'a, O, FromT>(input: &'a [u8], output: &mut [O]) -> Result<(), DecoderError> +pub fn decode_ints_be<'a, O, ItemType>( + input: &'a [u8], + output: &mut [O], +) -> Result<(), DecoderError> where O: DecoderTargetPixelType, - FromT: ToPrimitive + Debug + Copy + num::traits::FromBytes, - &'a ::Bytes: std::convert::TryFrom<&'a [u8]>, - <&'a ::Bytes as std::convert::TryFrom<&'a [u8]>>::Error: Debug, - ::Bytes: 'a, + ItemType: ToPrimitive + Debug + Copy + num::traits::FromBytes, + &'a ::Bytes: std::convert::TryFrom<&'a [u8]>, + <&'a ::Bytes as std::convert::TryFrom<&'a [u8]>>::Error: + Debug, + ::Bytes: 'a, { - if input.len() % std::mem::size_of::() != 0 { + if input.len() % std::mem::size_of::() != 0 { return Err(DecoderError::FrameDecodeFailed { msg: format!( - "input length {} is not divisible by {}", + "input length {} is not divisible by item size {}", input.len(), - std::mem::size_of::(), + std::mem::size_of::(), ), }); } - let chunks = input.chunks_exact(std::mem::size_of::()); + if input.len() / std::mem::size_of::() != output.len() { + return Err(DecoderError::FrameDecodeFailed { + msg: format!( + "input length {} does not match output size {} for type {} item size {}", + input.len(), + output.len(), + type_name::(), + size_of::(), + ), + }); + } + + let chunks = input.chunks_exact(std::mem::size_of::()); for (in_chunk, out_dest) in chunks.zip(output.iter_mut()) { - let swapped = FromT::from_be_bytes(in_chunk.try_into().expect("chunked")); - *out_dest = if let Some(value) = NumCast::from(swapped) { - value - } else { - return Err(DecoderError::FrameDecodeFailed { - msg: format!( - "dtype conversion error: {swapped:?} does not fit {0}", - type_name::() - ), - }); - } + let swapped = ItemType::from_be_bytes(in_chunk.try_into().expect("chunked")); + *out_dest = try_cast_primitive(swapped)?; } Ok(()) diff --git a/common/src/generic_connection.rs b/common/src/generic_connection.rs index 722fa7c6..3b3232d4 100644 --- a/common/src/generic_connection.rs +++ b/common/src/generic_connection.rs @@ -319,20 +319,23 @@ where pub fn start_passive( &mut self, periodic_callback: impl Fn() -> Result<(), E>, + timeout: &Duration, ) -> Result<(), ConnectionError> where E: std::error::Error + 'static + Send + Sync, { + if self.status == ConnectionStatus::Armed { + // already armed, don't have to do anything + debug!("start_passive: already armed, nothing to do"); + return Ok(()); + } + self.bg_thread .channel_to_thread() .send(ControlMsg::StartAcquisitionPassive) .map_err(|_e| ConnectionError::Disconnected)?; - self.wait_for_status( - ConnectionStatus::Armed, - Duration::from_millis(1000), - periodic_callback, - ) + self.wait_for_status(ConnectionStatus::Armed, *timeout, periodic_callback) } /// Receive the next frame stack from the background thread and handle any diff --git a/common/src/py_connection.rs b/common/src/py_connection.rs index 1c767304..c1ea5de8 100644 --- a/common/src/py_connection.rs +++ b/common/src/py_connection.rs @@ -195,30 +195,32 @@ macro_rules! impl_py_connection { Ok(conn_impl.is_running()) } - pub fn start_passive(&mut self) -> PyResult<()> { - let conn_impl = self.get_conn_mut()?; - conn_impl - .start_passive(|| { - Python::with_gil(|py| py.check_signals())?; - Ok::<_, PyErr>(()) - }) - .map_err(|e| { - PyConnectionError::new_err(format!("start_passive failed: {e}")) - })?; + pub fn start_passive(&mut self, timeout: f32, py: Python<'_>) -> PyResult<()> { + let timeout = Duration::from_secs_f32(timeout); + py.allow_threads(|| { + let conn_impl = self.get_conn_mut()?; + conn_impl + .start_passive( + || { + Python::with_gil(|py| py.check_signals())?; + Ok::<_, PyErr>(()) + }, + &timeout, + ) + .map_err(|e| { + PyConnectionError::new_err(format!("start_passive failed: {e}")) + })?; - conn_impl - .wait_for_status( - ConnectionStatus::Armed, - Duration::from_millis(100), - || { + conn_impl + .wait_for_status(ConnectionStatus::Armed, timeout, || { // re-acquire GIL to check if we need to break Python::with_gil(|py| py.check_signals())?; Ok::<_, PyErr>(()) - }, - ) - .map_err(|e| PyConnectionError::new_err(e.to_string()))?; + }) + .map_err(|e| PyConnectionError::new_err(e.to_string()))?; - Ok(()) + Ok(()) + }) } pub fn log_shm_stats(&self) -> PyResult<()> { diff --git a/libertem_asi_mpx3/src/main_py.rs b/libertem_asi_mpx3/src/main_py.rs index 9edaac38..896495da 100644 --- a/libertem_asi_mpx3/src/main_py.rs +++ b/libertem_asi_mpx3/src/main_py.rs @@ -186,8 +186,8 @@ impl ServalConnection { self.conn.is_running() } - fn start_passive(&mut self) -> PyResult<()> { - self.conn.start_passive() + fn start_passive(&mut self, py: Python<'_>) -> PyResult<()> { + self.conn.start_passive(0.1, py) } fn close(&mut self) -> PyResult<()> { diff --git a/libertem_dectris/src/dectris_py.rs b/libertem_dectris/src/dectris_py.rs index 0703df93..17855d38 100644 --- a/libertem_dectris/src/dectris_py.rs +++ b/libertem_dectris/src/dectris_py.rs @@ -147,8 +147,8 @@ impl DectrisConnection { } /// Start listening for global acquisition headers on the zeromq socket. - fn start_passive(&mut self) -> PyResult<()> { - self.conn.start_passive() + fn start_passive(&mut self, py: Python<'_>) -> PyResult<()> { + self.conn.start_passive(0.1, py) } fn close(&mut self) -> PyResult<()> { diff --git a/libertem_qd_mpx/src/background_thread.rs b/libertem_qd_mpx/src/background_thread.rs index 634f1773..a5e1e22c 100644 --- a/libertem_qd_mpx/src/background_thread.rs +++ b/libertem_qd_mpx/src/background_thread.rs @@ -129,14 +129,17 @@ fn read_exact_interruptible( buf: &mut [u8], to_thread_r: &Receiver, ) -> Result<(), AcquisitionError> { + let total_to_read = buf.len(); let mut buf_sliced = buf; + let mut bytes_read: usize = 0; loop { check_for_control(to_thread_r)?; match stream.read(buf_sliced) { Ok(size) => { + bytes_read += size; buf_sliced = &mut buf_sliced[size..]; // it's full! we are done... - if buf_sliced.is_empty() { + if bytes_read == total_to_read { return Ok(()); } } @@ -151,6 +154,42 @@ fn read_exact_interruptible( } } +/// Read from `stream` until we hit the timeout. Original socket timeout is +/// overwritten and should be restored by the caller. +fn drain_until_timeout( + stream: &mut TcpStream, + to_thread_r: &Receiver, + timeout: &Duration, +) -> Result { + let mut tmp = vec![0; 1024 * 1024]; + let mut total_drained: usize = 0; + stream.set_read_timeout(Some(*timeout))?; + loop { + check_for_control(to_thread_r)?; + total_drained += match stream.read(&mut tmp) { + Ok(size) => { + trace!("drain_until_timeout: drained {size}"); + if size == 0 { + // EOF: okay.... + warn!("drained until EOF!"); + return Ok(total_drained); + } + size + } + Err(e) => match e.kind() { + ErrorKind::TimedOut | ErrorKind::WouldBlock => { + trace!("drain_until_timeout: timeout: {e:?}"); + return Ok(total_drained); + } + kind => { + trace!("drain_until_timeout: other error kind: {kind:?}"); + return Err(e.into()); + } + }, + }; + } +} + /// Fill `buf` from `stream` (like TcpStream::peek), but periodically check /// `to_thread_r` for control messages, which allows to interrupt the acquisition. /// Assumes a blocking socket with a timeout. @@ -303,6 +342,8 @@ fn peek_mpx_message( let mut prefix = [0u8; PREFIX_SIZE]; peek_exact_interruptible(stream, &mut prefix, to_thread_r)?; + trace!("peek_mpx_message: {}", String::from_utf8_lossy(&prefix)); + let magic = &prefix[0..4]; if magic != b"MPX," { return Err(AcquisitionError::HeaderParseError { @@ -337,6 +378,10 @@ fn peek_first_frame_header( to_thread_r: &Receiver, ) -> Result { let msg = peek_mpx_message(stream, to_thread_r, 2048)?; + trace!( + "parsing first frame header: '{}'", + String::from_utf8_lossy(&msg.payload) + ); Ok(QdFrameMeta::parse_bytes(&msg.payload, msg.length)?) } @@ -503,7 +548,7 @@ fn passive_acquisition( loop { let data_uri = format!("{host}:{port}"); - trace!("connecting to {}...", &data_uri); + info!("connecting to {}...", &data_uri); check_for_control(to_thread_r)?; let mut stream: TcpStream = match TcpStream::connect(&data_uri) { @@ -524,11 +569,20 @@ fn passive_acquisition( }, }; - stream.set_read_timeout(Some(Duration::from_millis(100)))?; + info!("connected to {}.", &data_uri); + + if let Some(timeout) = &config.drain { + info!("draining for {timeout:?}..."); + let drained = drain_until_timeout(&mut stream, to_thread_r, timeout)?; + if drained > 0 { + info!("drained {drained} bytes of garbage"); + } + } - trace!("connected to {}", &data_uri); from_thread_s.send(ReceiverMsg::ReceiverArmed)?; + stream.set_read_timeout(Some(Duration::from_millis(100)))?; + // wait for the acquisition header, which is sent when the detector // is armed with STARTACQUISITION: let acquisition_header: QdAcquisitionHeader = diff --git a/libertem_qd_mpx/src/base_types.rs b/libertem_qd_mpx/src/base_types.rs index 1a3f850a..0cacc670 100644 --- a/libertem_qd_mpx/src/base_types.rs +++ b/libertem_qd_mpx/src/base_types.rs @@ -1,19 +1,19 @@ use std::{ + any::type_name, collections::HashMap, fmt::Debug, str::{FromStr, Split, Utf8Error}, string::FromUtf8Error, + time::Duration, }; use common::{ - decoder::{try_cast_primitive, DecoderError}, frame_stack::FrameMeta, generic_connection::{AcquisitionConfig, DetectorConnectionConfig}, }; -use itertools::Itertools; -use log::warn; -use num::{Num, NumCast, ToPrimitive}; -use pyo3::pyclass; +use log::{trace, warn}; +use num::Num; +use pyo3::{pyclass, pymethods}; use serde::{Deserialize, Serialize}; /// Size of the full prefix, in bytes, including the comma separator to the payload: 'MPX,,' @@ -123,10 +123,16 @@ impl From for FrameMetaParseError { } } -fn next_part_from_str(parts: &mut Split) -> Result { +fn next_part_from_str(parts: &mut Split) -> Result +where + ::Err: std::fmt::Debug, +{ let part_str = parts.next().ok_or(NextPartError::Eof)?; - let value: T = part_str.parse().map_err(|_e| NextPartError::ValueError { - msg: format!("unexpected value: {part_str}"), + let value: T = part_str.parse().map_err(|e| NextPartError::ValueError { + msg: format!( + "unexpected value: '{part_str}', expected {} ({e:?})", + type_name::() + ), })?; Ok(value) } @@ -134,22 +140,31 @@ fn next_part_from_str(parts: &mut Split) -> Result( parts: &mut Split, f: impl Fn(&str) -> Result<&str, NextPartError>, -) -> Result { +) -> Result +where + ::Err: std::fmt::Debug, +{ let part_str = parts.next().ok_or(NextPartError::Eof)?; let part_str = f(part_str)?; - let value: T = part_str.parse().map_err(|_e| NextPartError::ValueError { - msg: format!("unexpected value: {part_str}"), + let value: T = part_str.parse().map_err(|e| NextPartError::ValueError { + msg: format!( + "unexpected value: '{part_str}', expected {} ({e:?})", + type_name::() + ), })?; Ok(value) } -fn next_part_from_str_radix( - parts: &mut Split, - radix: u32, -) -> Result { +fn next_part_from_str_radix(parts: &mut Split, radix: u32) -> Result +where + ::FromStrRadixErr: std::fmt::Debug, +{ let part_str = parts.next().ok_or(NextPartError::Eof)?; - let value = T::from_str_radix(part_str, radix).map_err(|_e| NextPartError::ValueError { - msg: format!("unexpected value: {part_str}"), + let value = T::from_str_radix(part_str, radix).map_err(|e| NextPartError::ValueError { + msg: format!( + "unexpected value: '{part_str}', expected {} ({e:?}); radix={radix}", + type_name::() + ), })?; Ok(value) } @@ -292,6 +307,23 @@ impl QdFrameMeta { /// Parse frame header, including the MQ1 prefix pub fn parse_bytes(input: &[u8], mpx_length: usize) -> Result { + if input.len() < 16 { + return Err(FrameMetaParseError::Eof); + } + + // first, split off the header if there is more stuff after it: + let data_offset: u32 = std::str::from_utf8(&input[11..16])?.parse().map_err(|e| { + FrameMetaParseError::ValueError { + msg: format!("could not parse data offset! {e:?}"), + } + })?; + + let input = if input.len() > data_offset as usize { + &input[..data_offset as usize] + } else { + input + }; + let input_string = std::str::from_utf8(input)?; let mut parts = input_string.split(','); @@ -323,7 +355,7 @@ impl QdFrameMeta { let timestamp_ext = next_part_from_str(&mut parts)?; let acquisition_time_shutter_ns = next_part_from_str_with_map(&mut parts, |s| Ok(&s[0..s.len() - 2]))?; - let counter_depth = next_part_from_str(&mut parts)?; + let counter_depth = next_part_from_str_with_map(&mut parts, |s| Ok(s.trim()))?; Some(MQ1A { timestamp_ext, @@ -425,6 +457,13 @@ impl AcquisitionConfig for QdAcquisitionHeader { } } +#[pymethods] +impl QdAcquisitionHeader { + fn frames_in_acquisition(&self) -> usize { + self.frames_in_acquisition + } +} + fn get_key_and_parse( raw_kv: &HashMap, key: &str, @@ -469,7 +508,10 @@ impl QdAcquisitionHeader { pub fn parse_bytes(input: &[u8]) -> Result { // the acquisition header can actually be latin-1 (µA...) so let's try that: let input_string = encoding_rs::mem::decode_latin1(input); - // trace!("parsing acquisition header: {}", String::from_utf8_lossy(input)); + trace!( + "parsing acquisition header: {}", + String::from_utf8_lossy(input) + ); // trace!("parsing acquisition header: {}", String::from_utf8_lossy(&input[1003..])); // let input= input.strip_suffix(&[0]).unwrap_or(input); @@ -530,12 +572,15 @@ pub struct QdDetectorConnConfig { /// with `frame_stack_size` pub bytes_per_frame: usize, + pub drain: Option, + num_slots: usize, enable_huge_pages: bool, shm_handle_path: String, } impl QdDetectorConnConfig { + #[allow(clippy::too_many_arguments)] pub fn new( data_host: &str, data_port: usize, @@ -544,6 +589,7 @@ impl QdDetectorConnConfig { num_slots: usize, enable_huge_pages: bool, shm_handle_path: &str, + drain: Option, ) -> Self { Self { data_host: data_host.to_owned(), @@ -553,6 +599,7 @@ impl QdDetectorConnConfig { num_slots, enable_huge_pages, shm_handle_path: shm_handle_path.to_owned(), + drain, } } } @@ -649,6 +696,29 @@ mod test { assert_eq!(fm.get_shape(), (256, 256)); } + #[test] + fn test_parse_frame_header_from_mock() { + let inp = "MQ1,000001,00340,01,0016,0016,U08, 1x1,01,2020-05-18 16:51:49.971626,0.000555,0,0,0,1.200000E+2,5.110000E+2,0.000000E+0,0.000000E+0,0.000000E+0,0.000000E+0,0.000000E+0,0.000000E+0,3RX,175,511,000,000,000,000,000,000,125,255,125,125,100,100,082,100,087,030,128,004,255,129,128,176,168,511,511,MQ1A,2020-05-18T14:51:49.971626178Z,555000ns,6\n"; + let inp_bytes = inp.as_bytes(); + let fm = QdFrameMeta::parse_bytes(inp_bytes, 384 + 256 * 256 + 1).unwrap(); + + eprintln!("{fm:?}"); + } + + #[test] + fn test_parse_frame_header_from_mock_with_additional_stuff() { + let inp = "MQ1,000001,00340,01,0016,0016,U08, 1x1,01,2020-05-18 16:51:49.971626,0.000555,0,0,0,1.200000E+2,5.110000E+2,0.000000E+0,0.000000E+0,0.000000E+0,0.000000E+0,0.000000E+0,0.000000E+0,3RX,175,511,000,000,000,000,000,000,125,255,125,125,100,100,082,100,087,030,128,004,255,129,128,176,168,511,511,MQ1A,2020-05-18T14:51:49.971626178Z,555000ns,6\n"; + let inp_bytes = inp.as_bytes(); + let inp_plus_data: Vec = inp_bytes + .iter() + .chain(vec![0; 16 * 16].iter()) + .copied() + .collect(); + let fm = QdFrameMeta::parse_bytes(&inp_plus_data, 384 + 256 * 256 + 1).unwrap(); + + eprintln!("{fm:?}"); + } + #[test] fn test_parse_frame_header_example_4_u16_quad() { let inp = "MQ1,000001,00768,04,0512,0512,U16, 2x2,0F,2022-03-28 15:06:08.361851,0.000832,0,0,0,1.000000E+1,5.000000E+2,0.000000E+0,0.000000E+0,0.000000E+0,0.000000E+0,0.000000E+0,0.000000E+0,3RX,033,511,000,000,000,000,000,000,100,255,100,125,100,100,065,100,069,030,128,004,255,145,128,199,191,511,511,3RX,030,511,000,000,000,000,000,000,100,255,100,125,100,100,066,100,064,030,128,004,255,143,128,201,193,511,511,3RX,032,511,000,000,000,000,000,000,100,255,100,125,100,100,066,100,071,030,128,004,255,147,128,191,184,511,511,3RX,030,511,000,000,000,000,000,000,100,255,100,125,100,100,066,100,074,030,128,004,255,147,128,191,184,511,511,MQ1A,2022-03-28T14:06:08.361851667Z,831990ns,12,"; diff --git a/libertem_qd_mpx/src/decoder.rs b/libertem_qd_mpx/src/decoder.rs index c6d82785..5c2f0ca0 100644 --- a/libertem_qd_mpx/src/decoder.rs +++ b/libertem_qd_mpx/src/decoder.rs @@ -112,7 +112,9 @@ impl QdDecoder { O: DecoderTargetPixelType, { match &frame_meta.layout { - crate::base_types::Layout::L1x1 => self.decode_frame_single(frame_meta, input, output), + crate::base_types::Layout::L1x1 => { + self.decode_frame_single_chip(frame_meta, input, output) + } crate::base_types::Layout::L2x2 => self.decode_frame_quad(frame_meta, input, output), crate::base_types::Layout::L2x2G => self.decode_frame_quad(frame_meta, input, output), layout @ (crate::base_types::Layout::LNx1 | crate::base_types::Layout::LNx1G) => { @@ -160,7 +162,7 @@ impl QdDecoder { } } - fn decode_frame_single( + fn decode_frame_single_chip( &self, frame_meta: &QdFrameMeta, input: &[u8], diff --git a/libertem_qd_mpx/src/main_py.rs b/libertem_qd_mpx/src/main_py.rs index 87710140..01a5eb4b 100644 --- a/libertem_qd_mpx/src/main_py.rs +++ b/libertem_qd_mpx/src/main_py.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use common::{decoder::DecoderTargetPixelType, generic_connection::GenericConnection}; use numpy::{Element, PyArray1, PyArrayMethods, PyUntypedArray, PyUntypedArrayMethods}; use pyo3::{ @@ -42,16 +44,19 @@ impl_py_connection!( #[pyclass] struct QdConnection { conn: _PyQdConnection, + config: QdDetectorConnConfig, } #[pymethods] impl QdConnection { #[new] + #[allow(clippy::too_many_arguments)] fn new( data_host: &str, data_port: usize, frame_stack_size: usize, shm_handle_path: &str, + drain: Option, num_slots: Option, bytes_per_frame: Option, huge: Option, @@ -61,6 +66,12 @@ impl QdConnection { let num_slots = num_slots.unwrap_or(2000); let bytes_per_frame = bytes_per_frame.unwrap_or(256 * 256 * 2); + let drain = if drain.unwrap_or(false) { + Some(Duration::from_millis(100)) + } else { + None + }; + let config = QdDetectorConnConfig::new( data_host, data_port, @@ -69,6 +80,7 @@ impl QdConnection { num_slots, huge.unwrap_or(false), shm_handle_path, + drain, ); let shm = @@ -83,7 +95,7 @@ impl QdConnection { let conn = _PyQdConnection::new(shm, generic_conn); - Ok(QdConnection { conn }) + Ok(QdConnection { conn, config }) } fn wait_for_arm( @@ -102,8 +114,10 @@ impl QdConnection { self.conn.is_running() } - fn start_passive(&mut self) -> PyResult<()> { - self.conn.start_passive() + fn start_passive(&mut self, py: Python<'_>) -> PyResult<()> { + let timeout = + Duration::from_millis(100) + self.config.drain.unwrap_or(Duration::from_millis(1)); + self.conn.start_passive(timeout.as_secs_f32(), py) } fn close(&mut self) -> PyResult<()> {