Skip to content

Commit

Permalink
Stability improvements
Browse files Browse the repository at this point in the history
* Check input/output buffer sizes when decoding integer formatted frames
* `start_passive`
    * short-circuit if we are already Armed
    * drop the GIL while waiting for the message from the background
      thread
    * Allow to customize the timeout for waiting for the status change;
      this is useful for example in case we know that the bg thread
      first has to drain stuff
* Implement draining (mostly for backwards-compat.)
* Add numerous logging messages
* `QdFrameMeta::parse_bytes`: ignore additional data after the header
* Add `QdAcquisitionHeader::frames_in_acquisition` Python method
  • Loading branch information
sk1p committed Jul 16, 2024
1 parent 88fdea1 commit 738a62a
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 77 deletions.
49 changes: 28 additions & 21 deletions common/src/decoder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{any::type_name, fmt::Debug};
use std::{any::type_name, fmt::Debug, mem::size_of};

use ipc_test::SharedSlabAllocator;
use ndarray::ArrayViewMut3;
Expand Down Expand Up @@ -90,37 +90,44 @@ where
}
}

pub fn decode_ints_be<'a, O, FromT>(input: &'a [u8], output: &mut [O]) -> Result<(), DecoderError>
pub fn decode_ints_be<'a, O, ItemType>(
input: &'a [u8],
output: &mut [O],
) -> Result<(), DecoderError>
where
O: DecoderTargetPixelType,
FromT: ToPrimitive + Debug + Copy + num::traits::FromBytes,
&'a <FromT as num::traits::FromBytes>::Bytes: std::convert::TryFrom<&'a [u8]>,
<&'a <FromT as num::traits::FromBytes>::Bytes as std::convert::TryFrom<&'a [u8]>>::Error: Debug,
<FromT as num::traits::FromBytes>::Bytes: 'a,
ItemType: ToPrimitive + Debug + Copy + num::traits::FromBytes,
&'a <ItemType as num::traits::FromBytes>::Bytes: std::convert::TryFrom<&'a [u8]>,
<&'a <ItemType as num::traits::FromBytes>::Bytes as std::convert::TryFrom<&'a [u8]>>::Error:
Debug,
<ItemType as num::traits::FromBytes>::Bytes: 'a,
{
if input.len() % std::mem::size_of::<FromT>() != 0 {
if input.len() % std::mem::size_of::<ItemType>() != 0 {
return Err(DecoderError::FrameDecodeFailed {
msg: format!(
"input length {} is not divisible by {}",
"input length {} is not divisible by item size {}",
input.len(),
std::mem::size_of::<FromT>(),
std::mem::size_of::<ItemType>(),
),
});
}

let chunks = input.chunks_exact(std::mem::size_of::<FromT>());
if input.len() / std::mem::size_of::<ItemType>() != output.len() {
return Err(DecoderError::FrameDecodeFailed {
msg: format!(
"input length {} does not match output size {} for type {} item size {}",
input.len(),
output.len(),
type_name::<ItemType>(),
size_of::<ItemType>(),
),
});
}

let chunks = input.chunks_exact(std::mem::size_of::<ItemType>());
for (in_chunk, out_dest) in chunks.zip(output.iter_mut()) {
let swapped = FromT::from_be_bytes(in_chunk.try_into().expect("chunked"));
*out_dest = if let Some(value) = NumCast::from(swapped) {
value
} else {
return Err(DecoderError::FrameDecodeFailed {
msg: format!(
"dtype conversion error: {swapped:?} does not fit {0}",
type_name::<O>()
),
});
}
let swapped = ItemType::from_be_bytes(in_chunk.try_into().expect("chunked"));
*out_dest = try_cast_primitive(swapped)?;
}

Ok(())
Expand Down
13 changes: 8 additions & 5 deletions common/src/generic_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,20 +319,23 @@ where
pub fn start_passive<E>(
&mut self,
periodic_callback: impl Fn() -> Result<(), E>,
timeout: &Duration,
) -> Result<(), ConnectionError>
where
E: std::error::Error + 'static + Send + Sync,
{
if self.status == ConnectionStatus::Armed {
// already armed, don't have to do anything
debug!("start_passive: already armed, nothing to do");
return Ok(());
}

self.bg_thread
.channel_to_thread()
.send(ControlMsg::StartAcquisitionPassive)
.map_err(|_e| ConnectionError::Disconnected)?;

self.wait_for_status(
ConnectionStatus::Armed,
Duration::from_millis(1000),
periodic_callback,
)
self.wait_for_status(ConnectionStatus::Armed, *timeout, periodic_callback)
}

/// Receive the next frame stack from the background thread and handle any
Expand Down
40 changes: 21 additions & 19 deletions common/src/py_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,30 +195,32 @@ macro_rules! impl_py_connection {
Ok(conn_impl.is_running())
}

pub fn start_passive(&mut self) -> PyResult<()> {
let conn_impl = self.get_conn_mut()?;
conn_impl
.start_passive(|| {
Python::with_gil(|py| py.check_signals())?;
Ok::<_, PyErr>(())
})
.map_err(|e| {
PyConnectionError::new_err(format!("start_passive failed: {e}"))
})?;
pub fn start_passive(&mut self, timeout: f32, py: Python<'_>) -> PyResult<()> {
let timeout = Duration::from_secs_f32(timeout);
py.allow_threads(|| {
let conn_impl = self.get_conn_mut()?;
conn_impl
.start_passive(
|| {
Python::with_gil(|py| py.check_signals())?;
Ok::<_, PyErr>(())
},
&timeout,
)
.map_err(|e| {
PyConnectionError::new_err(format!("start_passive failed: {e}"))
})?;

conn_impl
.wait_for_status(
ConnectionStatus::Armed,
Duration::from_millis(100),
|| {
conn_impl
.wait_for_status(ConnectionStatus::Armed, timeout, || {
// re-acquire GIL to check if we need to break
Python::with_gil(|py| py.check_signals())?;
Ok::<_, PyErr>(())
},
)
.map_err(|e| PyConnectionError::new_err(e.to_string()))?;
})
.map_err(|e| PyConnectionError::new_err(e.to_string()))?;

Ok(())
Ok(())
})
}

pub fn log_shm_stats(&self) -> PyResult<()> {
Expand Down
4 changes: 2 additions & 2 deletions libertem_asi_mpx3/src/main_py.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,8 @@ impl ServalConnection {
self.conn.is_running()
}

fn start_passive(&mut self) -> PyResult<()> {
self.conn.start_passive()
fn start_passive(&mut self, py: Python<'_>) -> PyResult<()> {
self.conn.start_passive(0.1, py)
}

fn close(&mut self) -> PyResult<()> {
Expand Down
4 changes: 2 additions & 2 deletions libertem_dectris/src/dectris_py.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ impl DectrisConnection {
}

/// Start listening for global acquisition headers on the zeromq socket.
fn start_passive(&mut self) -> PyResult<()> {
self.conn.start_passive()
fn start_passive(&mut self, py: Python<'_>) -> PyResult<()> {
self.conn.start_passive(0.1, py)
}

fn close(&mut self) -> PyResult<()> {
Expand Down
62 changes: 58 additions & 4 deletions libertem_qd_mpx/src/background_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,17 @@ fn read_exact_interruptible(
buf: &mut [u8],
to_thread_r: &Receiver<QdControlMsg>,
) -> Result<(), AcquisitionError> {
let total_to_read = buf.len();
let mut buf_sliced = buf;
let mut bytes_read: usize = 0;
loop {
check_for_control(to_thread_r)?;
match stream.read(buf_sliced) {
Ok(size) => {
bytes_read += size;
buf_sliced = &mut buf_sliced[size..];
// it's full! we are done...
if buf_sliced.is_empty() {
if bytes_read == total_to_read {
return Ok(());
}
}
Expand All @@ -151,6 +154,42 @@ fn read_exact_interruptible(
}
}

/// Read from `stream` until we hit the timeout. Original socket timeout is
/// overwritten and should be restored by the caller.
fn drain_until_timeout(
stream: &mut TcpStream,
to_thread_r: &Receiver<QdControlMsg>,
timeout: &Duration,
) -> Result<usize, AcquisitionError> {
let mut tmp = vec![0; 1024 * 1024];
let mut total_drained: usize = 0;
stream.set_read_timeout(Some(*timeout))?;
loop {
check_for_control(to_thread_r)?;
total_drained += match stream.read(&mut tmp) {
Ok(size) => {
trace!("drain_until_timeout: drained {size}");
if size == 0 {
// EOF: okay....
warn!("drained until EOF!");
return Ok(total_drained);
}
size
}
Err(e) => match e.kind() {
ErrorKind::TimedOut | ErrorKind::WouldBlock => {
trace!("drain_until_timeout: timeout: {e:?}");
return Ok(total_drained);
}
kind => {
trace!("drain_until_timeout: other error kind: {kind:?}");
return Err(e.into());
}
},
};
}
}

/// Fill `buf` from `stream` (like TcpStream::peek), but periodically check
/// `to_thread_r` for control messages, which allows to interrupt the acquisition.
/// Assumes a blocking socket with a timeout.
Expand Down Expand Up @@ -303,6 +342,8 @@ fn peek_mpx_message(
let mut prefix = [0u8; PREFIX_SIZE];
peek_exact_interruptible(stream, &mut prefix, to_thread_r)?;

trace!("peek_mpx_message: {}", String::from_utf8_lossy(&prefix));

let magic = &prefix[0..4];
if magic != b"MPX," {
return Err(AcquisitionError::HeaderParseError {
Expand Down Expand Up @@ -337,6 +378,10 @@ fn peek_first_frame_header(
to_thread_r: &Receiver<QdControlMsg>,
) -> Result<QdFrameMeta, AcquisitionError> {
let msg = peek_mpx_message(stream, to_thread_r, 2048)?;
trace!(
"parsing first frame header: '{}'",
String::from_utf8_lossy(&msg.payload)
);
Ok(QdFrameMeta::parse_bytes(&msg.payload, msg.length)?)
}

Expand Down Expand Up @@ -503,7 +548,7 @@ fn passive_acquisition(

loop {
let data_uri = format!("{host}:{port}");
trace!("connecting to {}...", &data_uri);
info!("connecting to {}...", &data_uri);

check_for_control(to_thread_r)?;
let mut stream: TcpStream = match TcpStream::connect(&data_uri) {
Expand All @@ -524,11 +569,20 @@ fn passive_acquisition(
},
};

stream.set_read_timeout(Some(Duration::from_millis(100)))?;
info!("connected to {}.", &data_uri);

if let Some(timeout) = &config.drain {
info!("draining for {timeout:?}...");
let drained = drain_until_timeout(&mut stream, to_thread_r, timeout)?;
if drained > 0 {
info!("drained {drained} bytes of garbage");
}
}

trace!("connected to {}", &data_uri);
from_thread_s.send(ReceiverMsg::ReceiverArmed)?;

stream.set_read_timeout(Some(Duration::from_millis(100)))?;

// wait for the acquisition header, which is sent when the detector
// is armed with STARTACQUISITION:
let acquisition_header: QdAcquisitionHeader =
Expand Down
Loading

0 comments on commit 738a62a

Please sign in to comment.