Skip to content

Commit

Permalink
Extract tcp helpers and use interruptible reads in asi_mpx
Browse files Browse the repository at this point in the history
  • Loading branch information
sk1p committed Jul 17, 2024
1 parent 738a62a commit 83c2303
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 97 deletions.
1 change: 1 addition & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ pub mod generic_connection;
pub mod generic_receiver;
pub mod py_cam_client;
pub mod py_connection;
pub mod tcp;
pub mod utils;
129 changes: 129 additions & 0 deletions common/src/tcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use std::{
io::{ErrorKind, Read},
net::TcpStream,
time::Duration,
};

use log::trace;

/// Errors produced by the interruptible read/peek helpers in this module.
///
/// `E` is the error type returned by the caller-supplied interruption
/// callback; it is handed back unchanged in the `Interrupted` variant.
#[derive(thiserror::Error, Debug)]
pub enum ReadExactError<E> {
/// The interruption callback returned an error. `size` is the number of
/// bytes that were consumed from the stream before the interruption
/// (`peek_exact_interruptible` always reports zero here, since peeking
/// does not consume data), and `err` is the callback's error.
#[error("read interrupted after {size} bytes; error: {err}")]
Interrupted { size: usize, err: E },

/// An underlying I/O error; convertible from `std::io::Error` via `From`.
#[error("i/o error")]
IOError {
#[from]
err: std::io::Error,
},

/// The requested number of bytes could not be peeked; `size` is the
/// number of bytes that were requested.
#[error("could not peek {size} bytes")]
PeekError { size: usize },
}

/// Fill `buf` from `stream` (like TcpStream::read_exact); if we hit a read timeout
/// on the stream, invoke `f`. If `f` returns an `Err`, stop reading from the stream.
///
/// In case of interruption, `buf` may contain a partial result, and
/// `ReadExactError::Interrupted` will contain the number of bytes read in the
/// `size` field.
pub fn read_exact_interruptible<E, F>(
stream: &mut impl Read,
buf: &mut [u8],
f: F,
) -> Result<(), ReadExactError<E>>
where
F: Fn() -> Result<(), E>,
{
let total_to_read = buf.len();
let mut buf_sliced = buf;
let mut bytes_read: usize = 0;
loop {
if let Err(e) = f() {
return Err(ReadExactError::Interrupted {
size: bytes_read,
err: e,
});
}
match stream.read(buf_sliced) {
Ok(size) => {
bytes_read += size;
buf_sliced = &mut buf_sliced[size..];
// it's full! we are done...
if bytes_read == total_to_read {
return Ok(());
}
}
Err(e) => match e.kind() {
ErrorKind::WouldBlock | ErrorKind::TimedOut => {
continue;
}
_ => return Err(ReadExactError::from(e)),
},
}
}
}

/// Fill `buf` from `stream`; if we hit a read timeout on the stream, invoke
/// `f`. If `f` returns an `Err`, stop peeking.
///
/// In case of interruption, `buf` may contain a partial result, which is also
/// still in the internal queue of the socket.
///
/// Retry `max_retries` times with a sleep of `retry_interval` in between
/// retries; this allows to break free if the socket buffer is too small to peek
/// the requested amount of data, and sleeping in between means we don't
/// completely hog a CPU core.
pub fn peek_exact_interruptible<E, F>(
stream: &mut TcpStream,
buf: &mut [u8],
retry_interval: Duration,
max_retries: usize,
f: F,
) -> Result<(), ReadExactError<E>>
where
F: Fn() -> Result<(), E>,
{
loop {
if let Err(e) = f() {
return Err(ReadExactError::Interrupted {
// we may write bytes to `buf`, but they are not consumed from the socket,
// so we report zero here:
size: 0,
err: e,
});
}

let mut retries = 0;
match stream.peek(buf) {
Ok(size) => {
if size == buf.len() {
// it's full! we are done...
return Ok(());
} else {
trace!(
"peek_exact_interruptible: not full; {size} != {}",
buf.len()
);
retries += 1;
if retries > max_retries {
return Err(ReadExactError::PeekError { size: buf.len() });
}
// we are only using this for peeking at the first frame, or its header,
// so we can be a bit sleepy here:
std::thread::sleep(retry_interval);
}
}
Err(e) => {
trace!("peek_exact_interruptible: err: {e}");
match e.kind() {
// in this case, we couldn't peek the full size, so we try again
ErrorKind::WouldBlock | ErrorKind::TimedOut => {
continue;
}
_ => return Err(ReadExactError::from(e)),
}
}
}
}
}
94 changes: 57 additions & 37 deletions libertem_asi_mpx3/src/background_thread.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{
fmt::{Debug, Display},
io::{ErrorKind, Read},
mem::replace,
fmt::Display,
io::ErrorKind,
net::TcpStream,
sync::mpsc::{channel, Receiver, RecvTimeoutError, SendError, Sender, TryRecvError},
thread::JoinHandle,
Expand All @@ -11,6 +10,7 @@ use std::{
use common::{
background_thread::{BackgroundThread, BackgroundThreadSpawnError, ControlMsg, ReceiverMsg},
frame_stack::{FrameStackForWriting, FrameStackWriteError},
tcp::{self, ReadExactError},
utils::{num_from_byte_slice, three_way_shift, NumParseError},
};
use ipc_test::SharedSlabAllocator;
Expand Down Expand Up @@ -49,29 +49,38 @@ pub enum ReceiverStatus {
Closed,
}

/// Peek and parse the first frame header
fn peek_header(stream: &mut TcpStream) -> Result<ASIMpxFrameMeta, AcquisitionError> {
/// Peek and parse the first frame header. Retries until either a header was
/// received or a control message arrives in the control channel.
fn peek_header(
stream: &mut TcpStream,
control_channel: &Receiver<ASIMpxControlMsg>,
) -> Result<ASIMpxFrameMeta, AcquisitionError> {
let mut buf: [u8; HEADER_BUF_SIZE] = [0; HEADER_BUF_SIZE];
// FIXME: error handling, timeout, ...

let mut nbytes = 0;

// Ugh.. wait until enough data is in the buffer
// possibly, the sender sends the header and payload separately,
// Wait until enough data is in the buffer.
// Possibly, the sender sends the header and payload separately,
// in which case we get only a short header, and we need to retry.
// All because we don't really know how large the header is supposed to be.
// This is broken for very small frames (where header+data < 512),
// so if an acquisition only contains <512 bytes in total, we will wait
// here indefinitely.
while nbytes < HEADER_BUF_SIZE {
nbytes = match stream.peek(&mut buf) {
Ok(n) => n,
Err(e) => return Err(AcquisitionError::ConnectionError { msg: e.to_string() }),

loop {
match tcp::peek_exact_interruptible(stream, &mut buf, Duration::from_millis(10), 10, || {
check_for_control(control_channel)
}) {
Ok(_) => {
return Ok(parse_header(&buf, 0)?);
}
Err(ReadExactError::PeekError { size }) => {
trace!("peek of {size} bytes failed; retrying...");
continue;
}
Err(e) => {
return Err(AcquisitionError::from(e));
}
}
// FIXME: timeout!!
}

Ok(parse_header(&buf, 0)?)
}

const HEADER_BUF_SIZE: usize = 512;
Expand Down Expand Up @@ -173,19 +182,7 @@ fn recv_frame(

let mut buf: [u8; HEADER_BUF_SIZE] = [0; HEADER_BUF_SIZE];

// FIXME: need to have a timeout here!
// In the happy case, this succeeds, or we get a
// ConnectionReset/ConnectionAborted, but in case the network inbetween is
// going bad, we might block here indefinitely. But we must regularly check
// for control messages from the `control_channel`, which we can't do here
// like this.
match stream.read_exact(&mut buf) {
Ok(_) => {}
Err(e) => {
// any kind of connection error means something is gone bad
return Err(AcquisitionError::ConnectionError { msg: e.to_string() });
}
}
tcp::read_exact_interruptible(stream, &mut buf, || check_for_control(control_channel))?;

// 2) Parse the header, importantly reading width, height, bytes-per-pixel (maxval)

Expand Down Expand Up @@ -227,11 +224,9 @@ fn recv_frame(

let dest_rest = &mut dest_buf[head_src.len()..];

// FIXME: this blocks - we need to check for control messages every now and then
match stream.read_exact(dest_rest) {
Ok(_) => Ok(()),
Err(e) => Err(AcquisitionError::ConnectionError { msg: e.to_string() }),
}
tcp::read_exact_interruptible(stream, dest_rest, || check_for_control(control_channel))?;

Ok::<_, AcquisitionError>(())
})?;

Ok(meta)
Expand Down Expand Up @@ -289,6 +284,29 @@ impl From<ServalError> for AcquisitionError {
}
}

impl From<ReadExactError<AcquisitionError>> for AcquisitionError {
fn from(value: ReadExactError<AcquisitionError>) -> Self {
match value {
ReadExactError::Interrupted { size, err } => {
warn!("interrupted read after {size} bytes; discarding buffer");
err
}
ReadExactError::IOError { err } => Self::from(err),
ReadExactError::PeekError { size } => AcquisitionError::ConnectionError {
msg: format!("could not peek {size} bytes"),
},
}
}
}

/// Any plain I/O error is reported as a connection error carrying the
/// formatted error message.
impl From<std::io::Error> for AcquisitionError {
    fn from(value: std::io::Error) -> Self {
        let msg = format!("i/o error: {value}");
        Self::ConnectionError { msg }
    }
}

/// With a running acquisition, check for control messages;
/// especially convert `ControlMsg::StopThread` to `AcquisitionError::Cancelled`.
fn check_for_control(control_channel: &Receiver<ASIMpxControlMsg>) -> Result<(), AcquisitionError> {
Expand Down Expand Up @@ -339,8 +357,10 @@ fn passive_acquisition(
},
};

stream.set_read_timeout(Some(Duration::from_millis(100)))?;

// block until we get the first frame:
let first_frame_meta = match peek_header(&mut stream) {
let first_frame_meta = match peek_header(&mut stream, control_channel) {
Ok(m) => m,
Err(AcquisitionError::ConnectionError { msg }) => {
warn!("connection error while peeking first frame: {msg}; reconnecting");
Expand Down Expand Up @@ -390,7 +410,7 @@ fn acquisition(
debug!("acquisition starting");

// approximate upper bound of image size in bytes
let peek_meta = peek_header(stream)?;
let peek_meta = peek_header(stream, to_thread_r)?;
let approx_size_bytes = 2 * peek_meta.get_size();

let slot = match shm.get_mut() {
Expand Down
Loading

0 comments on commit 83c2303

Please sign in to comment.