From 83c2303af2d233cca0aff47222e07b6c3dc415e8 Mon Sep 17 00:00:00 2001
From: Alexander Clausen
Date: Wed, 17 Jul 2024 12:30:13 +0200
Subject: [PATCH] Extract tcp helpers and use interruptible reads in `asi_mpx`

---
 common/src/lib.rs                          |   1 +
 common/src/tcp.rs                          | 137 +++++++++++++++++++++
 libertem_asi_mpx3/src/background_thread.rs |  94 +++++++++------
 libertem_qd_mpx/src/background_thread.rs   |  84 ++++----------
 4 files changed, 219 insertions(+), 97 deletions(-)
 create mode 100644 common/src/tcp.rs

diff --git a/common/src/lib.rs b/common/src/lib.rs
index 11238cea..9597aa09 100644
--- a/common/src/lib.rs
+++ b/common/src/lib.rs
@@ -6,4 +6,5 @@ pub mod generic_connection;
 pub mod generic_receiver;
 pub mod py_cam_client;
 pub mod py_connection;
+pub mod tcp;
 pub mod utils;
diff --git a/common/src/tcp.rs b/common/src/tcp.rs
new file mode 100644
index 00000000..d2e10d69
--- /dev/null
+++ b/common/src/tcp.rs
@@ -0,0 +1,137 @@
+use std::{
+    io::{ErrorKind, Read},
+    net::TcpStream,
+    time::Duration,
+};
+
+use log::trace;
+
+/// Errors returned by [`read_exact_interruptible`] and [`peek_exact_interruptible`].
+///
+/// `E` is the error type of the callback `f` that is passed to these functions.
+#[derive(thiserror::Error, Debug)]
+pub enum ReadExactError<E> {
+    /// The callback returned `Err(err)`; `size` is the number of bytes that were
+    /// consumed from the stream up to that point (always zero when peeking).
+    #[error("read interrupted after {size} bytes; error: {err}")]
+    Interrupted { size: usize, err: E },
+
+    /// An I/O error reported by the underlying stream; read timeouts are retried
+    /// and are never reported through this variant.
+    #[error("i/o error")]
+    IOError {
+        #[from]
+        err: std::io::Error,
+    },
+
+    /// Fewer than the requested `size` bytes could be peeked from the socket.
+    #[error("could not peek {size} bytes")]
+    PeekError { size: usize },
+}
+
+/// Fill `buf` from `stream` (like TcpStream::read_exact); if we hit a read timeout
+/// on the stream, invoke `f`. If `f` returns an `Err`, stop reading from the stream.
+///
+/// In case of interruption, `buf` may contain a partial result, and
+/// `ReadExactError::Interrupted` will contain the number of bytes read in the
+/// `size` field.
+pub fn read_exact_interruptible<F, E>(
+    stream: &mut impl Read,
+    buf: &mut [u8],
+    f: F,
+) -> Result<(), ReadExactError<E>>
+where
+    F: Fn() -> Result<(), E>,
+{
+    let total_to_read = buf.len();
+    let mut buf_sliced = buf;
+    let mut bytes_read: usize = 0;
+    loop {
+        if let Err(e) = f() {
+            return Err(ReadExactError::Interrupted {
+                size: bytes_read,
+                err: e,
+            });
+        }
+        match stream.read(buf_sliced) {
+            // it's full! we are done... (this also covers an empty `buf`)
+            Ok(size) if bytes_read + size == total_to_read => return Ok(()),
+            // end of stream before `buf` was filled: fail instead of spinning forever
+            Ok(0) => return Err(std::io::Error::from(ErrorKind::UnexpectedEof).into()),
+            Ok(size) => {
+                bytes_read += size;
+                buf_sliced = &mut buf_sliced[size..];
+            }
+            Err(e) => match e.kind() {
+                ErrorKind::WouldBlock | ErrorKind::TimedOut | ErrorKind::Interrupted => {
+                    continue; // timeout or EINTR: let `f` decide, then try again
+                }
+                _ => return Err(ReadExactError::from(e)),
+            },
+        }
+    }
+}
+
+/// Fill `buf` from `stream` without consuming the data. `f` is invoked before
+/// every peek attempt; if it returns an `Err`, stop peeking.
+///
+/// In case of interruption, `buf` may contain a partial result, which is also
+/// still in the internal queue of the socket.
+///
+/// Retry `max_retries` times with a sleep of `retry_interval` in between
+/// retries, then fail with `ReadExactError::PeekError`; this allows to break
+/// free if the socket buffer is too small to peek the requested amount of
+/// data, and sleeping in between means we don't completely hog a CPU core.
+pub fn peek_exact_interruptible<F, E>(
+    stream: &mut TcpStream,
+    buf: &mut [u8],
+    retry_interval: Duration,
+    max_retries: usize,
+    f: F,
+) -> Result<(), ReadExactError<E>>
+where
+    F: Fn() -> Result<(), E>,
+{
+    // counted across loop iterations, so that `max_retries` can actually be exceeded:
+    let mut retries = 0;
+    loop {
+        if let Err(e) = f() {
+            return Err(ReadExactError::Interrupted {
+                // we may write bytes to `buf`, but they are not consumed from the socket,
+                // so we report zero here:
+                size: 0,
+                err: e,
+            });
+        }
+        match stream.peek(buf) {
+            Ok(size) => {
+                if size == buf.len() {
+                    // it's full! we are done...
+                    return Ok(());
+                } else {
+                    trace!(
+                        "peek_exact_interruptible: not full; {size} != {}",
+                        buf.len()
+                    );
+                    retries += 1;
+                    if retries > max_retries {
+                        return Err(ReadExactError::PeekError { size: buf.len() });
+                    }
+                    // we are only using this for peeking at the first frame, or its header,
+                    // so we can be a bit sleepy here:
+                    std::thread::sleep(retry_interval);
+                }
+            }
+            Err(e) => {
+                trace!("peek_exact_interruptible: err: {e}");
+                match e.kind() {
+                    // in this case, we couldn't peek the full size, so we try again
+                    ErrorKind::WouldBlock | ErrorKind::TimedOut => {
+                        continue;
+                    }
+                    _ => return Err(ReadExactError::from(e)),
+                }
+            }
+        }
+    }
+}
diff --git a/libertem_asi_mpx3/src/background_thread.rs b/libertem_asi_mpx3/src/background_thread.rs
index 459136bf..ade85bc8 100644
--- a/libertem_asi_mpx3/src/background_thread.rs
+++ b/libertem_asi_mpx3/src/background_thread.rs
@@ -1,7 +1,6 @@
 use std::{
-    fmt::{Debug, Display},
-    io::{ErrorKind, Read},
-    mem::replace,
+    fmt::Display,
+    io::ErrorKind,
     net::TcpStream,
     sync::mpsc::{channel, Receiver, RecvTimeoutError, SendError, Sender, TryRecvError},
     thread::JoinHandle,
@@ -11,6 +10,7 @@ use std::{
 use common::{
     background_thread::{BackgroundThread, BackgroundThreadSpawnError, ControlMsg, ReceiverMsg},
     frame_stack::{FrameStackForWriting, FrameStackWriteError},
+    tcp::{self, ReadExactError},
     utils::{num_from_byte_slice, three_way_shift, NumParseError},
 };
 use ipc_test::SharedSlabAllocator;
@@ -49,29 +49,38 @@ pub enum ReceiverStatus {
     Closed,
 }
 
-/// Peek and parse the first frame header
-fn peek_header(stream: &mut TcpStream) -> Result {
+/// Peek and parse the first frame header. Retries until either a header was
+/// received or a control message arrives in the control channel.
+fn peek_header(
+    stream: &mut TcpStream,
+    control_channel: &Receiver,
+) -> Result {
     let mut buf: [u8; HEADER_BUF_SIZE] = [0; HEADER_BUF_SIZE];
-    // FIXME: error handling, timeout, ...
-
-    let mut nbytes = 0;
 
-    // Ugh.. wait until enough data is in the buffer
-    // possibly, the sender sends the header and payload separately,
+    // Wait until enough data is in the buffer.
+    // Possibly, the sender sends the header and payload separately,
     // in which case we get only a short header, and we need to retry.
     // All because we don't really know how large the header is supposed to be.
     // This is broken for very small frames (where header+data < 512),
     // so if an acquisition only contains <512 bytes in total, we will wait
     // here indefinitely.
-    while nbytes < HEADER_BUF_SIZE {
-        nbytes = match stream.peek(&mut buf) {
-            Ok(n) => n,
-            Err(e) => return Err(AcquisitionError::ConnectionError { msg: e.to_string() }),
+
+    loop {
+        match tcp::peek_exact_interruptible(stream, &mut buf, Duration::from_millis(10), 10, || {
+            check_for_control(control_channel)
+        }) {
+            Ok(_) => {
+                return Ok(parse_header(&buf, 0)?);
+            }
+            Err(ReadExactError::PeekError { size }) => {
+                trace!("peek of {size} bytes failed; retrying...");
+                continue;
+            }
+            Err(e) => {
+                return Err(AcquisitionError::from(e));
+            }
         }
-        // FIXME: timeout!!
     }
-
-    Ok(parse_header(&buf, 0)?)
 }
 
 const HEADER_BUF_SIZE: usize = 512;
@@ -173,19 +182,7 @@ fn recv_frame(
 
     let mut buf: [u8; HEADER_BUF_SIZE] = [0; HEADER_BUF_SIZE];
 
-    // FIXME: need to have a timeout here!
-    // In the happy case, this succeeds, or we get a
-    // ConnectionReset/ConnectionAborted, but in case the network inbetween is
-    // going bad, we might block here indefinitely. But we must regularly check
-    // for control messages from the `control_channel`, which we can't do here
-    // like this.
-    match stream.read_exact(&mut buf) {
-        Ok(_) => {}
-        Err(e) => {
-            // any kind of connection error means something is gone bad
-            return Err(AcquisitionError::ConnectionError { msg: e.to_string() });
-        }
-    }
+    tcp::read_exact_interruptible(stream, &mut buf, || check_for_control(control_channel))?;
 
     // 2) Parse the header, importantly reading width, height, bytes-per-pixel (maxval)
 
@@ -227,11 +224,9 @@ fn recv_frame(
 
         let dest_rest = &mut dest_buf[head_src.len()..];
 
-        // FIXME: this blocks - we need to check for control messages every now and then
-        match stream.read_exact(dest_rest) {
-            Ok(_) => Ok(()),
-            Err(e) => Err(AcquisitionError::ConnectionError { msg: e.to_string() }),
-        }
+        tcp::read_exact_interruptible(stream, dest_rest, || check_for_control(control_channel))?;
+
+        Ok::<_, AcquisitionError>(())
     })?;
 
     Ok(meta)
@@ -289,6 +284,29 @@ impl From for AcquisitionError {
     }
 }
 
+impl From<ReadExactError<AcquisitionError>> for AcquisitionError {
+    fn from(value: ReadExactError<AcquisitionError>) -> Self {
+        match value {
+            ReadExactError::Interrupted { size, err } => {
+                warn!("interrupted read after {size} bytes; discarding buffer");
+                err
+            }
+            ReadExactError::IOError { err } => Self::from(err),
+            ReadExactError::PeekError { size } => AcquisitionError::ConnectionError {
+                msg: format!("could not peek {size} bytes"),
+            },
+        }
+    }
+}
+
+impl From<std::io::Error> for AcquisitionError {
+    fn from(value: std::io::Error) -> Self {
+        Self::ConnectionError {
+            msg: format!("i/o error: {value}"),
+        }
+    }
+}
+
 /// With a running acquisition, check for control messages;
 /// especially convert `ControlMsg::StopThread` to `AcquisitionError::Cancelled`.
fn check_for_control(control_channel: &Receiver) -> Result<(), AcquisitionError> { @@ -339,8 +357,10 @@ fn passive_acquisition( }, }; + stream.set_read_timeout(Some(Duration::from_millis(100)))?; + // block until we get the first frame: - let first_frame_meta = match peek_header(&mut stream) { + let first_frame_meta = match peek_header(&mut stream, control_channel) { Ok(m) => m, Err(AcquisitionError::ConnectionError { msg }) => { warn!("connection error while peeking first frame: {msg}; reconnecting"); @@ -390,7 +410,7 @@ fn acquisition( debug!("acquisition starting"); // approx uppper bound of image size in bytes - let peek_meta = peek_header(stream)?; + let peek_meta = peek_header(stream, to_thread_r)?; let approx_size_bytes = 2 * peek_meta.get_size(); let slot = match shm.get_mut() { diff --git a/libertem_qd_mpx/src/background_thread.rs b/libertem_qd_mpx/src/background_thread.rs index a5e1e22c..d746ee37 100644 --- a/libertem_qd_mpx/src/background_thread.rs +++ b/libertem_qd_mpx/src/background_thread.rs @@ -10,6 +10,7 @@ use common::{ background_thread::{BackgroundThread, BackgroundThreadSpawnError, ControlMsg, ReceiverMsg}, frame_stack::{FrameMeta, FrameStackForWriting, FrameStackWriteError}, generic_connection::AcquisitionConfig, + tcp::{self, ReadExactError}, utils::{num_from_byte_slice, three_way_shift, NumParseError}, }; use ipc_test::{slab::ShmError, SharedSlabAllocator}; @@ -98,6 +99,19 @@ impl From for AcquisitionError { } } +impl From> for AcquisitionError { + fn from(value: ReadExactError) -> Self { + match value { + ReadExactError::Interrupted { size, err } => { + warn!("interrupted read after {size} bytes; discarding buffer"); + err + } + ReadExactError::IOError { err } => AcquisitionError::from(err), + ReadExactError::PeekError { size } => AcquisitionError::PeekError { nbytes: size }, + } + } +} + pub struct QdBackgroundThread { bg_thread: JoinHandle<()>, to_thread: Sender, @@ -123,35 +137,16 @@ fn check_for_control(control_channel: &Receiver) -> 
Result<(), Acq /// Fill `buf` from `stream` (like TcpStream::read_exact), but periodically check /// `to_thread_r` for control messages, which allows to interrupt the acquisition. -/// Assumes a blocking socket with a timeout. +/// Assumes a blocking socket with a timeout. In case of interruption, data will be discarded +/// (it's still in `buf`, but we don't keep track how much we read...) fn read_exact_interruptible( stream: &mut impl Read, buf: &mut [u8], to_thread_r: &Receiver, ) -> Result<(), AcquisitionError> { - let total_to_read = buf.len(); - let mut buf_sliced = buf; - let mut bytes_read: usize = 0; - loop { - check_for_control(to_thread_r)?; - match stream.read(buf_sliced) { - Ok(size) => { - bytes_read += size; - buf_sliced = &mut buf_sliced[size..]; - // it's full! we are done... - if bytes_read == total_to_read { - return Ok(()); - } - } - Err(e) => match e.kind() { - ErrorKind::WouldBlock | ErrorKind::TimedOut => { - check_for_control(to_thread_r)?; - continue; - } - _ => return Err(AcquisitionError::from(e)), - }, - } - } + common::tcp::read_exact_interruptible(stream, buf, || check_for_control(to_thread_r))?; + + Ok(()) } /// Read from `stream` until we hit the timeout. Original socket timeout is @@ -198,42 +193,11 @@ fn peek_exact_interruptible( buf: &mut [u8], to_thread_r: &Receiver, ) -> Result<(), AcquisitionError> { - loop { - trace!("peek_exact_interruptible: loop head"); - check_for_control(to_thread_r)?; - let mut retries = 0; - match stream.peek(buf) { - Ok(size) => { - if size == buf.len() { - // it's full! we are done... 
- return Ok(()); - } else { - trace!( - "peek_exact_interruptible: not full; {size} != {}", - buf.len() - ); - retries += 1; - if retries > 10 { - return Err(AcquisitionError::PeekError { nbytes: buf.len() }); - } - // we are only using this for peeking at the first frame, or its header, - // so we can be a bit sleepy here: - std::thread::sleep(Duration::from_millis(10)); - } - } - Err(e) => { - trace!("peek_exact_interruptible: err: {e}"); - match e.kind() { - // in this case, we couldn't peek the full size, so we try again - ErrorKind::WouldBlock | ErrorKind::TimedOut => { - check_for_control(to_thread_r)?; - continue; - } - _ => return Err(AcquisitionError::from(e)), - } - } - } - } + tcp::peek_exact_interruptible(stream, buf, Duration::from_millis(10), 10, || { + check_for_control(to_thread_r) + })?; + + Ok(()) } fn parse_mpx_length(buf: &[u8]) -> Result {