Skip to content

Commit

Permalink
Misc. fixes for dectris: LiberTEM-live tests pass
Browse files Browse the repository at this point in the history
* Fix stats: properly count the last frame stack
* Fix `impl_py_connection` close implementation to actually
  `Option::take` the connection implementation and call `close` on it
* Add a lot of debug logging to make it easier to diagnose issues
  • Loading branch information
sk1p committed Jul 3, 2024
1 parent a9484a5 commit b9ed660
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 36 deletions.
9 changes: 8 additions & 1 deletion common/src/generic_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,11 +376,18 @@ where
return Err(ConnectionError::FatalError(error));
}
ReceiverMsg::Finished { frame_stack } => {
// Finished here means we have seen all frame stacks of the acquisition,
// it does _not_ mean that the data consumer has processed them all.

// do stats update here to make sure we count the last frame stack!
self.stats.count_stats_item(&frame_stack);
self.stats.log_stats();
self.stats.reset();

return Ok(Some(frame_stack));
}
ReceiverMsg::FrameStack { frame_stack } => {
self.stats.count_stats_item(&frame_stack);
return Ok(Some(frame_stack));
}
}
Expand Down Expand Up @@ -408,7 +415,6 @@ where
return Ok(Some(left));
}
assert!(frame_stack.len() <= max_size);
self.stats.count_stats_item(&frame_stack);
Ok(Some(frame_stack))
}
Ok(None) => Ok(None),
Expand Down Expand Up @@ -441,6 +447,7 @@ where
}

pub fn close(mut self) {
debug!("GenericConnection::close");
if self
.bg_thread
.channel_to_thread()
Expand Down
1 change: 0 additions & 1 deletion common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ pub mod decoder;
pub mod frame_stack;
pub mod generic_connection;
pub mod generic_receiver;
pub mod py_buffer_for_slot;
pub mod py_cam_client;
pub mod py_connection;
pub mod stats;
19 changes: 0 additions & 19 deletions common/src/py_buffer_for_slot.rs

This file was deleted.

12 changes: 8 additions & 4 deletions common/src/py_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,14 @@ macro_rules! impl_py_connection {
}

pub fn close(&mut self) -> PyResult<()> {
let conn_impl = self.get_conn_mut()?;
conn_impl.log_shm_stats();
conn_impl.reset_stats();
Ok(())
if let Some(mut conn_impl) = self.conn_impl.take() {
conn_impl.log_shm_stats();
conn_impl.reset_stats();
conn_impl.close();
Ok(())
} else {
Err(PyConnectionError::new_err("already closed".to_owned()))
}
}

pub fn is_running(&self) -> PyResult<bool> {
Expand Down
36 changes: 25 additions & 11 deletions libertem_dectris/src/background_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,16 @@ fn check_for_control(
msg: "received StartAcquisitionPassive while an acquisition was already running"
.to_string(),
}),
Ok(ControlMsg::StopThread) => Err(AcquisitionError::Cancelled),
Err(TryRecvError::Disconnected) => Err(AcquisitionError::Cancelled),
Ok(ControlMsg::StopThread) => {
debug!("check_for_control: StopThread received");

Err(AcquisitionError::Cancelled)
}
Err(TryRecvError::Disconnected) => {
debug!("check_for_control: Disconnected");

Err(AcquisitionError::Cancelled)
}
Err(TryRecvError::Empty) => Ok(()),
}
}
Expand All @@ -165,6 +173,10 @@ fn serialization_error(
msg: &Message,
err: &serde_json::Error,
) {
log::error!(
"background_thread: serialization error: {}",
err.to_string()
);
from_thread_s
.send(ReceiverMsg::FatalError {
error: Box::new(AcquisitionError::SerdeError {
Expand All @@ -175,10 +187,6 @@ fn serialization_error(
}),
})
.unwrap();
log::error!(
"background_thread: serialization error: {}",
err.to_string()
);
}

/// Passively listen for global acquisition headers
Expand Down Expand Up @@ -396,6 +404,7 @@ fn background_thread_wrap(
})
.unwrap();
}
info!("background_thread_wrap: done");
}

fn drain_if_mismatch(
Expand Down Expand Up @@ -444,6 +453,8 @@ fn background_thread(
mut shm: SharedSlabAllocator,
) -> Result<(), AcquisitionError> {
'outer: loop {
let thread_id = std::thread::current().id();
info!("background_thread: connecting to {uri} ({thread_id:?})");
let ctx = zmq::Context::new();
let socket = ctx.socket(zmq::PULL).unwrap();
socket.set_rcvtimeo(1000).unwrap();
Expand All @@ -466,8 +477,11 @@ fn background_thread(
&mut shm,
) {
Ok(_) => {}
Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => {
return Ok(());
Err(
msg @ (AcquisitionError::Disconnected | AcquisitionError::Cancelled),
) => {
debug!("background_thread: passive_acquisition returned {msg:?}");
break 'outer;
}
Err(e) => {
from_thread_s
Expand Down Expand Up @@ -496,7 +510,7 @@ fn background_thread(
Ok(header) => header,
Err(err) => {
serialization_error(from_thread_s, &msg, &err);
break;
continue 'outer;
}
};
debug!("dheader: {dheader:?}");
Expand All @@ -510,7 +524,7 @@ fn background_thread(
Ok(header) => header,
Err(err) => {
serialization_error(from_thread_s, &msg, &err);
break;
continue 'outer;
}
}
} else {
Expand All @@ -529,7 +543,7 @@ fn background_thread(
) {
Ok(_) => {}
Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => {
return Ok(());
break 'outer;
}
Err(e) => {
from_thread_s
Expand Down

0 comments on commit b9ed660

Please sign in to comment.