diff --git a/Cargo.lock b/Cargo.lock index 069b44be..a744f6f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -152,6 +152,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7eb209b1518d6bb87b283c20095f5228ecda460da70b44f0802523dea6da04" +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "anstream" version = "0.6.11" @@ -664,6 +670,12 @@ dependencies = [ "vec_map", ] +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.0.83" @@ -726,6 +738,33 @@ dependencies = [ "libc", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "clang-sys" version = "1.7.0" @@ -854,15 +893,18 @@ name = "common" version = "0.1.0" dependencies = [ "bincode", + "criterion", "ipc-test", "log", "ndarray", + "num", "numpy", "pyo3", "serde", "stats", "tempfile", "thiserror", + "zerocopy 0.6.6", ] [[package]] @@ -932,6 +974,42 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap 4.4.18", + "criterion-plot", + "is-terminal", + "itertools", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools", +] + [[package]] name = "crossbeam" version = "0.8.4" @@ -1860,6 +1938,17 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "is-terminal" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" +dependencies = [ + "hermit-abi 0.3.4", + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "itertools" version = "0.10.5" @@ -2018,6 +2107,7 @@ dependencies = [ "md5", "memmap2", "nix 0.26.4", + "num", "numpy", "pyo3", "serde", @@ -2400,30 +2490,75 @@ dependencies = [ "winapi", ] +[[package]] +name = "num" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + [[package]] name = "num-complex" -version = "0.4.4" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ba157ca0885411de85d6ca030ba7e2a83a28636056c7c699b07c8b6f7383214" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" dependencies = [ "num-traits", ] [[package]] name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-iter" version = "0.1.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" +checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" dependencies = [ "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" +dependencies = [ + "num-bigint", + "num-integer", "num-traits", ] [[package]] name = "num-traits" -version = "0.2.17" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ "autocfg", ] @@ -2565,6 +2700,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "oorandom" +version = "11.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" + [[package]] name = "openssl" version = "0.10.64" @@ -2759,6 +2900,34 @@ dependencies = [ "url", ] +[[package]] +name = "plotters" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15b6eccb8484002195a3e44fe65a4ce8e93a625797a063735536fd59cb01cf3" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "414cec62c6634ae900ea1c56128dfe87cf63e7caece0852ec76aba307cebadb7" + +[[package]] +name = "plotters-svg" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81b30686a7d9c3e010b84284bdd26a29f2138574f52f5eb6f794fc0ad924e705" +dependencies = [ + "plotters-backend", +] + [[package]] name = "png" version = "0.17.11" @@ -3651,6 +3820,16 @@ dependencies = [ "strict-num", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.6.0" diff --git a/bs-sys/src/lib.rs b/bs-sys/src/lib.rs index fe1c0253..cf8fad4c 100644 --- a/bs-sys/src/lib.rs +++ b/bs-sys/src/lib.rs @@ -108,8 +108,10 @@ pub fn decompress_lz4( let block_size = block_size.unwrap_or(0); let mut out: Vec = Vec::with_capacity(out_size); let out_ptr: *mut T = out.as_mut_ptr(); - decompress_lz4_into(in_, out_ptr, out_size, Some(block_size))?; - unsafe { out.set_len(out_size) }; + unsafe { + decompress_lz4_into(in_, out_ptr, out_size, Some(block_size))?; + out.set_len(out_size) + }; Ok(out) } @@ -128,7 +130,7 @@ pub fn decompress_lz4( /// /// The memory pointed to by `out` must be large enough to fit the output, i.e. /// at least `std::mem::size_of:: * out_size`. -pub fn decompress_lz4_into( +pub unsafe fn decompress_lz4_into( in_: &[u8], out: *mut T, // FIXME: replace with slice of MaybeUninit from Vec::spare_capacity_mut? out_size: usize, // number of elements diff --git a/common/Cargo.toml b/common/Cargo.toml index b776b50d..c350c5b7 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -16,3 +16,12 @@ tempfile = "3.10.1" thiserror = "1.0.59" log = "0.4.21" ndarray = { version = "0.15.6" } +zerocopy = "0.6.1" +num = "0.4.3" + +[dev-dependencies] +criterion = "0.5.1" + +[[bench]] +name = "casting" +harness = false diff --git a/common/benches/casting.rs b/common/benches/casting.rs new file mode 100644 index 00000000..642ac846 --- /dev/null +++ b/common/benches/casting.rs @@ -0,0 +1,145 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput}; +use num::{cast::AsPrimitive, NumCast, ToPrimitive}; + +fn bench_ref_u8(input: &[u8], output: &mut [f32]) { + output.iter_mut().zip(input.iter()).for_each(|(dest, src)| { + *dest = *src as f32; + }) +} + +fn bench_ref_u16(input: &[u16], output: &mut [f32]) { + output.iter_mut().zip(input.iter()).for_each(|(dest, src)| { + *dest = *src as f32; + }) +} + +fn bench_ref_u32(input: &[u32], output: &mut [f32]) { + output.iter_mut().zip(input.iter()).for_each(|(dest, src)| { + *dest = *src as f32; + }) +} + +fn bench_as_primitive(input: &[T], output: &mut [f32]) +where + T: AsPrimitive, +{ + output.iter_mut().zip(input.iter()).for_each(|(dest, src)| { + *dest = (*src).as_(); + }) +} + +fn bench_checked_cast(input: &[T], output: &mut [f32]) -> Option<()> +where + T: ToPrimitive, +{ + for (dest, src) in output.iter_mut().zip(input.iter()) { + let converted = (*src).to_f32(); + if let Some(value) = converted { + *dest = value; + } else { + return None; + } + } + + Some(()) +} + +fn bench_checked_cast_to_int(input: &[T], output: &mut [u32]) -> Option<()> +where + T: ToPrimitive, +{ + for (dest, src) in output.iter_mut().zip(input.iter()) { + let converted = (*src).to_u32(); + if let Some(value) = converted { + *dest = value; + } else { + return None; + } + } + + Some(()) +} + +fn bench_checked_cast_to_generic(input: &[I], output: &mut [O]) -> Option<()> +where + O: Copy + NumCast, + I: Copy + ToPrimitive, +{ + for (dest, src) in output.iter_mut().zip(input.iter()) { + let converted = NumCast::from(*src); + if let Some(value) = converted { + *dest = value; + } else { + return None; + } + } + + Some(()) +} + +pub fn bench_num_casting(c: &mut Criterion) { + let input_u8 = (0..(96 * 96u16)) + .map(|i| (i % 255) as u8) + .collect::>(); + let input_u16 = (0..(96 * 96)).collect::>(); + let input_u32 = (0..(96 * 96)).collect::>(); + let mut output = Vec::::new(); + output.resize(input_u16.len(), 0.0); + + let mut output_u32 = vec![0; input_u16.len()]; + //output_u32.resize(input_u16.len(), 0); + + let mut group = c.benchmark_group("cast_to_f32_throughput"); + + group.throughput(Throughput::Bytes( + (input_u16.len() * std::mem::size_of::()) as u64, + )); + // group.throughput(Throughput::Elements((input_u16.len()) as u64)); + + group.bench_function("bench_ref_u8", |b| { + b.iter(|| bench_ref_u8(black_box(&input_u8[..]), black_box(&mut output))) + }); + + group.bench_function("bench_ref_u16", |b| { + b.iter(|| bench_ref_u16(black_box(&input_u16[..]), black_box(&mut output))) + }); + + group.bench_function("bench_ref_u32", |b| { + b.iter(|| bench_ref_u32(black_box(&input_u32[..]), black_box(&mut output))) + }); + + group.bench_function("bench_as_primitive-u16", |b| { + b.iter(|| bench_as_primitive(black_box(&input_u32[..]), black_box(&mut output))) + }); + + group.bench_function("bench_as_primitive-u32", |b| { + b.iter(|| bench_as_primitive(black_box(&input_u32[..]), black_box(&mut output))) + }); + + group.bench_function("bench_checked_cast-u16", |b| { + b.iter(|| bench_checked_cast(black_box(&input_u32[..]), black_box(&mut output))) + }); + + group.bench_function("bench_checked_cast-u32", |b| { + b.iter(|| bench_checked_cast(black_box(&input_u32[..]), black_box(&mut output))) + }); + + group.bench_function("bench_checked_cast_to_int-u16", |b| { + b.iter(|| bench_checked_cast_to_int(black_box(&input_u32[..]), black_box(&mut output_u32))) + }); + + group.bench_function("bench_checked_cast_to_generic-u16", |b| { + b.iter(|| { + bench_checked_cast_to_generic(black_box(&input_u32[..]), black_box(&mut output_u32)) + }) + }); + + group.bench_function("bench_checked_cast_to_generic-u16-to-f32", |b| { + b.iter(|| bench_checked_cast_to_generic(black_box(&input_u32[..]), black_box(&mut output))) + }); + + group.finish(); +} + +criterion_group!(benches, bench_num_casting); +criterion_main!(benches); diff --git a/common/src/cam_client.rs b/common/src/cam_client.rs index de073621..4c0ca023 100644 --- a/common/src/cam_client.rs +++ b/common/src/cam_client.rs @@ -1,5 +1,7 @@ use ipc_test::{slab::SlabInitError, SharedSlabAllocator}; -use ndarray::{ArrayViewMut, ArrayViewMut3, Dimension}; +use ndarray::ArrayViewMut3; +use num::{cast::AsPrimitive, Num, NumCast}; +use zerocopy::{AsBytes, FromBytes}; use crate::{ decoder::{Decoder, DecoderError}, @@ -67,45 +69,42 @@ where } /// Is the data already in a native integer/float format which we can - /// directly use from numpy? + /// directly use from numpy? Also requires the data to be in a C-contiguous layout. /// /// In case it is available, use `get_array_zero_copy` to get access to the /// array. - pub fn zero_copy_available( + pub fn zero_copy_available( &self, - handle: &FrameStackHandle, - ) -> Result - where - M: FrameMeta, - { - todo!("implement zero_copy_available; delegate to decoder!"); + handle: &FrameStackHandle, + ) -> Result { + Ok(self.decoder.zero_copy_available(handle)?) } - /// Get an array of the whole frame stack as a strided array. + /// Get an array of the whole frame stack as a C-contiguous array. /// - /// This requires that the data is already layed out as a strided array + /// This requires that the data is already layed out as a C-contiguous array /// in the `FrameStackHandle`. pub fn get_array_zero_copy(&self, handle: &FrameStackHandle) -> Result<(), CamClientError> where M: FrameMeta, { // FIXME: we need to make sure we only loan out the buffer underlying `handle` as long as - // the `FrameStackHandle` is valid, i.e. + // the `FrameStackHandle` is valid todo!("implement get_array_zero_copy; delegate to decoder!"); } /// Decode into a pre-allocated array. /// /// This supports user-allocated memory, which enables things like copying - /// directly into CUDA locked memory and thus getting rid of a memcpy in the - /// case of CUDA. - pub fn decode_into_buffer( + /// directly into CUDA locked host memory and thus getting rid of a memcpy + /// in the case of CUDA. + pub fn decode_into_buffer( &self, - input: &FrameStackHandle, - dest: ArrayViewMut3<'_, T>, + input: &FrameStackHandle, + dest: &mut ArrayViewMut3<'_, T>, ) -> Result<(), CamClientError> where - M: FrameMeta, + T: 'static + AsBytes + FromBytes + Copy + NumCast, { self.decode_range_into_buffer(input, dest, 0, input.len() - 1) } @@ -114,17 +113,18 @@ where /// /// This allows for decoding only the data that will be processed /// immediately afterwards, allowing for more cache-efficient operations. - pub fn decode_range_into_buffer( + pub fn decode_range_into_buffer( &self, - input: &FrameStackHandle, - dest: ArrayViewMut3<'_, T>, + input: &FrameStackHandle, + dest: &mut ArrayViewMut3<'_, T>, start_idx: usize, end_idx: usize, ) -> Result<(), CamClientError> where - M: FrameMeta, + T: 'static + AsBytes + FromBytes + Copy + NumCast, { - Ok(self.decoder.decode(input, dest, start_idx, end_idx)?) + let shm = self.get_shm()?; + Ok(self.decoder.decode(shm, input, dest, start_idx, end_idx)?) } /// Free the given `FrameStackHandle`. When calling this, no Python objects diff --git a/common/src/decoder.rs b/common/src/decoder.rs index 3d6d8e39..c3d5bc14 100644 --- a/common/src/decoder.rs +++ b/common/src/decoder.rs @@ -1,18 +1,41 @@ +use ipc_test::SharedSlabAllocator; use ndarray::ArrayViewMut3; +use num::NumCast; +use zerocopy::{AsBytes, FromBytes}; use crate::frame_stack::{FrameMeta, FrameStackHandle}; pub trait Decoder: Default { - fn decode( + type FrameMeta: FrameMeta; + + /// Decode a range designated by `start_idx` and `end_idx` of `input` in + /// `shm` into `output`, converting the data to `T` if safely possible, + /// returning an error otherwise. + /// + /// Note that the end index is exclusive, like the range + /// `start_idx..end_idx`, and these indices are independent of the indices + /// into `output`. For example, `decode(shm, input, output, 1, 2)` will decode + /// the second frame from the frame stack into `output[0, :, :]` (using numpy + /// slicing notation here) + fn decode( &self, - input: &FrameStackHandle, - dest: ArrayViewMut3<'_, T>, + shm: &SharedSlabAllocator, + input: &FrameStackHandle, + output: &mut ArrayViewMut3<'_, T>, start_idx: usize, end_idx: usize, ) -> Result<(), DecoderError> where - M: FrameMeta; + T: 'static + AsBytes + FromBytes + Copy + NumCast; + + fn zero_copy_available( + &self, + handle: &FrameStackHandle, + ) -> Result; } #[derive(Debug, thiserror::Error)] -pub enum DecoderError {} +pub enum DecoderError { + #[error("decoding of frame failed: {msg}")] + FrameDecodeFailed { msg: String }, +} diff --git a/common/src/frame_stack.rs b/common/src/frame_stack.rs index ce5925c5..9f27d694 100644 --- a/common/src/frame_stack.rs +++ b/common/src/frame_stack.rs @@ -11,7 +11,7 @@ pub trait FrameMeta: Clone + Serialize + Debug { /// Length of the data that belongs to the frame corresponding to this meta object fn get_data_length_bytes(&self) -> usize; - /// numpy-like dtype of the data as string + /// numpy-like dtype of the data as string, including endianess /// (this is supposed to be the dtype closest to the raw data; the actual /// data in the frame stack may be encoded and/or compressed) fn get_dtype_string(&self) -> String; diff --git a/common/src/generic_connection.rs b/common/src/generic_connection.rs index 0a467164..908f3ebc 100644 --- a/common/src/generic_connection.rs +++ b/common/src/generic_connection.rs @@ -7,7 +7,7 @@ use std::{ }; use ipc_test::{slab::SlabInitError, SharedSlabAllocator}; -use log::{info, trace, warn}; +use log::{debug, info, trace, warn}; use stats::Stats; use crate::{ @@ -158,22 +158,36 @@ where fn adjust_status(&mut self, msg: &ReceiverMsg) { match msg { ReceiverMsg::AcquisitionStart { .. } => { + debug!("adjust_status: now Running"); self.status = ConnectionStatus::Running; } ReceiverMsg::Finished { .. } => { + debug!("adjust_status: now Idle"); self.status = ConnectionStatus::Idle; } ReceiverMsg::ReceiverArmed => { + debug!("adjust_status: now Armed"); self.status = ConnectionStatus::Armed; } - _ => {} + ReceiverMsg::FrameStack { .. } => { + trace!("adjust_status: FrameStack {{ .. }}"); + } + other => { + trace!("adjust_status: other message: {other:?}"); + } } } /// Get the next message from the background thread, waiting at most /// `timeout`. /// - /// If a `NextTimeoutError::Disconnected` error is encountered, + /// If a `NextTimeoutError::Disconnected` error is encountered, the background + /// thread is no longer running. + /// + /// When handling the results, care must be taken not to leak memory, + /// meaning the variants of `ReceiverMsg` that contain a `FrameStackHandle` + /// must be either returned to user code or free'd. When ignored, they will + /// fill up the shared memory. pub fn next_timeout( &mut self, timeout: Duration, @@ -221,14 +235,21 @@ where ReceiverMsg::AcquisitionStart { pending_acquisition, } => return Ok(Some(pending_acquisition)), - msg @ ReceiverMsg::Finished { .. } | msg @ ReceiverMsg::FrameStack { .. } => { - // FIXME: we might want to log + ignore instead? - let err = format!("unexpected message: {:?}", msg); - return Err(ConnectionError::UnexpectedMessage(err)); - } ReceiverMsg::FatalError { error } => { return Err(ConnectionError::FatalError(error)) } + ReceiverMsg::FrameStack { frame_stack } => { + frame_stack.free_slot(&mut self.shm); + return Err(ConnectionError::UnexpectedMessage( + "ReceiverMsg::FrameStack in wait_for_arm".to_owned(), + )); + } + ReceiverMsg::Finished { frame_stack } => { + frame_stack.free_slot(&mut self.shm); + return Err(ConnectionError::UnexpectedMessage( + "ReceiverMsg::Finished in wait_for_arm".to_owned(), + )); + } } } } @@ -242,9 +263,15 @@ where where E: std::error::Error + 'static + Send + Sync, { + debug!("wait_for_status: waiting for {desired_status:?}..."); let deadline = Instant::now() + timeout; let step = Duration::from_millis(100); + if self.status == desired_status { + debug!("wait_for_status: already in desired status: {desired_status:?}"); + return Ok(()); + } + loop { if let Err(e) = periodic_callback() { return Err(ConnectionError::PeriodicCallbackError(Box::new(e))); @@ -261,8 +288,29 @@ where } }; let res = res?; - self.adjust_status(&res); + match res { + ReceiverMsg::FrameStack { frame_stack } => { + trace!("wait_for_status: ignoring received FrameStackHandle"); + frame_stack.free_slot(&mut self.shm); + } + ReceiverMsg::Finished { frame_stack } => { + warn!("wait_for_status: ignoring FrameStackHandle received in ReceiverMsg::Finished message"); + frame_stack.free_slot(&mut self.shm); + } + ReceiverMsg::FatalError { error } => { + return Err(ConnectionError::FatalError(error)); + } + ReceiverMsg::ReceiverArmed => { + trace!("wait_for_status: received ReceiverMsg::ReceiverArmed"); + } + ReceiverMsg::AcquisitionStart { + pending_acquisition: _, + } => { + trace!("wait_for_status: received ReceiverMsg::AcquisitionStart"); + } + } if self.status == desired_status { + debug!("wait_for_status: successfully got status {desired_status:?}"); return Ok(()); } } @@ -282,7 +330,7 @@ where self.wait_for_status( ConnectionStatus::Armed, - Duration::from_millis(100), + Duration::from_millis(1000), periodic_callback, ) } diff --git a/common/src/py_cam_client.rs b/common/src/py_cam_client.rs index 1f49eef3..0fad1932 100644 --- a/common/src/py_cam_client.rs +++ b/common/src/py_cam_client.rs @@ -9,6 +9,7 @@ macro_rules! impl_py_cam_client { ( $name: ident, $decoder_type: ident, + $py_frame_stack: ident, $frame_meta_type: ident, $mod: ident ) => { @@ -16,17 +17,36 @@ macro_rules! impl_py_cam_client { use common::{ cam_client::GenericCamClient, decoder::Decoder, frame_stack::FrameStackHandle, }; - use numpy::PyArray3; + use num::NumCast; + use numpy::{ + dtype_bound, Element, PyArray3, PyArrayDescrMethods, PyArrayMethods, + PyUntypedArray, PyUntypedArrayMethods, + }; use pyo3::{create_exception, exceptions::PyException, prelude::*}; + use zerocopy::{AsBytes, FromBytes}; create_exception!($mod, PyCamClientError, PyException); #[pyclass] pub struct $name { - client_impl: GenericCamClient<$decoder_type>, + client_impl: GenericCamClient, } - impl $name {} + impl $name { + fn decode_impl<'py, T: Element + AsBytes + FromBytes + Copy + NumCast + 'static>( + &self, + input: &super::$py_frame_stack, + out: &Bound<'py, PyArray3>, + py: Python<'_>, + ) -> PyResult<()> { + let mut out_rw = out.try_readwrite()?; + let mut out_arr = out_rw.as_array_mut(); + return self + .client_impl + .decode_into_buffer(input.try_get_inner()?, &mut out_arr) + .map_err(|e| PyCamClientError::new_err(format!("decode failed: {e}"))); + } + } #[pymethods] impl $name { @@ -40,41 +60,63 @@ macro_rules! impl_py_cam_client { /// Decode into a pre-allocated array. /// - /// This supports user-allocated memory, which enables things like copying - /// directly into CUDA locked memory and thus getting rid of a memcpy in the - /// case of CUDA. - pub fn decode_into_buffer( + /// Fow now, this has to be a numpy array, but using the buffer + /// protocol, this can, for example, be a reference to pinned + /// memory for efficient use with CUDA. + pub fn decode_into_buffer<'py>( &self, - input: &FrameStackHandle<$frame_meta_type>, - out: Bound<'py, PyAny>, + input: &super::$py_frame_stack, + out: &Bound<'py, PyUntypedArray>, + py: Python<'_>, ) -> PyResult<()> { - todo!("decode_into_buffer: Python types; error handling; dtype dispatch"); - self.client_impl.decode_into_buffer(input, out) + if out.dtype().is_equiv_to(&dtype_bound::(py)) { + let out_u8 = out.downcast::>()?; + } + + let arr_u16: Result<&Bound<'py, PyArray3>, _> = out.downcast(); + let arr_u32: Result<&Bound<'py, PyArray3>, _> = out.downcast(); + let arr_i8: Result<&Bound<'py, PyArray3>, _> = out.downcast(); + let arr_i16: Result<&Bound<'py, PyArray3>, _> = out.downcast(); + let arr_i32: Result<&Bound<'py, PyArray3>, _> = out.downcast(); + + // todo!("decode_into_buffer: Python types; error handling; dtype dispatch"); + + Ok(()) } /// Decode a range of frames into a pre-allocated array. /// /// This allows for decoding only the data that will be processed /// immediately afterwards, allowing for more cache-efficient operations. - pub fn decode_range_into_buffer( + pub fn decode_range_into_buffer<'py>( &self, - input: &FrameStackHandle<$frame_meta_type>, - out: PyArray3, + input: &super::$py_frame_stack, + out: Bound<'py, PyAny>, start_idx: usize, end_idx: usize, ) -> PyResult<()> { todo!("decode_range_into_buffer: Python types; dtype dispatch"); - Ok(self.decoder.decode(input, out, start_idx, end_idx)?) + //Ok(self.client_impl.decode_range_into_buffer(input, out, start_idx, end_idx)?) } /// Free the given `FrameStackHandle`. When calling this, no Python objects /// may have references to the memory of the `handle`. pub fn frame_stack_done( &mut self, - handle: FrameStackHandle<$frame_meta_type>, + handle: &mut super::$py_frame_stack, ) -> PyResult<()> { - todo!("frame_stack_done: Python types; err handling"); - self.client_impl.frame_stack_done(handle) + let inner_handle = handle.take().ok_or_else(|| { + PyCamClientError::new_err( + "trying to take already free'd frame stack handle", + ) + })?; + self.client_impl + .frame_stack_done(inner_handle) + .map_err(|e| { + PyCamClientError::new_err(format!( + "GenericCamClient::frame_stack_done: {e}" + )) + }) } pub fn close(&mut self) -> PyResult<()> { diff --git a/common/src/py_connection.rs b/common/src/py_connection.rs index f769df29..65cff3cb 100644 --- a/common/src/py_connection.rs +++ b/common/src/py_connection.rs @@ -13,189 +13,284 @@ macro_rules! impl_py_connection { $pending_acquisition_type: ident, $mod: ident ) => { - create_exception!($mod, PyConnectionError, PyException); - - #[pyclass] - pub struct $name { - conn_impl: Option>, - shm: Option, - remainder: Vec>, - stats: Stats, - } + mod impl_connection { + use bincode::serialize; + use common::{ + background_thread::BackgroundThread, + cam_client::GenericCamClient, + decoder::Decoder, + frame_stack::{FrameMeta, FrameStackHandle}, + generic_connection::{ConnectionStatus, GenericConnection}, + }; + use ipc_test::SharedSlabAllocator; + use num::NumCast; + use numpy::{ + dtype_bound, Element, PyArray3, PyArrayDescrMethods, PyArrayMethods, + PyUntypedArray, PyUntypedArrayMethods, + }; + use pyo3::{ + create_exception, + exceptions::PyException, + prelude::*, + types::{PyBytes, PyType}, + }; + use stats::Stats; + use std::time::Duration; + use zerocopy::{AsBytes, FromBytes}; - impl $name { - pub fn new(shm: SharedSlabAllocator, conn_impl: GenericConnection<$background_thread_type, $pending_acquisition_type>) -> Self { - Self { - conn_impl: Some(conn_impl), - shm: Some(shm), - remainder: Vec::new(), - stats: Stats::new(), - } + create_exception!($mod, PyConnectionError, PyException); + + #[pyclass] + pub struct $name { + conn_impl: Option< + GenericConnection< + super::$background_thread_type, + super::$pending_acquisition_type, + >, + >, + shm: Option, + remainder: Vec>, + stats: Stats, } - fn get_conn_mut(&mut self) -> PyResult<&mut GenericConnection<$background_thread_type, $pending_acquisition_type>> { - match &mut self.conn_impl { - None => Err(PyConnectionError::new_err("connection is closed")), - Some(c) => Ok(c), + impl $name { + pub fn new( + shm: SharedSlabAllocator, + conn_impl: GenericConnection< + super::$background_thread_type, + super::$pending_acquisition_type, + >, + ) -> Self { + Self { + conn_impl: Some(conn_impl), + shm: Some(shm), + remainder: Vec::new(), + stats: Stats::new(), + } } - } - fn get_conn(&self) -> PyResult<&GenericConnection<$background_thread_type, $pending_acquisition_type>> { - match &self.conn_impl { - None => Err(PyConnectionError::new_err("connection is closed")), - Some(c) => Ok(c), + fn get_conn_mut( + &mut self, + ) -> PyResult< + &mut GenericConnection< + super::$background_thread_type, + super::$pending_acquisition_type, + >, + > { + match &mut self.conn_impl { + None => Err(PyConnectionError::new_err("connection is closed")), + Some(c) => Ok(c), + } } - } - fn get_shm_mut(&mut self) -> PyResult<&mut SharedSlabAllocator> { - match &mut self.shm { - None => Err(PyConnectionError::new_err("shm is closed")), - Some(shm) => Ok(shm), + fn get_conn( + &self, + ) -> PyResult< + &GenericConnection< + super::$background_thread_type, + super::$pending_acquisition_type, + >, + > { + match &self.conn_impl { + None => Err(PyConnectionError::new_err("connection is closed")), + Some(c) => Ok(c), + } } - } - fn get_shm(&self) -> PyResult<&SharedSlabAllocator> { - match &self.shm { - None => Err(PyConnectionError::new_err("shm is closed")), - Some(shm) => Ok(shm), + fn get_shm_mut(&mut self) -> PyResult<&mut SharedSlabAllocator> { + match &mut self.shm { + None => Err(PyConnectionError::new_err("shm is closed")), + Some(shm) => Ok(shm), + } } - } - fn send_specialized(&mut self, msg: <$background_thread_type as BackgroundThread>::ExtraControl) -> PyResult<()> { - let mut conn = self.get_conn_mut()?; - conn.send_specialized(msg).map_err(|e| PyConnectionError::new_err(e.to_string()))?; - Ok(()) - } + fn get_shm(&self) -> PyResult<&SharedSlabAllocator> { + match &self.shm { + None => Err(PyConnectionError::new_err("shm is closed")), + Some(shm) => Ok(shm), + } + } - fn wait_for_status( - &mut self, - desired_status: ConnectionStatus, - timeout: Duration, - ) -> PyResult<()> - { - let mut conn = self.get_conn_mut()?; - conn.wait_for_status(desired_status, timeout, || { - // re-acquire GIL to check if we need to break - Python::with_gil(|py| py.check_signals())?; - Ok::<_, PyErr>(()) - }).map_err(|e| PyConnectionError::new_err(e.to_string())) - } - } + pub fn send_specialized( + &mut self, + msg: ::ExtraControl, + ) -> PyResult<()> { + let mut conn = self.get_conn_mut()?; + conn.send_specialized(msg) + .map_err(|e| PyConnectionError::new_err(e.to_string()))?; + Ok(()) + } - #[pymethods] - impl $name { - pub fn get_next_stack( - &mut self, - max_size: usize, - py: Python<'_>, - ) -> PyResult> { - let conn_impl = self.get_conn_mut()?; - match py.allow_threads(|| { - conn_impl.get_next_stack(max_size, || { + pub fn wait_for_status( + &mut self, + desired_status: ConnectionStatus, + timeout: Duration, + ) -> PyResult<()> { + let mut conn = self.get_conn_mut()?; + conn.wait_for_status(desired_status, timeout, || { // re-acquire GIL to check if we need to break Python::with_gil(|py| py.check_signals())?; Ok::<_, PyErr>(()) }) - }) { - Ok(None) => Ok(None), - Ok(Some(stack)) => { - Ok(Some($name_frame_stack::new(stack))) - }, - Err(e) => { - Err(PyConnectionError::new_err(e.to_string())) - } + .map_err(|e| PyConnectionError::new_err(e.to_string())) } } - pub fn wait_for_arm(&mut self, timeout: f32) -> PyResult> { - let conn_impl = self.get_conn_mut()?; - conn_impl.wait_for_arm(Duration::from_secs_f32(timeout), || { - // re-acquire GIL to check if we need to break - Python::with_gil(|py| py.check_signals())?; - Ok::<_, PyErr>(()) - }).map_err(|e| { - PyConnectionError::new_err(e.to_string()) - }) - } + #[pymethods] + impl $name { + pub fn get_next_stack( + &mut self, + max_size: usize, + py: Python<'_>, + ) -> PyResult> { + let conn_impl = self.get_conn_mut()?; + match py.allow_threads(|| { + conn_impl.get_next_stack(max_size, || { + // re-acquire GIL to check if we need to break + Python::with_gil(|py| py.check_signals())?; + Ok::<_, PyErr>(()) + }) + }) { + Ok(None) => Ok(None), + Ok(Some(stack)) => Ok(Some($name_frame_stack::new(stack))), + Err(e) => Err(PyConnectionError::new_err(e.to_string())), + } + } - pub fn get_socket_path(&self) -> PyResult { - let shm = self.get_shm()?; - Ok(shm.get_handle().os_handle) - } + pub fn wait_for_arm( + &mut self, + timeout: f32, + ) -> PyResult> { + let conn_impl = self.get_conn_mut()?; + conn_impl + .wait_for_arm(Duration::from_secs_f32(timeout), || { + // re-acquire GIL to check if we need to break + Python::with_gil(|py| py.check_signals())?; + Ok::<_, PyErr>(()) + }) + .map_err(|e| PyConnectionError::new_err(e.to_string())) + } - pub fn close(&mut self) -> PyResult<()> { - let conn_impl = self.get_conn_mut()?; - conn_impl.log_shm_stats(); - conn_impl.reset_stats(); - Ok(()) - } + pub fn get_socket_path(&self) -> PyResult { + let shm = self.get_shm()?; + Ok(shm.get_handle().os_handle) + } - pub fn is_running(&self) -> PyResult { - let conn_impl = self.get_conn()?; - Ok(conn_impl.is_running()) - } + pub fn close(&mut self) -> PyResult<()> { + let conn_impl = self.get_conn_mut()?; + conn_impl.log_shm_stats(); + conn_impl.reset_stats(); + Ok(()) + } - pub fn start_passive(&mut self) -> PyResult<()> { - let conn_impl = self.get_conn_mut()?; - conn_impl.start_passive(|| { - Python::with_gil(|py| py.check_signals())?; - Ok::<_, PyErr>(()) - }).map_err(|e| { - PyConnectionError::new_err(format!("start_passive failed: {e}")) - })?; - - conn_impl.wait_for_status(ConnectionStatus::Armed, Duration::from_millis(100), || { - // re-acquire GIL to check if we need to break - Python::with_gil(|py| py.check_signals())?; - Ok::<_, PyErr>(()) - }).map_err(|e| PyConnectionError::new_err(e.to_string()))?; - - Ok(()) - } + pub fn is_running(&self) -> PyResult { + let conn_impl = self.get_conn()?; + Ok(conn_impl.is_running()) + } - pub fn log_shm_stats(&self) -> PyResult<()> { - let conn_impl = self.get_conn()?; - conn_impl.log_shm_stats(); - Ok(()) - } - } + pub fn start_passive(&mut self) -> PyResult<()> { + let conn_impl = self.get_conn_mut()?; + conn_impl + .start_passive(|| { + Python::with_gil(|py| py.check_signals())?; + Ok::<_, PyErr>(()) + }) + .map_err(|e| { + PyConnectionError::new_err(format!("start_passive failed: {e}")) + })?; - #[pyclass] - pub struct $name_frame_stack { - inner: Option>, - } + conn_impl + .wait_for_status( + ConnectionStatus::Armed, + Duration::from_millis(100), + || { + // re-acquire GIL to check if we need to break + Python::with_gil(|py| py.check_signals())?; + Ok::<_, PyErr>(()) + }, + ) + .map_err(|e| PyConnectionError::new_err(e.to_string()))?; - impl $name_frame_stack { - fn new(inner: FrameStackHandle<$frame_meta_type>) -> Self { - Self { inner: Some(inner) } - } + Ok(()) + } - fn try_get_inner(&self) -> PyResult<&FrameStackHandle<$frame_meta_type>> { - if let Some(inner) = &self.inner { - Ok(inner) - } else { - Err(PyConnectionError::new_err("operation on free'd FrameStackHandle".to_owned())) + pub fn log_shm_stats(&self) -> PyResult<()> { + let conn_impl = self.get_conn()?; + conn_impl.log_shm_stats(); + Ok(()) } } - pub fn take(&mut self) -> Option> { - self.inner.take() + #[pyclass] + pub struct $name_frame_stack { + inner: Option>, } - } - #[pymethods] - impl $name_frame_stack { - fn __len__(&self) -> PyResult { - Ok(self.try_get_inner()?.len()) - } + impl $name_frame_stack { + pub fn new(inner: FrameStackHandle) -> Self { + Self { inner: Some(inner) } + } - fn get_dtype_string(&self) -> PyResult { - Ok(self.try_get_inner()?.first_meta().get_dtype_string()) + pub fn try_get_inner( + &self, + ) -> PyResult<&FrameStackHandle> { + if let Some(inner) = &self.inner { + Ok(inner) + } else { + Err(PyConnectionError::new_err( + "operation on free'd FrameStackHandle".to_owned(), + )) + } + } + + pub fn take(&mut self) -> Option> { + self.inner.take() + } + + pub fn deserialize_impl<'py>(serialized: Bound<'py, PyBytes>) -> PyResult { + let data = serialized.as_bytes(); + let inner: FrameStackHandle = + bincode::deserialize(data).map_err(|e| { + let msg = format!("could not deserialize FrameStackHandle: {e:?}"); + PyConnectionError::new_err(msg) + })?; + + Ok(Self { inner: Some(inner) }) + } } - fn get_shape(&self) -> PyResult<(u64, u64)> { - Ok(self.try_get_inner()?.first_meta().get_shape()) + #[pymethods] + impl $name_frame_stack { + pub fn __len__(&self) -> PyResult { + Ok(self.try_get_inner()?.len()) + } + + pub fn get_dtype_string(&self) -> PyResult { + Ok(self.try_get_inner()?.first_meta().get_dtype_string()) + } + + pub fn get_shape(&self) -> PyResult<(u64, u64)> { + Ok(self.try_get_inner()?.first_meta().get_shape()) + } + + pub fn serialize<'py>(&self, py: Python<'py>) -> PyResult> { + let bytes: Bound<'py, PyBytes> = PyBytes::new_bound( + py, + serialize(self.try_get_inner()?).unwrap().as_slice(), + ); + Ok(bytes.into()) + } + + #[classmethod] + pub fn deserialize<'py>( + _cls: Bound<'py, PyType>, + serialized: Bound<'py, PyBytes>, + ) -> PyResult { + Self::deserialize_impl(serialized) + } } } + + use impl_connection::{$name, $name_frame_stack, PyConnectionError}; }; } diff --git a/libertem_dectris/Cargo.toml b/libertem_dectris/Cargo.toml index 92d550eb..1666482b 100644 --- a/libertem_dectris/Cargo.toml +++ b/libertem_dectris/Cargo.toml @@ -38,6 +38,7 @@ nix = "0.26.1" lz4 = "1.24.0" zerocopy = "0.6.1" md5 = "0.7.0" +num = "0.4.3" [features] extension-module = ["pyo3/extension-module"] diff --git a/libertem_dectris/src/background_thread.rs b/libertem_dectris/src/background_thread.rs index 3a851a7c..e28968ae 100644 --- a/libertem_dectris/src/background_thread.rs +++ b/libertem_dectris/src/background_thread.rs @@ -24,6 +24,9 @@ use crate::common::{ type DectrisControlMsg = ControlMsg; +/// Receive a message into `msg`, and periodically check for control messages on +/// `control_channel` which are converted into `AcquisitionError`s. Return once +/// a message has been read into `msg`. fn recv_part( msg: &mut Message, socket: &Socket, @@ -52,40 +55,19 @@ fn recv_frame_into( ) -> Result<(DImage, DImageD, DConfig), AcquisitionError> { recv_part(msg, socket, control_channel)?; let dimage_res: Result = serde_json::from_str(msg.as_str().unwrap()); - - let dimage = match dimage_res { - Ok(image) => image, - Err(err) => { - return Err(AcquisitionError::SerdeError { - msg: err.to_string(), - recvd_msg: msg - .as_str() - .map_or_else(|| "".to_string(), |m| m.to_string()), - }); - } - }; + let dimage = dimage_res.map_err(|err| AcquisitionError::serde_from_msg(&err, msg))?; recv_part(msg, socket, control_channel)?; let dimaged_res: Result = serde_json::from_str(msg.as_str().unwrap()); - - let dimaged = match dimaged_res { - Ok(image) => image, - Err(err) => { - return Err(AcquisitionError::SerdeError { - msg: err.to_string(), - recvd_msg: msg - .as_str() - .map_or_else(|| "".to_string(), |m| m.to_string()), - }); - } - }; + let dimaged = dimaged_res.map_err(|err| AcquisitionError::serde_from_msg(&err, msg))?; // compressed image data: recv_part(msg_image, socket, control_channel)?; // DConfig: recv_part(msg, socket, control_channel)?; - let dconfig: DConfig = serde_json::from_str(msg.as_str().unwrap()).unwrap(); + let dconfig: DConfig = serde_json::from_str(msg.as_str().unwrap()) + .map_err(|err| AcquisitionError::serde_from_msg(&err, msg))?; Ok((dimage, dimaged, dconfig)) } @@ -141,11 +123,22 @@ impl Display for AcquisitionError { } impl From> for AcquisitionError { - fn from(value: SendError) -> Self { + fn from(_value: SendError) -> Self { AcquisitionError::Disconnected } } +impl AcquisitionError { + fn serde_from_msg(err: &serde_json::Error, msg: &Message) -> Self { + Self::SerdeError { + msg: err.to_string(), + recvd_msg: msg + .as_str() + .map_or_else(|| "".to_string(), |m| m.to_string()), + } + } +} + /// With a running acquisition, check for control messages; /// especially convert `ControlMsg::StopThread` to `AcquisitionError::Cancelled`. fn check_for_control( @@ -167,6 +160,27 @@ fn check_for_control( } } +fn serialization_error( + from_thread_s: &Sender>, + msg: &Message, + err: &serde_json::Error, +) { + from_thread_s + .send(ReceiverMsg::FatalError { + error: Box::new(AcquisitionError::SerdeError { + recvd_msg: msg + .as_str() + .map_or_else(|| "".to_string(), |m| m.to_string()), + msg: err.to_string(), + }), + }) + .unwrap(); + log::error!( + "background_thread: serialization error: {}", + err.to_string() + ); +} + /// Passively listen for global acquisition headers /// and automatically latch on to them. fn passive_acquisition( @@ -202,16 +216,19 @@ fn passive_acquisition( // second message: the header itself recv_part(&mut msg, socket, control_channel)?; - if let Some(msg_str) = msg.as_str() { + let detector_config: DetectorConfig = if let Some(msg_str) = msg.as_str() { debug!("detector config: {}", msg_str); + match serde_json::from_str(msg_str) { + Ok(header) => header, + Err(err) => { + serialization_error(from_thread_s, &msg, &err); + continue; + } + } } else { - warn!("non-string received as detector config!") - } - - let detector_config: DetectorConfig = - serde_json::from_str(msg.as_str().unwrap()).unwrap(); - - debug!("detector config: {}", msg.as_str().unwrap()); + warn!("non-string received as detector config! ignoring message."); + continue; + }; acquisition( detector_config, @@ -397,7 +414,7 @@ fn drain_if_mismatch( } } - debug!( + trace!( "drained message header: {} expected series {}", msg.as_str().unwrap(), series @@ -408,9 +425,9 @@ fn drain_if_mismatch( recv_part(msg, socket, control_channel)?; if let Some(msg_str) = msg.as_str() { - debug!("drained message part: {}", msg_str); + trace!("drained message part: {}", msg_str); } else { - debug!("drained non-utf message part"); + trace!("drained non-utf message part"); } } @@ -478,20 +495,7 @@ fn background_thread( let dheader: DHeader = match dheader_res { Ok(header) => header, Err(err) => { - from_thread_s - .send(ReceiverMsg::FatalError { - error: Box::new(AcquisitionError::SerdeError { - recvd_msg: msg - .as_str() - .map_or_else(|| "".to_string(), |m| m.to_string()), - msg: err.to_string(), - }), - }) - .unwrap(); - log::error!( - "background_thread: serialization error: {}", - err.to_string() - ); + serialization_error(from_thread_s, &msg, &err); break; } }; @@ -500,14 +504,19 @@ fn background_thread( // second message: the header itself recv_part(&mut msg, &socket, to_thread_r)?; - if let Some(msg_str) = msg.as_str() { + let detector_config: DetectorConfig = if let Some(msg_str) = msg.as_str() { debug!("detector config: {}", msg_str); + match serde_json::from_str(msg_str) { + Ok(header) => header, + Err(err) => { + serialization_error(from_thread_s, &msg, &err); + break; + } + } } else { - warn!("non-string received as detector config!") - } - - let detector_config: DetectorConfig = - serde_json::from_str(msg.as_str().unwrap()).unwrap(); + warn!("non-string received as detector config! re-connecting"); + continue 'outer; + }; match acquisition( detector_config, @@ -538,7 +547,7 @@ fn background_thread( break 'outer; } Err(RecvTimeoutError::Disconnected) => { - debug!("background_thread: control channel has disconnected"); + warn!("background_thread: control channel has disconnected"); break 'outer; } Err(RecvTimeoutError::Timeout) => (), // no message, nothing to do diff --git a/libertem_dectris/src/cam_client.rs b/libertem_dectris/src/cam_client.rs index 9e4fdd9d..8b137891 100644 --- a/libertem_dectris/src/cam_client.rs +++ b/libertem_dectris/src/cam_client.rs @@ -1,450 +1 @@ -use common::{decoder::Decoder, frame_stack::FrameStackHandle, impl_py_cam_client}; -use ipc_test::SharedSlabAllocator; -use log::trace; -use numpy::{PyArray3, PyArrayMethods}; -use pyo3::{ - exceptions::{self, PyRuntimeError}, - prelude::*, -}; -use zerocopy::{AsBytes, FromBytes}; -use crate::{ - common::{DectrisFrameMeta, PixelType}, - dectris_py::DectrisFrameStack, - exceptions::{ConnectionError, DecompressError}, -}; - -#[derive(Debug, Default)] -struct DectrisDecoder {} - -impl Decoder for DectrisDecoder { - fn decode( - &self, - input: &FrameStackHandle, - dest: numpy::ndarray::ArrayViewMut3<'_, T>, - start_idx: usize, - end_idx: usize, - ) -> Result<(), common::decoder::DecoderError> - where - M: common::frame_stack::FrameMeta, - { - todo!("DectrisDecoder::decode") - } -} - -impl_py_cam_client!( - _PyCamClient, - DectrisDecoder, - DectrisFrameMeta, - libertem_dectris -); - -#[pyclass] -pub struct CamClient { - shm: Option, -} - -impl CamClient { - fn decompress_bslz4_impl( - &self, - handle: &FrameStackHandle, - out: &Bound<'_, PyArray3>, - ) -> PyResult<()> { - let shm = if let Some(shm) = &self.shm { - shm - } else { - return Err(PyRuntimeError::new_err("can't decompress with closed SHM")); - }; - - handle.with_slot(shm, |slot| { - let mut out_rw = out.readwrite(); - let out_slice = out_rw.as_slice_mut().expect("`out` must be C-contiguous"); - - for (frame_meta, idx) in handle.get_meta().iter().zip(0..) { - let out_size = frame_meta.get_number_of_pixels(); - - // NOTE: frames should all have the same shape - // FIXME: frames in a stack can _theoretically_ have different bit depth? - let out_offset = idx * out_size; - let out_ptr: *mut T = out_slice[out_offset..out_offset + out_size] - .as_mut_ptr() - .cast(); - - let image_data = handle.get_slice_for_frame(idx, slot); - - match bs_sys::decompress_lz4_into(&image_data[12..], out_ptr, out_size, None) { - Ok(()) => {} - Err(e) => { - let msg = format!("decompression failed: {e:?}"); - return Err(DecompressError::new_err(msg)); - } - } - } - - Ok(()) - }) - } - - fn decompress_plain_lz4_impl( - &self, - handle: &FrameStackHandle, - out: &Bound<'_, PyArray3>, - ) -> PyResult<()> { - let shm = if let Some(shm) = &self.shm { - shm - } else { - return Err(PyRuntimeError::new_err("can't decompress with closed SHM")); - }; - - handle.with_slot(shm, |slot| { - let mut out_rw = out.readwrite(); - let out_slice = match out_rw.as_slice_mut() { - Ok(s) => s, - Err(e) => { - let msg = format!("`out` must be C-contiguous: {e:?}"); - return Err(DecompressError::new_err(msg)); - } - }; - - for (frame_meta, idx) in handle.get_meta().iter().zip(0..) { - // NOTE: frames should all have the same shape - // FIXME: frames in a stack can _theoretically_ have different bit depth? - let out_size = frame_meta.get_number_of_pixels(); - let out_slice_cast = out_slice[0..out_size].as_bytes_mut(); - let image_data = handle.get_slice_for_frame(idx, slot); - - println!("{} {}", image_data.len(), out_slice_cast.len()); - match lz4::block::decompress_to_buffer( - image_data, - Some(out_slice_cast.len().try_into().unwrap()), - out_slice_cast, - ) { - Ok(_) => {} - Err(e) => { - let msg = format!("decompression failed: {e:?}"); - return Err(DecompressError::new_err(msg)); - } - } - } - - Ok(()) - }) - } -} - -#[pymethods] -impl CamClient { - #[new] - fn new(handle_path: &str) -> PyResult { - match SharedSlabAllocator::connect(handle_path) { - Ok(shm) => Ok(CamClient { shm: Some(shm) }), - Err(e) => { - let msg = format!("failed to connect to SHM: {:?}", e); - Err(ConnectionError::new_err(msg)) - } - } - } - - fn decompress_frame_stack<'py>( - slf: PyRef, - handle: &DectrisFrameStack, - out: Bound<'py, PyAny>, - ) -> PyResult<()> { - let arr_u8: Result<&Bound<'py, PyArray3>, _> = out.downcast(); - let arr_u16: Result<&Bound<'py, PyArray3>, _> = out.downcast(); - let arr_u32: Result<&Bound<'py, PyArray3>, _> = out.downcast(); - - let handle_inner = handle.try_get_inner()?; - - let (encoding, type_) = if handle_inner.is_empty() { - return Ok(()); - } else { - let dimaged = &handle_inner.first_meta().dimaged; - (&dimaged.encoding, &dimaged.type_) - }; - - match encoding.as_str() { - // ": String of the form ”[bs][[-]lz4][<|>]”. bs stands for bit shuffling with bits, lz4 for - // lz4 compression and < (>) for little (big) endian. E.g. ”bs8-lz4<” stands for 8bit bitshuffling, lz4 compression - // and little endian. lz4 data is written as defined at https://code.google.com/p/lz4/ without any additional data like - // block size etc." - "bs32-lz4<" | "bs16-lz4<" | "bs8-lz4<" => match type_ { - PixelType::Uint8 => slf.decompress_bslz4_impl(handle_inner, arr_u8.unwrap())?, - PixelType::Uint16 => slf.decompress_bslz4_impl(handle_inner, arr_u16.unwrap())?, - PixelType::Uint32 => slf.decompress_bslz4_impl(handle_inner, arr_u32.unwrap())?, - }, - "lz4<" => match type_ { - PixelType::Uint8 => slf.decompress_plain_lz4_impl(handle_inner, arr_u8.unwrap())?, - PixelType::Uint16 => { - slf.decompress_plain_lz4_impl(handle_inner, arr_u16.unwrap())? - } - PixelType::Uint32 => { - slf.decompress_plain_lz4_impl(handle_inner, arr_u32.unwrap())? - } - }, - e => { - let msg = format!("can't deal with encoding {e}"); - return Err(exceptions::PyValueError::new_err(msg)); - } - } - Ok(()) - } - - fn done(&mut self, handle: &mut DectrisFrameStack) -> PyResult<()> { - if let Some(shm) = &mut self.shm { - if let Some(inner) = handle.take() { - inner.free_slot(shm); - Ok(()) - } else { - Err(PyRuntimeError::new_err( - "`done` called twice on the same handle", - )) - } - } else { - Err(PyRuntimeError::new_err( - "CamClient.done called with SHM closed", - )) - } - } - - fn close(&mut self) { - self.shm.take(); - } -} - -impl Drop for CamClient { - fn drop(&mut self) { - trace!("CamClient::drop"); - } -} - -#[cfg(test)] -mod tests { - use std::{convert::Infallible, io::Write, path::PathBuf}; - - use common::frame_stack::{FrameStackForWriting, FrameStackHandle}; - use lz4::block::CompressionMode; - use numpy::{PyArray, PyArrayMethods}; - use tempfile::tempdir; - - use ipc_test::SharedSlabAllocator; - use pyo3::{prepare_freethreaded_python, Python}; - use zerocopy::AsBytes; - - use crate::{cam_client::CamClient, common::DectrisFrameMeta}; - use tempfile::TempDir; - - fn get_socket_path() -> (TempDir, PathBuf) { - let socket_dir = tempdir().unwrap(); - let socket_as_path = socket_dir.path().join("stuff.socket"); - - (socket_dir, socket_as_path) - } - - #[test] - fn test_cam_client() { - let (_socket_dir, socket_as_path) = get_socket_path(); - let mut shm = SharedSlabAllocator::new(1, 4096, false, &socket_as_path).unwrap(); - let slot = shm.get_mut().expect("get a free shm slot"); - let mut fs = FrameStackForWriting::new(slot, 1, 512); - let dimage = crate::common::DImage { - htype: "dimage-1.0".to_string().try_into().unwrap(), - series: 1, - frame: 1, - hash: "aaaabbbb".to_string().try_into().unwrap(), - }; - let dimaged = crate::common::DImageD { - htype: "d-image_d-1.0".to_string().try_into().unwrap(), - shape: (16, 16), - type_: crate::common::PixelType::Uint16, - encoding: "bs16-lz4<".to_string().try_into().unwrap(), - }; - let dconfig = crate::common::DConfig { - htype: "dconfig-1.0".to_string().try_into().unwrap(), - start_time: 0, - stop_time: 0, - real_time: 0, - }; - - // some predictable test data: - let in_: Vec = (0..256).map(|i| i % 16).collect(); - let compressed_data = bs_sys::compress_lz4(&in_, None).unwrap(); - - // compressed dectris data stream has an (unknown) - // header in front of the compressed data, which we just cut off, - // so here we just prepend 12 zero-bytes - let mut data_with_prefix = vec![0; 12]; - data_with_prefix.extend_from_slice(&compressed_data); - assert!(data_with_prefix.len() < 512); - data_with_prefix.iter().take(12).for_each(|&e| { - assert_eq!(e, 0); - }); - println!("compressed_data: {:x?}", &compressed_data); - println!("data_with_prefix[12..]: {:x?}", &data_with_prefix[12..]); - assert_eq!(fs.get_cursor(), 0); - - let meta = DectrisFrameMeta { - dimage, - dimaged, - dconfig, - data_length_bytes: data_with_prefix.len(), - }; - - fs.write_frame(&meta, |mut b| -> Result<(), Infallible> { - b.write_all(&data_with_prefix).unwrap(); - Ok(()) - }) - .unwrap(); - - assert_eq!(fs.get_cursor(), data_with_prefix.len()); - - // we have one frame in there: - assert_eq!(fs.len(), 1); - - let fs_handle = fs.writing_done(&mut shm).unwrap(); - - // we still have one frame in there: - assert_eq!(fs_handle.len(), 1); - - // initialize a Python interpreter so we are able to construct a PyBytes instance: - prepare_freethreaded_python(); - - // roundtrip serialize/deserialize: - Python::with_gil(|_py| { - let bytes = fs_handle.serialize().unwrap(); - let new_handle = FrameStackHandle::deserialize_impl(&bytes).unwrap(); - assert_eq!(fs_handle, new_handle); - }); - - let client = CamClient::new(socket_as_path.to_str().unwrap()).unwrap(); - - fs_handle.with_slot(&shm, |slot_r| { - let slice = slot_r.as_slice(); - println!("slice: {:x?}", slice); - - Python::with_gil(|py| { - let flat: Vec = (0..256).collect(); - let out = PyArray::from_vec_bound(py, flat) - .reshape((1, 16, 16)) - .unwrap(); - client.decompress_bslz4_impl(&fs_handle, &out).unwrap(); - - out.readonly() - .as_slice() - .unwrap() - .iter() - .zip(0..) - .for_each(|(&item, idx)| { - assert_eq!(item, in_[idx]); - assert_eq!(item, (idx % 16) as u16); - }); - }); - }); - } - - #[test] - fn test_cam_client_lz4() { - let (_socket_dir, socket_as_path) = get_socket_path(); - let mut shm = SharedSlabAllocator::new(1, 4096, false, &socket_as_path).unwrap(); - let slot = shm.get_mut().expect("get a free shm slot"); - let mut fs = FrameStackForWriting::new(slot, 1, 512); - let dimage = crate::common::DImage { - htype: "dimage-1.0".to_string().try_into().unwrap(), - series: 1, - frame: 1, - hash: "aaaabbbb".to_string().try_into().unwrap(), - }; - let dimaged = crate::common::DImageD { - htype: "dimage_d-1.0".to_string().try_into().unwrap(), - shape: (16, 16), - type_: crate::common::PixelType::Uint16, - encoding: "lz4<".to_string().try_into().unwrap(), - }; - let dconfig = crate::common::DConfig { - htype: "dconfig-1.0".to_string().try_into().unwrap(), - start_time: 0, - stop_time: 0, - real_time: 0, - }; - - // some predictable test data: - let in_: Vec = (0..256).map(|i| i % 16).collect(); - let in_bytes = in_.as_bytes(); - let compressed_data = - lz4::block::compress(in_bytes, Some(CompressionMode::DEFAULT), false).unwrap(); - - println!("compressed_data: {:x?}", &compressed_data); - assert_eq!(fs.get_cursor(), 0); - - let meta = DectrisFrameMeta { - dimage, - dimaged, - dconfig, - data_length_bytes: compressed_data.len(), - }; - - fs.write_frame(&meta, |buf| { - buf.copy_from_slice(&compressed_data); - Ok::<_, Infallible>(()) - }) - .unwrap(); - - assert_eq!(fs.get_cursor(), compressed_data.len()); - - // we have one frame in there: - assert_eq!(fs.len(), 1); - - let fs_handle = fs.writing_done(&mut shm).unwrap(); - - // we still have one frame in there: - assert_eq!(fs_handle.len(), 1); - - // initialize a Python interpreter so we are able to construct a PyBytes instance: - prepare_freethreaded_python(); - - // roundtrip serialize/deserialize: - Python::with_gil(|_py| { - let bytes = fs_handle.serialize().unwrap(); - let new_handle = FrameStackHandle::deserialize_impl(&bytes).unwrap(); - assert_eq!(fs_handle, new_handle); - }); - - let client = CamClient::new(socket_as_path.to_str().unwrap()).unwrap(); - - fs_handle.with_slot(&shm, |slot_r| { - let slice = slot_r.as_slice(); - let slice_for_frame = fs_handle.get_slice_for_frame(0, slot_r); - - // try decompression directly: - let out_size = 256 * TryInto::::try_into(std::mem::size_of::()).unwrap(); - println!( - "slice_for_frame.len(): {}, uncompressed_size: {}", - slice_for_frame.len(), - out_size - ); - lz4::block::decompress(slice_for_frame, Some(out_size)).unwrap(); - - println!("slice_for_frame: {:x?}", slice_for_frame); - println!("slice: {:x?}", slice); - - Python::with_gil(|py| { - let flat: Vec = (0..256).collect(); - let out = PyArray::from_vec_bound(py, flat) - .reshape((1, 16, 16)) - .unwrap(); - - client.decompress_plain_lz4_impl(&fs_handle, &out).unwrap(); - - out.readonly() - .as_slice() - .unwrap() - .iter() - .zip(0..) - .for_each(|(&item, idx)| { - assert_eq!(item, in_[idx]); - assert_eq!(item, (idx % 16) as u16); - }); - }); - }); - } -} diff --git a/libertem_dectris/src/common.rs b/libertem_dectris/src/common.rs index 7933d606..db9e1844 100644 --- a/libertem_dectris/src/common.rs +++ b/libertem_dectris/src/common.rs @@ -19,9 +19,9 @@ impl TryFrom for NonEmptyString { fn try_from(value: String) -> Result { if value.is_empty() { - Ok(NonEmptyString(value)) - } else { Err("empty string provided where non-empty was expected".to_owned()) + } else { + Ok(NonEmptyString(value)) } } } @@ -256,6 +256,15 @@ pub enum Endianess { Big, } +impl Endianess { + pub fn as_string(&self) -> String { + match self { + Endianess::Little => "<".to_owned(), + Endianess::Big => ">".to_owned(), + } + } +} + impl DectrisFrameMeta { /// number of pixels in the uncompressed frame (from the shape) pub fn get_number_of_pixels(&self) -> usize { @@ -263,7 +272,7 @@ impl DectrisFrameMeta { } /// endianess after decompression (little/big) - fn get_endianess(&self) -> Endianess { + pub fn get_endianess(&self) -> Endianess { match self.dimaged.encoding.chars().last().unwrap() { '>' => Endianess::Big, '<' => Endianess::Little, @@ -283,12 +292,12 @@ impl FrameMeta for DectrisFrameMeta { let endianess = self.get_endianess(); // TODO: &'static str instead? match (endianess, &self.dimaged.type_) { - (Endianess::Little, PixelType::Uint8) => " " " ">uint8".to_owned(), - (Endianess::Big, PixelType::Uint16) => ">uint16".to_owned(), - (Endianess::Big, PixelType::Uint32) => ">uint32".to_owned(), + (Endianess::Little, PixelType::Uint8) => "uint8".to_owned(), + (Endianess::Little, PixelType::Uint16) => " " "uint8".to_owned(), + (Endianess::Big, PixelType::Uint16) => ">u2".to_owned(), + (Endianess::Big, PixelType::Uint32) => ">u4".to_owned(), } } diff --git a/libertem_dectris/src/decoder.rs b/libertem_dectris/src/decoder.rs new file mode 100644 index 00000000..8d56057e --- /dev/null +++ b/libertem_dectris/src/decoder.rs @@ -0,0 +1,218 @@ +use std::{ + any::{type_name, TypeId}, + fmt::Debug, +}; + +use common::{ + decoder::{Decoder, DecoderError}, + frame_stack::FrameStackHandle, +}; +use ipc_test::SharedSlabAllocator; + +use numpy::ndarray::s; +use zerocopy::{AsBytes, FromBytes}; + +use num::{NumCast, ToPrimitive}; + +use crate::common::{DectrisFrameMeta, NonEmptyString, PixelType}; + +fn cast_helper(input: &[I], output: &mut [O]) -> Result<(), DecoderError> +where + O: Copy + NumCast, + I: Copy + ToPrimitive + Debug, +{ + for (dest, src) in output.iter_mut().zip(input.iter()) { + let converted = NumCast::from(*src); + if let Some(value) = converted { + *dest = value; + } else { + return Err(DecoderError::FrameDecodeFailed { + msg: format!( + "dtype conversion error: {src:?} does not fit {0}", + type_name::() + ), + }); + } + } + + Ok(()) +} + +#[derive(Debug, Default)] +pub struct DectrisDecoder {} + +impl Decoder for DectrisDecoder { + type FrameMeta = DectrisFrameMeta; + + /// Decode (a part of a) compressed frame stack from the handle `input` to + /// the array `output`. + /// + /// There are ~ three types involved here: + /// + /// 1) The output dtype `T`, which is what the user wants to work in (this + /// can be an integer type, but also one of the floats, or possibly even + /// complex{128,64}) + /// + /// 2) The "native" output dtype (`DImageD::type_`), which is what we have + /// to decompress into (we can't easily map a function over individual + /// pixels as part of the decompression) let's call this one `N` (even + /// though we don't have it as a concrete type parameter here). We could + /// also call this the intermediate dtype, as it may be different from the + /// final output type `T`. + /// + /// 3) The per-frame encoding type, like "bs32-lz4<", where the 32 means + /// that 32 bits of the input are shuffled together - it doesn't have to + /// match `N`! This is mostly an internal encoding type, but we have to be + /// sure to handle it independently from the other types. + /// + /// The goal is to handle decompression as efficiently as possible, and + /// especially to handle the "native" case where `T == N` without an extra + /// copy, but fall back to a almost-as-good method of buffering the data in + /// as small as possible chunks (sadly in this case: a frame) in the + /// intermediate dtype, before converting to `T`. + fn decode( + &self, + shm: &SharedSlabAllocator, + input: &FrameStackHandle, + output: &mut numpy::ndarray::ArrayViewMut3<'_, T>, + start_idx: usize, + end_idx: usize, + ) -> Result<(), DecoderError> + where + T: 'static + AsBytes + FromBytes + Copy + NumCast, + { + input.with_slot(shm, |slot| { + // out three cute special cases: + let mut tmp_u8: Vec = Vec::new(); + let mut tmp_u16: Vec = Vec::new(); + let mut tmp_u32: Vec = Vec::new(); + + for ((frame_meta, out_idx), in_idx) in + input.get_meta().iter().zip(0..).zip(start_idx..end_idx) + { + let mut out_view = output.slice_mut(s![out_idx, .., ..]); + let frame_compressed_data = input.get_slice_for_frame(in_idx, slot); + let out_slice = + out_view + .as_slice_mut() + .ok_or_else(|| DecoderError::FrameDecodeFailed { + msg: "out slice not C-order contiguous".to_owned(), + })?; + + let pixel_type = &frame_meta.dimaged.type_; + let u8_t = TypeId::of::(); + let u16_t = TypeId::of::(); + let u32_t = TypeId::of::(); + + let t_type = TypeId::of::(); + + if t_type == u8_t || t_type == u16_t || t_type == u32_t { + // "zero"-copy shortcut: decompress directly into the destination + self.decode_single_frame( + frame_compressed_data, + out_slice, + &frame_meta.dimaged.encoding, + )?; + } else { + // in the general case, we need the temporary Vec: + let dest_size = frame_meta.get_number_of_pixels(); + + match pixel_type { + PixelType::Uint8 => { + if tmp_u8.capacity() < dest_size { + tmp_u8.resize(dest_size, 0); + } + self.decode_single_frame( + frame_compressed_data, + &mut tmp_u8, + &frame_meta.dimaged.encoding, + )?; + // from u8 to T: + cast_helper(&tmp_u8, out_slice)?; + } + PixelType::Uint16 => { + if tmp_u16.capacity() < dest_size { + tmp_u16.resize(dest_size, 0); + } + self.decode_single_frame( + frame_compressed_data, + &mut tmp_u16, + &frame_meta.dimaged.encoding, + )?; + cast_helper(&tmp_u16, out_slice)?; + } + PixelType::Uint32 => { + if tmp_u32.capacity() < dest_size { + tmp_u32.resize(dest_size, 0); + } + self.decode_single_frame( + frame_compressed_data, + &mut tmp_u32, + &frame_meta.dimaged.encoding, + )?; + cast_helper(&tmp_u32, out_slice)?; + } + } + } + } + + Ok(()) + }) + } + + fn zero_copy_available( + &self, + _handle: &FrameStackHandle, + ) -> Result { + Ok(false) + } +} + +impl DectrisDecoder { + fn decode_single_frame( + &self, + input: &[u8], + output: &mut [T], + encoding: &NonEmptyString, + ) -> Result<(), DecoderError> { + match encoding.as_str() { + "bs32-lz4<" | "bs16-lz4<" | "bs8-lz4<" => self.decode_single_frame_bslz4(input, output), + "lz4<" => self.decode_single_frame_plain_lz4(input, output), + enc => Err(DecoderError::FrameDecodeFailed { + msg: format!("unknown or unsupported encoding: {enc}"), + }), + } + } + + fn decode_single_frame_bslz4( + &self, + input: &[u8], + output: &mut [T], + ) -> Result<(), DecoderError> { + let out_ptr = output.as_mut_ptr(); + unsafe { bs_sys::decompress_lz4_into(&input[12..], out_ptr, output.len(), None) }.map_err( + |e| { + let msg = format!("decompression failed: {e:?}"); + DecoderError::FrameDecodeFailed { msg } + }, + ) + } + + fn decode_single_frame_plain_lz4( + &self, + input: &[u8], + output: &mut [T], + ) -> Result<(), DecoderError> { + let out_slice_bytes = output.as_bytes_mut(); + let out_size: i32 = out_slice_bytes.len().try_into().map_err(|e| { + let msg = format!("output buffer size error: {e}"); + DecoderError::FrameDecodeFailed { msg } + })?; + + lz4::block::decompress_to_buffer(input, Some(out_size), out_slice_bytes).map_err(|e| { + let msg = format!("plain lz4 decompression failed: {e}"); + DecoderError::FrameDecodeFailed { msg } + })?; + Ok(()) + } +} diff --git a/libertem_dectris/src/dectris_py.rs b/libertem_dectris/src/dectris_py.rs index 7ab2bc47..d383114f 100644 --- a/libertem_dectris/src/dectris_py.rs +++ b/libertem_dectris/src/dectris_py.rs @@ -4,15 +4,10 @@ //! pending future unification with full compatability between detectors. use std::time::Duration; -use common::{ - background_thread::BackgroundThread, - frame_stack::FrameMeta, - generic_connection::{ConnectionStatus, GenericConnection}, -}; +use common::generic_connection::{ConnectionStatus, GenericConnection}; use crate::{ background_thread::{DectrisBackgroundThread, DectrisDetectorConnConfig, DectrisExtraControl}, - cam_client::CamClient, common::{ DConfig, DHeader, DImage, DImageD, DSeriesEnd, DectrisFrameMeta, DectrisPendingAcquisition, DetectorConfig, PixelType, TriggerMode, @@ -21,10 +16,17 @@ use crate::{ sim::DectrisSim, }; -use common::{frame_stack::FrameStackHandle, impl_py_connection}; -use ipc_test::SharedSlabAllocator; -use pyo3::{create_exception, exceptions::PyException, prelude::*}; -use stats::Stats; +use common::{impl_py_cam_client, impl_py_connection}; + +use pyo3::{ + prelude::*, + types::{PyBytes, PyType}, +}; + +use log::trace; +use numpy::PyUntypedArray; + +use crate::decoder::DectrisDecoder; #[pymodule] fn libertem_dectris(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { @@ -172,6 +174,14 @@ impl DectrisFrameStack { pub fn new(inner: _PyDectrisFrameStack) -> Self { Self { inner } } + + pub fn get_inner(&self) -> &_PyDectrisFrameStack { + &self.inner + } + + pub fn get_inner_mut(&mut self) -> &mut _PyDectrisFrameStack { + &mut self.inner + } } #[pymethods] @@ -184,10 +194,60 @@ impl DectrisFrameStack { self.inner.get_dtype_string() } + /// use `get_dtype_string` instead + #[deprecated] + fn get_pixel_type(&self) -> PyResult { + let meta = self.inner.try_get_inner()?.first_meta(); + + Ok(match meta.dimaged.type_ { + PixelType::Uint8 => "uint8", + PixelType::Uint16 => "uint16", + PixelType::Uint32 => "uint32", + } + .to_owned()) + } + + /// use `get_dtype_string` instead, that should include endianess + #[deprecated] + fn get_endianess(&self) -> PyResult { + Ok(self + .inner + .try_get_inner()? + .first_meta() + .get_endianess() + .as_string()) + } + + /// implementation detail that Python shouldn't care about + #[deprecated] + fn get_encoding(&self) -> PyResult { + Ok(self + .inner + .try_get_inner()? + .first_meta() + .dimaged + .encoding + .to_string()) + } + fn get_shape(&self) -> PyResult<(u64, u64)> { self.inner.get_shape() } + fn serialize<'py>(&self, py: Python<'py>) -> PyResult> { + self.inner.serialize(py) + } + + #[classmethod] + fn deserialize<'py>( + _cls: Bound<'py, PyType>, + serialized: Bound<'py, PyBytes>, + ) -> PyResult { + Ok(Self { + inner: _PyDectrisFrameStack::deserialize_impl(serialized)?, + }) + } + // fn get_series_id // fn get_frame_id // fn get_hash @@ -196,3 +256,297 @@ impl DectrisFrameStack { #[pyclass(name = "CamClient")] pub struct DectrisCamClient {} + +impl_py_cam_client!( + _PyDectrisCamClient, + DectrisDecoder, + _PyDectrisFrameStack, + DectrisFrameMeta, + libertem_dectris +); + +#[pyclass] +pub struct CamClient { + inner: _PyDectrisCamClient, +} + +#[pymethods] +impl CamClient { + #[new] + fn new(handle_path: &str) -> PyResult { + Ok(Self { + inner: _PyDectrisCamClient::new(handle_path)?, + }) + } + + fn decode_into_buffer<'py>( + &self, + input: &DectrisFrameStack, + out: &Bound<'py, PyUntypedArray>, + py: Python<'py>, + ) -> PyResult<()> { + self.inner.decode_into_buffer(input.get_inner(), out, py) + } + + #[deprecated] + fn decompress_frame_stack<'py>( + &self, + handle: &DectrisFrameStack, + out: &Bound<'py, PyUntypedArray>, + py: Python<'py>, + ) -> PyResult<()> { + self.inner.decode_into_buffer(handle.get_inner(), out, py) + } + + fn done(&mut self, handle: &mut DectrisFrameStack) -> PyResult<()> { + self.inner.frame_stack_done(handle.get_inner_mut()) + } + + fn close(&mut self) -> PyResult<()> { + self.inner.close() + } +} + +impl Drop for CamClient { + fn drop(&mut self) { + trace!("CamClient::drop"); + } +} + +#[cfg(test)] +mod tests { + use std::{convert::Infallible, io::Write, path::PathBuf}; + + use common::frame_stack::{FrameStackForWriting, FrameStackHandle}; + use lz4::block::CompressionMode; + use numpy::{PyArray, PyArrayMethods}; + use tempfile::tempdir; + + use ipc_test::SharedSlabAllocator; + use pyo3::{prepare_freethreaded_python, Python}; + use zerocopy::AsBytes; + + use crate::{ + common::DectrisFrameMeta, + dectris_py::{CamClient, DectrisFrameStack, _PyDectrisFrameStack}, + }; + use tempfile::TempDir; + + fn get_socket_path() -> (TempDir, PathBuf) { + let socket_dir = tempdir().unwrap(); + let socket_as_path = socket_dir.path().join("stuff.socket"); + + (socket_dir, socket_as_path) + } + + #[test] + fn test_cam_client() { + let (_socket_dir, socket_as_path) = get_socket_path(); + let mut shm = SharedSlabAllocator::new(1, 4096, false, &socket_as_path).unwrap(); + let slot = shm.get_mut().expect("get a free shm slot"); + let mut fs = FrameStackForWriting::new(slot, 1, 512); + let dimage = crate::common::DImage { + htype: "dimage-1.0".to_string().try_into().unwrap(), + series: 1, + frame: 1, + hash: "aaaabbbb".to_string().try_into().unwrap(), + }; + let dimaged = crate::common::DImageD { + htype: "d-image_d-1.0".to_string().try_into().unwrap(), + shape: (16, 16), + type_: crate::common::PixelType::Uint16, + encoding: "bs16-lz4<".to_string().try_into().unwrap(), + }; + let dconfig = crate::common::DConfig { + htype: "dconfig-1.0".to_string().try_into().unwrap(), + start_time: 0, + stop_time: 0, + real_time: 0, + }; + + // some predictable test data: + let in_: Vec = (0..256).map(|i| i % 16).collect(); + let compressed_data = bs_sys::compress_lz4(&in_, None).unwrap(); + + // compressed dectris data stream has an (unknown) + // header in front of the compressed data, which we just cut off, + // so here we just prepend 12 zero-bytes + let mut data_with_prefix = vec![0; 12]; + data_with_prefix.extend_from_slice(&compressed_data); + assert!(data_with_prefix.len() < 512); + data_with_prefix.iter().take(12).for_each(|&e| { + assert_eq!(e, 0); + }); + println!("compressed_data: {:x?}", &compressed_data); + println!("data_with_prefix[12..]: {:x?}", &data_with_prefix[12..]); + assert_eq!(fs.get_cursor(), 0); + + let meta = DectrisFrameMeta { + dimage, + dimaged, + dconfig, + data_length_bytes: data_with_prefix.len(), + }; + + fs.write_frame(&meta, |mut b| -> Result<(), Infallible> { + b.write_all(&data_with_prefix).unwrap(); + Ok(()) + }) + .unwrap(); + + assert_eq!(fs.get_cursor(), data_with_prefix.len()); + + // we have one frame in there: + assert_eq!(fs.len(), 1); + + let fs_handle = fs.writing_done(&mut shm).unwrap(); + + // we still have one frame in there: + assert_eq!(fs_handle.len(), 1); + + // initialize a Python interpreter so we are able to construct a PyBytes instance: + prepare_freethreaded_python(); + + // roundtrip serialize/deserialize: + Python::with_gil(|_py| { + let bytes = fs_handle.serialize().unwrap(); + let new_handle = FrameStackHandle::deserialize_impl(&bytes).unwrap(); + assert_eq!(fs_handle, new_handle); + }); + + let client = CamClient::new(socket_as_path.to_str().unwrap()).unwrap(); + + fs_handle.with_slot(&shm, |slot_r| { + let slice = slot_r.as_slice(); + println!("slice: {:x?}", slice); + + Python::with_gil(|py| { + let flat: Vec = (0..256).collect(); + let out = PyArray::from_vec_bound(py, flat) + .reshape((1, 16, 16)) + .unwrap(); + + let out_untyped = out.as_untyped(); + let dfsh = DectrisFrameStack::new(_PyDectrisFrameStack::new(fs_handle)); + client.decode_into_buffer(&dfsh, out_untyped, py).unwrap(); + + out.readonly() + .as_slice() + .unwrap() + .iter() + .zip(0..) + .for_each(|(&item, idx)| { + assert_eq!(item, in_[idx]); + assert_eq!(item, (idx % 16) as u16); + }); + }); + }); + } + + #[test] + fn test_cam_client_lz4() { + let (_socket_dir, socket_as_path) = get_socket_path(); + let mut shm = SharedSlabAllocator::new(1, 4096, false, &socket_as_path).unwrap(); + let slot = shm.get_mut().expect("get a free shm slot"); + let mut fs = FrameStackForWriting::new(slot, 1, 512); + let dimage = crate::common::DImage { + htype: "dimage-1.0".to_string().try_into().unwrap(), + series: 1, + frame: 1, + hash: "aaaabbbb".to_string().try_into().unwrap(), + }; + let dimaged = crate::common::DImageD { + htype: "dimage_d-1.0".to_string().try_into().unwrap(), + shape: (16, 16), + type_: crate::common::PixelType::Uint16, + encoding: "lz4<".to_string().try_into().unwrap(), + }; + let dconfig = crate::common::DConfig { + htype: "dconfig-1.0".to_string().try_into().unwrap(), + start_time: 0, + stop_time: 0, + real_time: 0, + }; + + // some predictable test data: + let in_: Vec = (0..256).map(|i| i % 16).collect(); + let in_bytes = in_.as_bytes(); + let compressed_data = + lz4::block::compress(in_bytes, Some(CompressionMode::DEFAULT), false).unwrap(); + + println!("compressed_data: {:x?}", &compressed_data); + assert_eq!(fs.get_cursor(), 0); + + let meta = DectrisFrameMeta { + dimage, + dimaged, + dconfig, + data_length_bytes: compressed_data.len(), + }; + + fs.write_frame(&meta, |buf| { + buf.copy_from_slice(&compressed_data); + Ok::<_, Infallible>(()) + }) + .unwrap(); + + assert_eq!(fs.get_cursor(), compressed_data.len()); + + // we have one frame in there: + assert_eq!(fs.len(), 1); + + let fs_handle = fs.writing_done(&mut shm).unwrap(); + + // we still have one frame in there: + assert_eq!(fs_handle.len(), 1); + + // initialize a Python interpreter so we are able to construct a PyBytes instance: + prepare_freethreaded_python(); + + // roundtrip serialize/deserialize: + Python::with_gil(|_py| { + let bytes = fs_handle.serialize().unwrap(); + let new_handle = FrameStackHandle::deserialize_impl(&bytes).unwrap(); + assert_eq!(fs_handle, new_handle); + }); + + let client = CamClient::new(socket_as_path.to_str().unwrap()).unwrap(); + + fs_handle.with_slot(&shm, |slot_r| { + let slice = slot_r.as_slice(); + let slice_for_frame = fs_handle.get_slice_for_frame(0, slot_r); + + // try decompression directly: + let out_size = 256 * TryInto::::try_into(std::mem::size_of::()).unwrap(); + println!( + "slice_for_frame.len(): {}, uncompressed_size: {}", + slice_for_frame.len(), + out_size + ); + lz4::block::decompress(slice_for_frame, Some(out_size)).unwrap(); + + println!("slice_for_frame: {:x?}", slice_for_frame); + println!("slice: {:x?}", slice); + + Python::with_gil(|py| { + let flat: Vec = (0..256).collect(); + let out = PyArray::from_vec_bound(py, flat) + .reshape((1, 16, 16)) + .unwrap(); + let out_untyped = out.as_untyped(); + let dfsh = DectrisFrameStack::new(_PyDectrisFrameStack::new(fs_handle)); + client.decode_into_buffer(&dfsh, &out_untyped, py).unwrap(); + + out.readonly() + .as_slice() + .unwrap() + .iter() + .zip(0..) + .for_each(|(&item, idx)| { + assert_eq!(item, in_[idx]); + assert_eq!(item, (idx % 16) as u16); + }); + }); + }); + } +} diff --git a/libertem_dectris/src/lib.rs b/libertem_dectris/src/lib.rs index f1cfc9c2..198677f3 100644 --- a/libertem_dectris/src/lib.rs +++ b/libertem_dectris/src/lib.rs @@ -2,6 +2,7 @@ pub mod background_thread; pub mod bin_fmt; pub mod cam_client; pub mod common; +pub mod decoder; pub mod dectris_py; pub mod exceptions; pub mod frame_stack_py; diff --git a/libertem_dectris/src/sim.rs b/libertem_dectris/src/sim.rs index 3f73f392..8d419c14 100644 --- a/libertem_dectris/src/sim.rs +++ b/libertem_dectris/src/sim.rs @@ -77,8 +77,12 @@ impl FrameSender { cursor.seek_to_first_header_of_type("dheader-1.0"); let dheader_raw = cursor.read_raw_msg().to_owned(); - let dheader: DHeader = serde_json::from_slice(&dheader_raw) - .expect("json should match our serialization schema"); + let dheader: DHeader = serde_json::from_slice(&dheader_raw).unwrap_or_else(|e| { + panic!( + "json should match our serialization schema, got '{}' ({e})", + String::from_utf8_lossy(&dheader_raw) + ) + }); debug!("{dheader:?}"); @@ -255,8 +259,8 @@ impl DectrisSim { } #[classmethod] - fn new_mocked<'py>( - _cls: Bound<'py, PyType>, + fn new_mocked( + _cls: Bound<'_, PyType>, uri: &str, num_frames: usize, dwelltime: Option, diff --git a/playegui/src/background.rs b/playegui/src/background.rs index 2b7bb3b1..eea09071 100644 --- a/playegui/src/background.rs +++ b/playegui/src/background.rs @@ -28,7 +28,7 @@ pub fn decompress_into( ) -> Option<()> { let out_ptr = dest.as_mut_ptr(); let out_size = out_size[0] * out_size[1]; - match bs_sys::decompress_lz4_into(data, out_ptr, out_size, None) { + match unsafe { bs_sys::decompress_lz4_into(data, out_ptr, out_size, None) } { Ok(_) => Some(()), Err(e) => { error!("decompression failed: {e:?}");