diff --git a/Cargo.toml b/Cargo.toml index e1c86f7e..bce20921 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,4 +1,5 @@ [workspace] +resolver = "2" members = [ "libertem_dectris", "libertem_asi_tpx3", diff --git a/k2o/benches/decode_benchmarks.rs b/k2o/benches/decode_benchmarks.rs index 6e8d03d2..0912f1c9 100644 --- a/k2o/benches/decode_benchmarks.rs +++ b/k2o/benches/decode_benchmarks.rs @@ -22,9 +22,9 @@ fn criterion_benchmark(c: &mut Criterion) { // let mut out_f32: [f32; DECODED_SIZE * NUM_PACKETS] = [0.0; DECODED_SIZE * NUM_PACKETS]; // (in production code, we don't need to make these large allocations, as we only look at ~single packets at a time) - let input = vec![0 as u8; PACKET_SIZE * NUM_PACKETS].into_boxed_slice(); - let mut out = vec![0 as u16; DECODED_SIZE * NUM_PACKETS].into_boxed_slice(); - let mut out_f32 = vec![0.0 as f32; DECODED_SIZE * NUM_PACKETS].into_boxed_slice(); + let input = vec![0_u8; PACKET_SIZE * NUM_PACKETS].into_boxed_slice(); + let mut out = vec![0_u16; DECODED_SIZE * NUM_PACKETS].into_boxed_slice(); + let mut out_f32 = vec![0.0_f32; DECODED_SIZE * NUM_PACKETS].into_boxed_slice(); const TOTAL_INPUT_SIZE: usize = NUM_PACKETS * PACKET_SIZE; @@ -40,7 +40,7 @@ fn criterion_benchmark(c: &mut Criterion) { let out_chunks = out.chunks_exact_mut(DECODED_SIZE); for (chunk, o_chunk) in in_chunks.zip(out_chunks) { - decode::(black_box(&chunk), black_box(o_chunk)); + decode::(black_box(chunk), black_box(o_chunk)); } }) }, @@ -56,7 +56,7 @@ fn criterion_benchmark(c: &mut Criterion) { for (in_chunk, out_chunk) in in_chunks.zip(out_chunks) { decode_unrolled::( - black_box(&in_chunk), + black_box(in_chunk), black_box(out_chunk), ); } @@ -74,7 +74,7 @@ fn criterion_benchmark(c: &mut Criterion) { for (in_chunk, out_chunk) in in_chunks.zip(out_chunks) { decode_map::<_, _, PACKET_SIZE, DECODED_SIZE>( - black_box(&in_chunk), + black_box(in_chunk), black_box(out_chunk), |x| x, ); @@ -93,7 +93,7 @@ fn criterion_benchmark(c: &mut Criterion) { for (in_chunk, out_chunk) in in_chunks.zip(out_chunks) { decode_map::<_, _, PACKET_SIZE, DECODED_SIZE>( - black_box(&in_chunk), + black_box(in_chunk), black_box(out_chunk), |x| x as f32, ); @@ -112,7 +112,7 @@ fn criterion_benchmark(c: &mut Criterion) { for (in_chunk, out_chunk) in in_chunks.zip(out_chunks) { decode_converted::<_, PACKET_SIZE, DECODED_SIZE>( - black_box(&in_chunk), + black_box(in_chunk), black_box(out_chunk), ); } @@ -130,7 +130,7 @@ fn criterion_benchmark(c: &mut Criterion) { for (in_chunk, out_chunk) in in_chunks.zip(out_chunks) { decode_converted::<_, PACKET_SIZE, DECODED_SIZE>( - black_box(&in_chunk), + black_box(in_chunk), black_box(out_chunk), ); } diff --git a/k2o/src/acquisition.rs b/k2o/src/acquisition.rs index ad110110..a486639d 100644 --- a/k2o/src/acquisition.rs +++ b/k2o/src/acquisition.rs @@ -1,7 +1,4 @@ -use std::{ - fmt::Debug, - time::{Duration, Instant}, -}; +use std::time::{Duration, Instant}; use crossbeam_channel::{Receiver, RecvError, Select, SelectedOperation, Sender}; use human_bytes::human_bytes; @@ -9,7 +6,7 @@ use ipc_test::SharedSlabAllocator; use log::{error, info, warn}; use opentelemetry::{ global, - trace::{self, TraceContextExt, Tracer}, + trace::{self, Span, TraceContextExt, Tracer}, Context, Key, }; use partialdebug::placeholder::PartialDebug; @@ -188,7 +185,11 @@ impl<'a, F: K2Frame> FrameHandler<'a, F> { #[must_use] fn handle_frames(mut self) -> HandleFramesResult { let tracer = get_tracer(); - let span = tracer.start("handle_frames"); + let mut span = tracer.start("handle_frames"); + span.add_event( + "start", + vec![Key::new("ref_frame_id").i64(self.ref_frame_id as i64)], + ); let _guard = trace::mark_span_as_active(span); let mut sel = Select::new(); let op_events = sel.recv(self.events_rx); @@ -209,7 +210,7 @@ impl<'a, F: K2Frame> FrameHandler<'a, F> { match oper.index() { i if i == op_events => match oper.recv(self.events_rx) { Ok(EventMsg::Shutdown {}) => return HandleFramesResult::Shutdown, - Ok(EventMsg::CancelAcquisition { acquisition_id }) => { + Ok(EventMsg::CancelAcquisition { acquisition_id: _ }) => { return HandleFramesResult::Aborted { dropped: self.dropped, } diff --git a/k2o/src/assemble.rs b/k2o/src/assemble.rs index 8fdb15d9..1f61b7f6 100644 --- a/k2o/src/assemble.rs +++ b/k2o/src/assemble.rs @@ -9,7 +9,7 @@ use std::{ use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, SendError, Sender}; use ipc_test::SharedSlabAllocator; -use log::{debug, error, info}; +use log::{debug, error, info, warn}; use opentelemetry::Context; use crate::{ @@ -28,7 +28,7 @@ impl PendingFrames { pub fn new() -> Self { PendingFrames { frames: HashMap::new(), - timeout: Duration::from_millis(5), + timeout: Duration::from_millis(15), } } @@ -94,6 +94,14 @@ impl PendingFrames { for (_, frame) in self.frames.iter() { let delta = now - frame.get_modified_timestamp(); if !frame.is_finished() && delta > self.timeout { + let tracker = frame.get_tracker(); + let num_missing = tracker.iter().filter(|have_block| !**have_block).count(); + warn!( + "dropping frame {}, num_missing: {:?} of {}", + frame.get_frame_id(), + num_missing, + tracker.len() + ); to_remove.push(frame.get_frame_id()); } } diff --git a/k2o/src/bin/decode_only.rs b/k2o/src/bin/decode_only.rs index c78ca5c8..4729b75f 100644 --- a/k2o/src/bin/decode_only.rs +++ b/k2o/src/bin/decode_only.rs @@ -1,5 +1,5 @@ +use k2o::decode::decode_unrolled; use k2o::decode::HEADER_SIZE; -use k2o::decode::{decode, decode_unrolled}; fn main() { const PACKET_SIZE: usize = 0x5758; diff --git a/k2o/src/cli_args.rs b/k2o/src/cli_args.rs index 848e0289..d2b6e77a 100644 --- a/k2o/src/cli_args.rs +++ b/k2o/src/cli_args.rs @@ -15,9 +15,9 @@ pub enum WriteMode { HDF5, } -impl Into for WriteMode { - fn into(self) -> WriterType { - match self { +impl From for WriterType { + fn from(val: WriteMode) -> Self { + match val { WriteMode::Direct => WriterType::Direct, WriteMode::MMAP => WriterType::Mmap, #[cfg(not(feature = "hdf5"))] diff --git a/k2o/src/control.rs b/k2o/src/control.rs index 8c8e2aa3..6273cf0e 100644 --- a/k2o/src/control.rs +++ b/k2o/src/control.rs @@ -1,7 +1,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use crossbeam_channel::TryRecvError; -use log::{debug, info}; +use log::debug; use opentelemetry::trace::{TraceContextExt, Tracer}; use crate::{ @@ -162,12 +162,12 @@ impl StateTracker { acquisition_id: *acquisition_id, }), EventMsg::Shutdown => Ok(AcquisitionState::Shutdown), - EventMsg::AcquisitionError { msg } => Ok(AcquisitionState::Shutdown), + EventMsg::AcquisitionError { msg: _ } => Ok(AcquisitionState::Shutdown), } } AcquisitionState::Armed { params, - acquisition_id, + acquisition_id: _, } => { match event { // invalid transitions: @@ -217,7 +217,7 @@ impl StateTracker { EventMsg::AcquisitionEnded { acquisition_id: _ } => Ok(AcquisitionState::Idle), EventMsg::CancelAcquisition { acquisition_id: _ } => Ok(AcquisitionState::Idle), EventMsg::Shutdown => Ok(AcquisitionState::Shutdown), - EventMsg::AcquisitionError { msg } => Ok(AcquisitionState::Shutdown), + EventMsg::AcquisitionError { msg: _ } => Ok(AcquisitionState::Shutdown), } } AcquisitionState::AcquisitionStarted { @@ -275,15 +275,15 @@ impl StateTracker { acquisition_id: *acquisition_id, }) } - EventMsg::CancelAcquisition { acquisition_id } => Ok(AcquisitionState::Idle), + EventMsg::CancelAcquisition { acquisition_id: _ } => Ok(AcquisitionState::Idle), EventMsg::Shutdown => Ok(AcquisitionState::Shutdown), - EventMsg::AcquisitionError { msg } => Ok(AcquisitionState::Shutdown), + EventMsg::AcquisitionError { msg: _ } => Ok(AcquisitionState::Shutdown), } } AcquisitionState::AcquisitionFinishing { params: _, frame_id: _, - acquisition_id: acquisition_id_outer, + acquisition_id: _acquisition_id_outer, } => { match event { // invalid transitions: @@ -330,7 +330,7 @@ impl StateTracker { EventMsg::CancelAcquisition { acquisition_id: _ } => Ok(AcquisitionState::Idle), EventMsg::Shutdown => Ok(AcquisitionState::Shutdown), EventMsg::ProcessingDone { acquisition_id: _ } => Ok(AcquisitionState::Idle), - EventMsg::AcquisitionError { msg } => Ok(AcquisitionState::Shutdown), + EventMsg::AcquisitionError { msg: _ } => Ok(AcquisitionState::Shutdown), } } } diff --git a/k2o/src/events.rs b/k2o/src/events.rs index ac77ea23..b35d98da 100644 --- a/k2o/src/events.rs +++ b/k2o/src/events.rs @@ -5,7 +5,7 @@ use std::{ }; use crossbeam_channel::{select, unbounded, Receiver, RecvError, SendError, Sender}; -use log::{debug, info}; +use log::debug; use crate::write::{DirectWriterBuilder, MMapWriterBuilder, NoopWriterBuilder, WriterBuilder}; #[cfg(feature = "hdf5")] diff --git a/k2o/src/frame_is.rs b/k2o/src/frame_is.rs index 205e4222..b59c73d4 100644 --- a/k2o/src/frame_is.rs +++ b/k2o/src/frame_is.rs @@ -95,7 +95,6 @@ impl FrameForWriting for K2ISFrameForWriting { let slot_info = shm.writing_done(self.payload); K2ISFrame { payload: slot_info, - subframe_idx: self.subframe_idx, frame_id: self.frame_id, created_timestamp: self.created_timestamp, modified_timestamp: self.modified_timestamp, @@ -140,25 +139,12 @@ impl K2ISFrameForWriting { counts.get(&false).unwrap_or(&0) ); } - - fn into_readonly(self, slot_info: SlotInfo) -> K2ISFrame { - K2ISFrame { - payload: slot_info, - subframe_idx: self.subframe_idx, - frame_id: self.frame_id, - created_timestamp: self.created_timestamp, - modified_timestamp: self.modified_timestamp, - acquisition_id: self.acquisition_id, - } - } } pub struct K2ISFrame { /// a reference to the decoded payload of the whole frame pub payload: SlotInfo, - subframe_idx: u8, - /// the frame id as received pub frame_id: u32, diff --git a/k2o/src/ordering.rs b/k2o/src/ordering.rs index 943ea7f3..57cf27f7 100644 --- a/k2o/src/ordering.rs +++ b/k2o/src/ordering.rs @@ -3,10 +3,7 @@ use std::{ collections::{BinaryHeap, HashSet}, }; -use crate::{ - acquisition::AcquisitionResult, - frame::{GenericFrame, K2Frame}, -}; +use crate::{acquisition::AcquisitionResult, frame::GenericFrame}; pub enum FrameOrderingResult { Buffered, @@ -19,9 +16,9 @@ pub enum FrameWithIdx { DroppedFrame(GenericFrame, u32), } -impl Into> for FrameWithIdx { - fn into(self) -> AcquisitionResult { - match self { +impl From for AcquisitionResult { + fn from(val: FrameWithIdx) -> Self { + match val { FrameWithIdx::Frame(frame, frame_idx) => AcquisitionResult::Frame(frame, frame_idx), FrameWithIdx::DroppedFrame(frame, frame_idx) => { AcquisitionResult::DroppedFrame(frame, frame_idx) @@ -103,8 +100,8 @@ impl FrameOrdering { pub fn handle_frame(&mut self, frame_w_idx: FrameWithIdx) -> FrameOrderingResult { let expected_frame_idx = self.next_expected_frame_idx; - // frame indices can be repeated, in case we drop a frame and later a - // block of said frame arrives, which starts a new frame with the old index + // Frame indices can be repeated, in case we drop a frame and later a + // block of said frame arrives, which starts a new frame with the old index. // filter these out here: if self.dropped_idxs.contains(&frame_w_idx.get_idx()) { return FrameOrderingResult::Dropped; diff --git a/k2o/src/recv.rs b/k2o/src/recv.rs index ce3c84a4..3fe8a4a7 100644 --- a/k2o/src/recv.rs +++ b/k2o/src/recv.rs @@ -135,6 +135,14 @@ pub fn recv_decode_loop( acquisition_id, }); state = RecvState::Receiving { acquisition_id }; + // send out the first block: + let route_info = BlockRouteInfo::new(&block); + if assembly_channel.send((block, route_info)).is_err() { + events.send(&EventMsg::AcquisitionError { + msg: "failed to send to assembly threads".to_string(), + }); + break; + } } } RecvState::WaitForNext { acquisition_id } => { diff --git a/k2o/src/write.rs b/k2o/src/write.rs index 16927442..e8279c6a 100644 --- a/k2o/src/write.rs +++ b/k2o/src/write.rs @@ -9,7 +9,7 @@ use hdf5::{ types::{IntSize, TypeDescriptor}, Dataset, Extent, }; -use log::{info, trace}; +use log::trace; use memmap2::{MmapMut, MmapOptions}; use ndarray::s; use ndarray_npy::{write_zeroed_npy, ViewMutNpyExt}; diff --git a/libertem_k2is/examples/fast_acq_integrate.py b/libertem_k2is/examples/fast_acq_integrate.py index 9a2caa25..5329fa19 100644 --- a/libertem_k2is/examples/fast_acq_integrate.py +++ b/libertem_k2is/examples/fast_acq_integrate.py @@ -6,9 +6,16 @@ 3) Each acquisition start point is properly synchronized, meaning it doesn't include data from any point in time before explicitly starting the acquisition. + +To run with full instrumentation and debug logging: + +$ OTEL_ENABLE=1 OTLP_ENDPOINT=localhost:4317 LIBERTEM_K2IS_LOG_LEVEL=debug\ + python examples/fast_acq_integrate.py\ + --mode summit --num-parts=2 --frames-per-part=4 """ import time +import datetime import click import numpy as np @@ -23,15 +30,15 @@ tracer = trace.get_tracer("write_and_iterate") -def iterate(aq, cam, cam_client, frame_arr): +def iterate(outer_i, aq, cam, cam_client, frame_arr): t0 = time.time() i = 0 try: while frame := cam.get_next_frame(): - i += 1 if frame.is_dropped(): print(f"dropped frame {frame.get_idx()}") continue + i += 1 slot = cam.get_frame_slot(frame) frame_ref = cam_client.get_frame_ref(slot) mv = frame_ref.get_memoryview() @@ -43,7 +50,7 @@ def iterate(aq, cam, cam_client, frame_arr): finally: t1 = time.time() print(f"acquisition {aq} done, got {i} frames in {t1-t0:.2f}s...") - # np.save(f"/cachedata/alex/bar-sum-{aq.get_id()}.npy", frame_arr) + np.save(f"/cachedata/alex/bar-sum-{outer_i}.npy", frame_arr) @click.command @@ -53,10 +60,7 @@ def iterate(aq, cam, cam_client, frame_arr): def main(mode, num_parts, frames_per_part): maybe_setup_tracing("write_and_iterate") with tracer.start_as_current_span("main"): - if mode.lower() == "summit": - mode = Mode.Summit - elif mode.lower() == "is": - mode = Mode.IS + mode = Mode.from_string(mode) shm_socket = "/tmp/k2shm.socket" cam = Cam( local_addr_top="192.168.10.99", @@ -72,10 +76,11 @@ def main(mode, num_parts, frames_per_part): try: for i in range(num_parts): frame_arr[:] = 0 + ts = datetime.datetime.now().strftime("%H:%M:%S") writer = Writer( method="direct", # method="mmap", - filename=f"/cachedata/alex/bar-{i}.raw", + filename=f"/cachedata/alex/bar-{i}-{ts}.raw", ) aqp = AcquisitionParams( size=frames_per_part, @@ -87,9 +92,9 @@ def main(mode, num_parts, frames_per_part): aq = cam.make_acquisition(aqp) print(f"acquisition {aq}") cam.wait_for_start() - iterate(aq, cam, cam_client, frame_arr) + iterate(i, aq, cam, cam_client, frame_arr) finally: - # this shuts down the runtime, backgrounds threads an all... + # this shuts down the runtime, backgrounds threads and all... cam.stop() diff --git a/libertem_k2is/src/bgthread.rs b/libertem_k2is/src/bgthread.rs index 97b0c33e..82cde904 100644 --- a/libertem_k2is/src/bgthread.rs +++ b/libertem_k2is/src/bgthread.rs @@ -17,8 +17,8 @@ use k2o::helpers::{set_cpu_affinity, CPU_AFF_WRITER}; use k2o::params::CameraMode; use k2o::recv::recv_decode_loop; use k2o::tracing::get_tracer; -use k2o::write::WriterBuilder; -use log::{debug, info}; + +use log::debug; use opentelemetry::trace::Tracer; use opentelemetry::{global, Context}; #[derive(Debug, Clone)] @@ -229,10 +229,6 @@ impl WaitResult { pub fn is_success(&self) -> bool { matches!(self, WaitResult::PredSuccess) } - - pub fn is_timeout(&self) -> bool { - matches!(self, WaitResult::Timeout) - } } /// The `AcquisitionRuntime` starts and communicates with a background thread, @@ -462,26 +458,6 @@ impl AcquisitionRuntime { Some(()) } - pub fn try_join_timeout(&mut self, timeout: Duration) -> Result<(), RuntimeError> { - if let Some(join_handle) = self.bg_thread.take() { - let deadline = Instant::now() + timeout; - while !join_handle.is_finished() && Instant::now() < deadline { - std::thread::sleep(Duration::from_millis(100)); - } - if !join_handle.is_finished() { - self.bg_thread = Some(join_handle); - Err(RuntimeError::Timeout) - } else { - join_handle - .join() - .expect("could not join background thread!"); - Ok(()) - } - } else { - Ok(()) // join on non-running thread is not an error - } - } - pub fn wait_predicate

(&mut self, timeout: Duration, pred: P) -> WaitResult where P: Fn(&Self) -> bool, diff --git a/libertem_k2is/src/lib.rs b/libertem_k2is/src/lib.rs index 214e2854..1dcf8458 100644 --- a/libertem_k2is/src/lib.rs +++ b/libertem_k2is/src/lib.rs @@ -5,7 +5,6 @@ use env_logger::Builder; use ipc_test::SharedSlabAllocator; use shm_helpers::{CamClient, FrameRef}; use std::{ - panic, path::Path, sync::{Arc, Barrier}, time::{Duration, Instant}, @@ -16,15 +15,10 @@ use bgthread::{AcquisitionRuntime, AddrConfig, RuntimeError, WaitResult}; use k2o::{ acquisition::AcquisitionResult, - block::K2Block, - block_is::K2ISBlock, events::{AcquisitionParams, AcquisitionSync, WriterSettings, WriterTypeError}, - frame::{GenericFrame, K2Frame}, - frame_is::K2ISFrame, - frame_summit::K2SummitFrame, + frame::GenericFrame, params::CameraMode, tracing::{get_tracer, init_tracer}, - write::{DirectWriterBuilder, MMapWriterBuilder, WriterBuilder}, }; #[cfg(feature = "hdf5")] @@ -34,7 +28,7 @@ use opentelemetry::{ trace::{self, SpanContext, SpanId, TraceContextExt, TraceFlags, TraceId, TraceState, Tracer}, Context, ContextGuard, }; -use pyo3::{exceptions, prelude::*}; +use pyo3::{exceptions, prelude::*, types::PyType}; fn tracing_thread() { let thread_builder = std::thread::Builder::new(); @@ -168,15 +162,30 @@ impl From for PyMode { } } -impl Into for PyMode { - fn into(self) -> CameraMode { - match self { +impl From for CameraMode { + fn from(val: PyMode) -> Self { + match val { PyMode::IS => CameraMode::IS, PyMode::Summit => CameraMode::Summit, } } } +#[pymethods] +impl PyMode { + #[classmethod] + fn from_string(_cls: &PyType, mode: &str) -> PyResult { + match mode.to_lowercase().as_ref() { + "is" => Ok(Self::IS), + "summit" => Ok(Self::Summit), + _ => Err(exceptions::PyValueError::new_err(format!( + "unknown mode: {}", + mode + ))), + } + } +} + #[pyclass(name = "Writer")] #[derive(Debug, Clone)] struct PyWriter { @@ -299,6 +308,14 @@ impl Acquisition { self.id } + fn get_mode(&self) -> PyMode { + self.camera_mode.into() + } + + fn get_params(&self) -> PyAcquisitionParams { + self.params.clone() + } + fn __repr__(&self) -> String { let id = self.id; format!("") @@ -307,9 +324,7 @@ impl Acquisition { #[pyclass] struct Cam { - addr_config: AddrConfig, camera_mode: PyMode, - shm_path: String, enable_frame_iterator: bool, shm: Option, runtime: Option, @@ -347,9 +362,7 @@ impl Cam { ); Ok(Cam { - addr_config, camera_mode: mode, - shm_path: shm_path.to_owned(), enable_frame_iterator, shm: Some(shm), runtime: Some(runtime), @@ -488,11 +501,11 @@ impl Cam { } AcquisitionResult::DoneAborted { dropped, - acquisition_id, + acquisition_id: _, } | AcquisitionResult::DoneSuccess { dropped, - acquisition_id, + acquisition_id: _, } => { eprintln!("dropped {dropped} frames in this acquisition"); return Ok(None);