diff --git a/common/src/generic_connection.rs b/common/src/generic_connection.rs index 908f3ebc..722fa7c6 100644 --- a/common/src/generic_connection.rs +++ b/common/src/generic_connection.rs @@ -376,11 +376,18 @@ where return Err(ConnectionError::FatalError(error)); } ReceiverMsg::Finished { frame_stack } => { + // Finished here means we have seen all frame stacks of the acquisition, + // it does _not_ mean that the data consumer has processed them all. + + // do stats update here to make sure we count the last frame stack! + self.stats.count_stats_item(&frame_stack); self.stats.log_stats(); self.stats.reset(); + return Ok(Some(frame_stack)); } ReceiverMsg::FrameStack { frame_stack } => { + self.stats.count_stats_item(&frame_stack); return Ok(Some(frame_stack)); } } @@ -408,7 +415,6 @@ where return Ok(Some(left)); } assert!(frame_stack.len() <= max_size); - self.stats.count_stats_item(&frame_stack); Ok(Some(frame_stack)) } Ok(None) => Ok(None), @@ -441,6 +447,7 @@ where } pub fn close(mut self) { + debug!("GenericConnection::close"); if self .bg_thread .channel_to_thread() diff --git a/common/src/lib.rs b/common/src/lib.rs index 8e029341..2f2ef0ef 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -4,7 +4,6 @@ pub mod decoder; pub mod frame_stack; pub mod generic_connection; pub mod generic_receiver; -pub mod py_buffer_for_slot; pub mod py_cam_client; pub mod py_connection; pub mod stats; diff --git a/common/src/py_buffer_for_slot.rs b/common/src/py_buffer_for_slot.rs deleted file mode 100644 index 4d839246..00000000 --- a/common/src/py_buffer_for_slot.rs +++ /dev/null @@ -1,19 +0,0 @@ -use std::ffi::c_int; - -use ipc_test::Slot; -use pyo3::{ffi, pyclass, pymethods}; - -// FIXME: can/should we make this Send'able? -#[pyclass(unsendable)] -struct SlotBuffer { - slot: Slot, -} - -impl SlotBuffer {} - -#[pymethods] -impl SlotBuffer { - unsafe fn __getbuffer__(&self, view: *mut ffi::Py_buffer, flags: c_int) { - todo!() - } -} diff --git a/common/src/py_connection.rs b/common/src/py_connection.rs index 65cff3cb..3cf3e06a 100644 --- a/common/src/py_connection.rs +++ b/common/src/py_connection.rs @@ -177,10 +177,14 @@ macro_rules! impl_py_connection { } pub fn close(&mut self) -> PyResult<()> { - let conn_impl = self.get_conn_mut()?; - conn_impl.log_shm_stats(); - conn_impl.reset_stats(); - Ok(()) + if let Some(mut conn_impl) = self.conn_impl.take() { + conn_impl.log_shm_stats(); + conn_impl.reset_stats(); + conn_impl.close(); + Ok(()) + } else { + Err(PyConnectionError::new_err("already closed".to_owned())) + } } pub fn is_running(&self) -> PyResult { diff --git a/libertem_dectris/src/background_thread.rs b/libertem_dectris/src/background_thread.rs index e28968ae..93f2a4bb 100644 --- a/libertem_dectris/src/background_thread.rs +++ b/libertem_dectris/src/background_thread.rs @@ -154,8 +154,16 @@ fn check_for_control( msg: "received StartAcquisitionPassive while an acquisition was already running" .to_string(), }), - Ok(ControlMsg::StopThread) => Err(AcquisitionError::Cancelled), - Err(TryRecvError::Disconnected) => Err(AcquisitionError::Cancelled), + Ok(ControlMsg::StopThread) => { + debug!("check_for_control: StopThread received"); + + Err(AcquisitionError::Cancelled) + } + Err(TryRecvError::Disconnected) => { + debug!("check_for_control: Disconnected"); + + Err(AcquisitionError::Cancelled) + } Err(TryRecvError::Empty) => Ok(()), } } @@ -165,6 +173,10 @@ fn serialization_error( msg: &Message, err: &serde_json::Error, ) { + log::error!( + "background_thread: serialization error: {}", + err.to_string() + ); from_thread_s .send(ReceiverMsg::FatalError { error: Box::new(AcquisitionError::SerdeError { @@ -175,10 +187,6 @@ fn serialization_error( }), }) .unwrap(); - log::error!( - "background_thread: serialization error: {}", - err.to_string() - ); } /// Passively listen for global acquisition headers @@ -396,6 +404,7 @@ fn background_thread_wrap( }) .unwrap(); } + info!("background_thread_wrap: done"); } fn drain_if_mismatch( @@ -444,6 +453,8 @@ fn background_thread( mut shm: SharedSlabAllocator, ) -> Result<(), AcquisitionError> { 'outer: loop { + let thread_id = std::thread::current().id(); + info!("background_thread: connecting to {uri} ({thread_id:?})"); let ctx = zmq::Context::new(); let socket = ctx.socket(zmq::PULL).unwrap(); socket.set_rcvtimeo(1000).unwrap(); @@ -466,8 +477,11 @@ fn background_thread( &mut shm, ) { Ok(_) => {} - Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => { - return Ok(()); + Err( + msg @ (AcquisitionError::Disconnected | AcquisitionError::Cancelled), + ) => { + debug!("background_thread: passive_acquisition returned {msg:?}"); + break 'outer; } Err(e) => { from_thread_s @@ -496,7 +510,7 @@ fn background_thread( Ok(header) => header, Err(err) => { serialization_error(from_thread_s, &msg, &err); - break; + continue 'outer; } }; debug!("dheader: {dheader:?}"); @@ -510,7 +524,7 @@ fn background_thread( Ok(header) => header, Err(err) => { serialization_error(from_thread_s, &msg, &err); - break; + continue 'outer; } } } else { @@ -529,7 +543,7 @@ fn background_thread( ) { Ok(_) => {} Err(AcquisitionError::Disconnected | AcquisitionError::Cancelled) => { - return Ok(()); + break 'outer; } Err(e) => { from_thread_s