diff --git a/Cargo.lock b/Cargo.lock index b230ee17..9ec7bc3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2020,6 +2020,7 @@ dependencies = [ "opentelemetry", "opentelemetry-otlp", "opentelemetry-semantic-conventions", + "partialdebug", "rayon", "shared_memory", "tempfile", @@ -2806,6 +2807,26 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "partialdebug" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589e1f540d19ee0565c1e05f6737b442d8d71e477c197acf368ddd89914dd1e8" +dependencies = [ + "partialdebug-derive", +] + +[[package]] +name = "partialdebug-derive" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc64c42a1af5b98ba812b0b8ec10ff222b44fec0cc9637246fe231915a2d31bc" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "paste" version = "1.0.12" diff --git a/k2o/Cargo.toml b/k2o/Cargo.toml index 0ccf62cb..4a213679 100644 --- a/k2o/Cargo.toml +++ b/k2o/Cargo.toml @@ -31,6 +31,7 @@ log = "0.4.18" dbus = "0.9.7" libc = "0.2.147" env_logger = "0.10.0" +partialdebug = "0.2.0" [features] hdf5 = ["dep:hdf5"] diff --git a/k2o/NOTES.md b/k2o/NOTES.md index 87b4cd91..1127a33f 100644 --- a/k2o/NOTES.md +++ b/k2o/NOTES.md @@ -19,6 +19,38 @@ - => no change in performance, still spending 88% of our time in the page fault handler - assembly of full frames is required to get a good access pattern! 
+# Notes on reworking the life cycles + +To make sure we can quickly start acquisitions one after another, +we need to make sure of some boundary conditions: + +- The SHM area needs to be kept alive over multiple acquisitions, as well as + the "recycling" system for blocks, such that we don't have jitter caused by + malloc calls or page faults +- The sockets need to be kept open, otherwise the multicast traffic may only be + received after an initial delay (IIRC) +- That also means that switching between more fundamental settings may take longer + - changing network settings + - changing camera mode (IS/Summit) + - SHM socket path + - enabling / disabling the frame iterator +- Changes that must be fast: + - file writer destination (filename) -> we may need to disable pre-allocation of files! + - number of frames per acquisition + - probably: camera sync mode (immediately vs. wait for sync flag) + +## Action items +- [ ] Refactor... + - [x] The `AcquisitionRuntime` should be the object that lives over multiple acquisitions + - [x] On the Python side, the `Cam` already needs to start the `AcquisitionRuntime` + - [ ] For error handling, we should have the option to completely tear down + and re-create the `AcquisitionRuntime`, and to perform this step automatically such that + we don't have "restart the {script,notebook,app,server}" situations as often as we currently do + - [x] `WriterBuilder` must be set per acquisition, not when starting the runtime +- [x] Figure out where the acquisition ID should be generated + - probably in the `AcquisitionRuntime`, as that is the long-lived object + which manages the background thread(s) etc. 
+ # TODO - [ ] fix busy waiting (one of the `try_recv` loops, I guess) - [ ] fix subframe shtuff diff --git a/k2o/benches/assemble_benchmarks.rs b/k2o/benches/assemble_benchmarks.rs index a53bcf53..bed5a016 100644 --- a/k2o/benches/assemble_benchmarks.rs +++ b/k2o/benches/assemble_benchmarks.rs @@ -14,8 +14,8 @@ fn criterion_benchmark(c: &mut Criterion) { const TOTAL_INPUT_SIZE: usize = 1 * PACKET_SIZE; - let block = K2ISBlock::empty(42); - let mut frame: K2ISFrame = K2ISFrame::empty::(42); + let block = K2ISBlock::empty(42, 0); + let mut frame: K2ISFrame = K2ISFrame::empty::(42, 0); let mut assign_block = c.benchmark_group("assign_block* functions"); assign_block.measurement_time(Duration::new(10, 0)); diff --git a/k2o/benches/decode_benchmarks.rs b/k2o/benches/decode_benchmarks.rs index a2feacae..6e8d03d2 100644 --- a/k2o/benches/decode_benchmarks.rs +++ b/k2o/benches/decode_benchmarks.rs @@ -152,7 +152,7 @@ fn criterion_benchmark(c: &mut Criterion) { let in_chunks = input.chunks_exact(PACKET_SIZE); for chunk in in_chunks { - K2ISBlock::from_bytes(black_box(chunk), 0); + K2ISBlock::from_bytes(black_box(chunk), 0, 0); } }) }, diff --git a/k2o/src/acquisition.rs b/k2o/src/acquisition.rs index 0ace8ce9..ad110110 100644 --- a/k2o/src/acquisition.rs +++ b/k2o/src/acquisition.rs @@ -1,20 +1,24 @@ -use std::time::{Duration, Instant}; +use std::{ + fmt::Debug, + time::{Duration, Instant}, +}; use crossbeam_channel::{Receiver, RecvError, Select, SelectedOperation, Sender}; use human_bytes::human_bytes; use ipc_test::SharedSlabAllocator; -use log::{info, warn}; +use log::{error, info, warn}; use opentelemetry::{ global, trace::{self, TraceContextExt, Tracer}, Context, Key, }; +use partialdebug::placeholder::PartialDebug; use crate::{ assemble::AssemblyResult, control::AcquisitionState, events::{AcquisitionParams, AcquisitionSize, EventBus, EventMsg, EventReceiver, Events}, - frame::{GenericFrame, K2Frame}, + frame::{FrameMeta, GenericFrame, K2Frame}, 
ordering::{FrameOrdering, FrameOrderingResult, FrameWithIdx}, tracing::get_tracer, write::{Writer, WriterBuilder}, @@ -28,13 +32,30 @@ enum HandleFramesResult { Shutdown, } +#[derive(PartialDebug)] pub enum AcquisitionResult { Frame(F, u32), DroppedFrame(F, u32), DroppedFrameOutside(F), - DoneSuccess { dropped: usize }, - DoneAborted { dropped: usize }, - DoneError, // some possibly unhandled error happened, we don't know a lot here... + DoneSuccess { + dropped: usize, + acquisition_id: usize, + }, + DoneAborted { + dropped: usize, + acquisition_id: usize, + }, + + /// This is the result if some threads upstream closed their end of the + /// channel and we get a receive error, while an acquisition is running - + /// the system is probably shutting down (or crashing). + DoneShuttingDown { + acquisition_id: usize, + }, + + /// This is the result if some threads upstream closed their end of the + /// channel while no current acquisition is known + ShutdownIdle, } impl AcquisitionResult { @@ -43,9 +64,16 @@ impl AcquisitionResult { AcquisitionResult::Frame(f, _) => Some(f), AcquisitionResult::DroppedFrame(f, _) => Some(f), AcquisitionResult::DroppedFrameOutside(f) => Some(f), - AcquisitionResult::DoneSuccess { dropped: _ } => None, - AcquisitionResult::DoneAborted { dropped: _ } => None, - AcquisitionResult::DoneError => None, + AcquisitionResult::DoneSuccess { + dropped: _, + acquisition_id: _, + } => None, + AcquisitionResult::DoneAborted { + dropped: _, + acquisition_id: _, + } => None, + AcquisitionResult::DoneShuttingDown { acquisition_id: _ } => None, + AcquisitionResult::ShutdownIdle => None, } } @@ -54,9 +82,16 @@ impl AcquisitionResult { AcquisitionResult::Frame(f, _) => Some(f), AcquisitionResult::DroppedFrame(f, _) => Some(f), AcquisitionResult::DroppedFrameOutside(f) => Some(f), - AcquisitionResult::DoneSuccess { dropped: _ } => None, - AcquisitionResult::DoneAborted { dropped: _ } => None, - AcquisitionResult::DoneError => None, + 
AcquisitionResult::DoneSuccess { + dropped: _, + acquisition_id: _, + } => None, + AcquisitionResult::DoneAborted { + dropped: _, + acquisition_id: _, + } => None, + AcquisitionResult::DoneShuttingDown { acquisition_id: _ } => None, + AcquisitionResult::ShutdownIdle => None, } } } @@ -174,7 +209,7 @@ impl<'a, F: K2Frame> FrameHandler<'a, F> { match oper.index() { i if i == op_events => match oper.recv(self.events_rx) { Ok(EventMsg::Shutdown {}) => return HandleFramesResult::Shutdown, - Ok(EventMsg::CancelAcquisition {}) => { + Ok(EventMsg::CancelAcquisition { acquisition_id }) => { return HandleFramesResult::Aborted { dropped: self.dropped, } @@ -222,7 +257,20 @@ impl<'a, F: K2Frame> FrameHandler<'a, F> { } Ok(AssemblyResult::AssemblyTimeout { frame, frame_id }) => { span.add_event("timeout", vec![Key::new("frame_id").i64(frame_id as i64)]); + let frame_meta = frame.get_meta(); self.timeout(frame_id, frame); + + // handle the case that the last frame was dropped: + if let AcquisitionSize::NumFrames(num) = self.params.size { + if self.counter == num as usize { + if self.counter % 100 != 0 { + self.print_stats(&frame_meta); + } + return Some(HandleFramesResult::Done { + dropped: self.dropped, + }); + } + } None } Err(RecvError) => Some(HandleFramesResult::Shutdown), @@ -258,8 +306,9 @@ impl<'a, F: K2Frame> FrameHandler<'a, F> { self.ref_bytes_written += frame_size_bytes; self.counter += 1; } + let frame_meta = frame.get_meta(); if self.counter % 100 == 0 { - self.print_stats(&frame); + self.print_stats(&frame_meta); } if let AcquisitionSize::NumFrames(num) = self.params.size { // FIXME: NumFrames should always be a @@ -267,7 +316,7 @@ impl<'a, F: K2Frame> FrameHandler<'a, F> { // otherwise this check can fail! 
if self.counter == num as usize { if self.counter % 100 != 0 { - self.print_stats(&frame); + self.print_stats(&frame_meta); } let result = FrameWithIdx::Frame(frame.into_generic(), frame_idx); next_hop_ordered(&mut self.ordering, self.next_hop_tx, result); @@ -296,7 +345,7 @@ impl<'a, F: K2Frame> FrameHandler<'a, F> { vec![Key::new("frame_id").i64(frame_id as i64)], ); self.dropped += 1; - self.counter += 1; + self.counter += 1; // FIXME: might need to increment by number of subframes? let result = FrameWithIdx::DroppedFrame(frame.into_generic(), frame_idx); next_hop_ordered(&mut self.ordering, self.next_hop_tx, result); } else { @@ -309,9 +358,9 @@ impl<'a, F: K2Frame> FrameHandler<'a, F> { } } - fn print_stats(&mut self, frame: &F) { + fn print_stats(&mut self, frame_meta: &FrameMeta) { let now = Instant::now(); - let latency = frame.get_created_timestamp().elapsed(); + let latency = frame_meta.get_created_timestamp().elapsed(); let channel_size = self.channel.len(); let delta_t = now - self.ref_ts; let throughput = { @@ -329,8 +378,11 @@ impl<'a, F: K2Frame> FrameHandler<'a, F> { String::from("") } }; - info!("frame counter={} frame_id={} dropped={} dropped_outside={}, latency first block -> frame written={:?} channel.len()={} write throughput={}/s fps={}", - self.counter, frame.get_frame_id(), self.dropped, self.dropped_outside, latency, channel_size, throughput, fps); + info!("frame acq#{} counter={} frame_id={} dropped={} dropped_outside={}, latency first block -> frame written={:?} channel.len()={} write throughput={}/s fps={}", + frame_meta.get_acquisition_id(), self.counter, frame_meta.get_frame_id(), + self.dropped, self.dropped_outside, latency, channel_size, + throughput, fps + ); self.ref_ts = Instant::now(); self.ref_bytes_written = 0; @@ -339,7 +391,7 @@ impl<'a, F: K2Frame> FrameHandler<'a, F> { /// /// Instantiate a writer, receive and write N frames, and forward frames -/// to the next hop channel. +/// to the next hop channel. 
This is started in a background thread. /// /// Filters out frames that don't belong to the current acquisition. /// @@ -349,7 +401,6 @@ pub fn acquisition_loop( next_hop_tx: &Sender>, events_rx: &EventReceiver, events: &Events, - writer_builder: Box, mut shm: SharedSlabAllocator, ) { let tracer = get_tracer(); @@ -369,9 +420,10 @@ pub fn acquisition_loop( i if i == op_events => { let msg_result = oper.recv(events_rx); match msg_result { - Ok(EventMsg::Arm { params }) => { + Ok(EventMsg::Arm { params, acquisition_id }) => { state = AcquisitionState::Armed { params: params.clone(), + acquisition_id, }; // Forward the start event for sectors, making sure we get the @@ -380,27 +432,34 @@ pub fn acquisition_loop( // we do here, we could possibly get the response from the // sectors before we are transitioning to the `Armed` state, // meaning we don't have the acquisition parameters yet etc... - events.send(&EventMsg::ArmSectors { params }); + events.send(&EventMsg::ArmSectors { params, acquisition_id}); } Ok(EventMsg::AcquisitionStartedSector { sector_id: _, frame_id, + acquisition_id: acquisition_id_outer, }) => { match state { - AcquisitionState::Armed { params } => { + AcquisitionState::Armed { params , acquisition_id } => { + if acquisition_id != acquisition_id_outer { + error!("acquisition id mismatch; {acquisition_id} != {acquisition_id_outer}"); + } state = AcquisitionState::AcquisitionStarted { params: params.clone(), frame_id, + acquisition_id, }; events.send(&EventMsg::AcquisitionStarted { frame_id, params: params.clone(), + acquisition_id, }); info!("acquisition started, first frame_id = {}", frame_id); Context::current() .span() .add_event("AcquisitionStarted", vec![]); + let writer_builder = params.writer_settings.get_writer_builder(); let fh = FrameHandler::new( channel, next_hop_tx, @@ -412,25 +471,25 @@ pub fn acquisition_loop( ); let write_result = fh.handle_frames(); info!("handle_frames done."); - events.send(&EventMsg::AcquisitionEnded {}); + 
events.send(&EventMsg::AcquisitionEnded { acquisition_id }); Context::current() .span() .add_event("AcquisitionEnded", vec![]); match write_result { HandleFramesResult::Done { dropped } => { next_hop_tx - .send(AcquisitionResult::DoneSuccess { dropped }) + .send(AcquisitionResult::DoneSuccess { dropped, acquisition_id }) .unwrap(); continue; } HandleFramesResult::Aborted { dropped } => { next_hop_tx - .send(AcquisitionResult::DoneAborted { dropped }) + .send(AcquisitionResult::DoneAborted { dropped, acquisition_id }) .unwrap(); continue; } HandleFramesResult::Shutdown => { - next_hop_tx.send(AcquisitionResult::DoneError).unwrap(); + next_hop_tx.send(AcquisitionResult::DoneShuttingDown { acquisition_id }).unwrap(); break; } } @@ -438,14 +497,15 @@ pub fn acquisition_loop( AcquisitionState::AcquisitionStarted { params: _, frame_id: _, + acquisition_id, } => { // we are only interested in the event from the first sector that starts the acquisition: warn!( - "ignoring AcuisitionStartedSector in AcquisitionStarted state" + "ignoring AcuisitionStartedSector in AcquisitionStarted state for acq#{acquisition_id}" ); } AcquisitionState::Idle - | AcquisitionState::AcquisitionFinishing { params: _, frame_id: _ } + | AcquisitionState::AcquisitionFinishing { params: _, frame_id: _, acquisition_id: _ } | AcquisitionState::Startup | AcquisitionState::Shutdown => { panic!( @@ -457,7 +517,7 @@ pub fn acquisition_loop( Ok(EventMsg::Shutdown {}) => break, Ok(_) => continue, Err(RecvError) => { - next_hop_tx.send(AcquisitionResult::DoneError).unwrap(); + next_hop_tx.send(AcquisitionResult::ShutdownIdle).unwrap(); break; } } diff --git a/k2o/src/assemble.rs b/k2o/src/assemble.rs index 62fb62ab..8fdb15d9 100644 --- a/k2o/src/assemble.rs +++ b/k2o/src/assemble.rs @@ -9,7 +9,7 @@ use std::{ use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, SendError, Sender}; use ipc_test::SharedSlabAllocator; -use log::{error, info}; +use log::{debug, error, info}; use opentelemetry::Context; 
use crate::{ @@ -28,19 +28,32 @@ impl PendingFrames { pub fn new() -> Self { PendingFrames { frames: HashMap::new(), - timeout: Duration::from_millis(100), + timeout: Duration::from_millis(5), } } pub fn assign_block(&mut self, block: &B, shm: &mut SharedSlabAllocator) { let frame = match self.frames.get_mut(&block.get_frame_id()) { None => { - let frame = F::FrameForWriting::empty_from_block(block, shm); + let frame = + F::FrameForWriting::empty_from_block(block, shm, block.get_acquisition_id()); self.frames.insert(block.get_frame_id(), frame); self.frames.get_mut(&block.get_frame_id()).unwrap() } Some(frame) => frame, }; + // in case we have an existing frame, the acquisition id must match + // (which is always the case if we allocate a new FrameForWriting): + if frame.get_acquisition_id() != block.get_acquisition_id() { + debug!( + "dropping block; block acquisition id {} != frame acquisition id {} (frame id {})", + block.get_acquisition_id(), + frame.get_acquisition_id(), + frame.get_frame_id(), + ); + // the block is from a different acquisition, drop it. 
+ return; + } frame.assign_block(block); } @@ -199,7 +212,7 @@ pub fn assembler_main( // warmup for the block queue: for _ in 0..512 { - recycle_blocks_tx.send(B::empty(0)).unwrap(); + recycle_blocks_tx.send(B::empty(0, 0)).unwrap(); } crossbeam::scope(|s| { @@ -302,7 +315,7 @@ mod tests { &socket_as_path, ) .expect("create SHM area for testing"); - let mut frame: K2ISFrameForWriting = K2ISFrameForWriting::empty(FRAME_ID, &mut ssa); + let mut frame: K2ISFrameForWriting = K2ISFrameForWriting::empty(FRAME_ID, &mut ssa, 0); assert!(!frame.is_finished()); @@ -315,7 +328,7 @@ mod tests { let start_y = y_idx * 930; let block: K2ISBlock = - K2ISBlock::from_vec_and_pos(payload, start_x, start_y, FRAME_ID); + K2ISBlock::from_vec_and_pos(payload, start_x, start_y, FRAME_ID, 0); frame.assign_block(&block); } } @@ -368,7 +381,7 @@ mod tests { let start_y = y_idx * 930; let block: K2ISBlock = - K2ISBlock::from_vec_and_pos(payload, start_x, start_y, FRAME_ID); + K2ISBlock::from_vec_and_pos(payload, start_x, start_y, FRAME_ID, 0); pending_frames.assign_block(&block, &mut ssa); } } @@ -421,7 +434,7 @@ mod tests { let start_x = 2032; let start_y = 930; - let block: K2ISBlock = K2ISBlock::from_vec_and_pos(payload, start_x, start_y, FRAME_ID); + let block: K2ISBlock = K2ISBlock::from_vec_and_pos(payload, start_x, start_y, FRAME_ID, 0); assert_eq!(block.get_x_end(), 2047); assert_eq!(block.get_y_end(), 1859); pending_frames.assign_block(&block, &mut ssa); diff --git a/k2o/src/bin/main.rs b/k2o/src/bin/main.rs index eb9b4945..c429fd76 100644 --- a/k2o/src/bin/main.rs +++ b/k2o/src/bin/main.rs @@ -15,9 +15,12 @@ use k2o::block::BlockRouteInfo; use k2o::block::K2Block; use k2o::block_is::K2ISBlock; use k2o::block_summit::K2SummitBlock; -use k2o::cli_args::{Args, Mode, WriteMode}; +use k2o::cli_args::{Args, Mode}; use k2o::control::control_loop; -use k2o::events::{AcquisitionParams, AcquisitionSize, AcquisitionSync, Events, MessagePump}; +use k2o::events::{ + AcquisitionParams, 
AcquisitionSize, AcquisitionSync, Events, MessagePump, WriterSettings, + WriterType, +}; use k2o::events::{ChannelEventBus, EventBus, EventMsg}; use k2o::frame::K2Frame; use k2o::frame_is::K2ISFrame; @@ -26,7 +29,6 @@ use k2o::helpers::CPU_AFF_WRITER; use k2o::helpers::{recv_and_get_init, set_cpu_affinity}; use k2o::recv::recv_decode_loop; use k2o::tracing::init_tracer; -use k2o::write::{DirectWriterBuilder, MMapWriterBuilder, WriterBuilder}; use log::info; use tokio::runtime::Runtime; @@ -121,21 +123,11 @@ fn start_threads< let writer_shm = SharedSlabAllocator::connect(&shm_handle).unwrap(); - let wb: Box = match args.write_mode { - WriteMode::Direct => DirectWriterBuilder::for_filename(&args.write_to), - WriteMode::MMAP => MMapWriterBuilder::for_filename(&args.write_to), - #[cfg(not(feature = "hdf5"))] - WriteMode::HDF5 => panic!("hdf5 not supported"), - #[cfg(feature = "hdf5")] - WriteMode::HDF5 => HDF5WriterBuilder::for_filename(&args.write_to), - }; - acquisition_loop( &w1rx, &recycle_frames_tx, &writer_events_rx, events, - wb, writer_shm, ); } @@ -157,7 +149,8 @@ fn start_threads< } Ok(AcquisitionResult::DoneAborted { .. }) | Ok(AcquisitionResult::DoneSuccess { .. }) - | Ok(AcquisitionResult::DoneError) => { + | Ok(AcquisitionResult::ShutdownIdle) + | Ok(AcquisitionResult::DoneShuttingDown { .. 
}) => { info!("retire thread closing"); break; } @@ -176,6 +169,12 @@ fn start_threads< events.send(&EventMsg::Init {}); + let method: WriterType = args.write_mode.into(); + let writer_settings = WriterSettings::Enabled { + method, + filename: args.write_to.to_owned(), + }; + events.send(&EventMsg::Arm { params: AcquisitionParams { size: AcquisitionSize::NumFrames(40), @@ -183,7 +182,9 @@ fn start_threads< //sync: AcquisitionSync::WaitForSync, sync: AcquisitionSync::Immediately, binning: k2o::events::Binning::Bin1x, + writer_settings, }, + acquisition_id: 0, }); control_loop(events, pump) diff --git a/k2o/src/bin/main_async.rs b/k2o/src/bin/main_async.rs index 028ce341..4ecfedf6 100644 --- a/k2o/src/bin/main_async.rs +++ b/k2o/src/bin/main_async.rs @@ -62,7 +62,7 @@ pub async fn recv_decode_loop, _> = HashMap::new(); diff --git a/k2o/src/block.rs b/k2o/src/block.rs index aaad8ed9..cd829cd5 100644 --- a/k2o/src/block.rs +++ b/k2o/src/block.rs @@ -1,8 +1,6 @@ use std::time::Instant; -use ndarray::{ArrayBase, ArrayView, Dim, ViewRepr}; - -use crate::decode::{decode, decode_u16, decode_u16_vec, decode_u32}; +use ndarray::{ArrayBase, Dim, ViewRepr}; pub trait K2Block: Send { const PACKET_SIZE: usize; @@ -13,11 +11,11 @@ pub trait K2Block: Send { const SECTOR_HEIGHT: usize; const BLOCKS_PER_SECTOR: u8; - fn from_bytes(bytes: &[u8], sector_id: u8) -> Self; - fn replace_with(&mut self, bytes: &[u8], sector_id: u8); + fn from_bytes(bytes: &[u8], sector_id: u8, acquisition_id: usize) -> Self; + fn replace_with(&mut self, bytes: &[u8], sector_id: u8, acquisition_id: usize); fn as_array(&self) -> ArrayBase, Dim<[usize; 2]>>; fn as_vec(&self) -> &Vec; - fn empty(first_frame_id: u32) -> Self; + fn empty(first_frame_id: u32, acquisition_id: usize) -> Self; fn get_flags(&self) -> u8; fn sync_is_set(&self) -> bool { (self.get_flags() & 0x01) == 0x01 @@ -34,6 +32,7 @@ pub trait K2Block: Send { fn get_frame_id(&self) -> u32; fn get_sector_id(&self) -> u8; fn 
get_decoded_timestamp(&self) -> Instant; + fn get_acquisition_id(&self) -> usize; fn validate(&self); } diff --git a/k2o/src/block_is.rs b/k2o/src/block_is.rs index 30f3f25f..f2a90c4b 100644 --- a/k2o/src/block_is.rs +++ b/k2o/src/block_is.rs @@ -41,12 +41,13 @@ pub struct K2ISBlock { sector_id: u8, // not part of the actual data received, added as "metadata" decode_timestamp: Instant, // TODO: receive timestamp as an Instant + acquisition_id: usize, } impl K2ISBlock { /// return a dummy block with specified fields, matching IS specs /// NOTE: only meant for testing! - pub fn empty_for_pos(x: u16, y: u16, frame_id: u32) -> K2ISBlock { + pub fn empty_for_pos(x: u16, y: u16, frame_id: u32, acquisition_id: usize) -> K2ISBlock { let payload: Vec = vec![0; Self::DECODED_SIZE]; K2ISBlock { sync: 0xFFFF0055, @@ -64,13 +65,20 @@ impl K2ISBlock { payload, sector_id: 0, decode_timestamp: Instant::now(), + acquisition_id, } } /// return a block for the given input data and specified position /// NOTE: not optimized for performance - creates a copy of the data /// only use for testing! - pub fn from_vec_and_pos(data: &[u16], x: u16, y: u16, frame_id: u32) -> K2ISBlock { + pub fn from_vec_and_pos( + data: &[u16], + x: u16, + y: u16, + frame_id: u32, + acquisition_id: usize, + ) -> K2ISBlock { let payload = data.to_vec(); // create an owned copy here K2ISBlock { sync: 0xFFFF0055, @@ -88,12 +96,13 @@ impl K2ISBlock { payload, sector_id: 0, decode_timestamp: Instant::now(), + acquisition_id, } } } impl K2Block for K2ISBlock { - fn from_bytes(bytes: &[u8], sector_id: u8) -> K2ISBlock { + fn from_bytes(bytes: &[u8], sector_id: u8, acquisition_id: usize) -> K2ISBlock { // FIXME: we don't really need to initialize the vector, as it will be overwritten by `decode` just below... // FIXME: use MaybeUninit stuff from nightly? 
let mut payload = vec![0; Self::DECODED_SIZE]; @@ -119,10 +128,11 @@ impl K2Block for K2ISBlock { payload, sector_id, decode_timestamp: Instant::now(), + acquisition_id, } } - fn replace_with(&mut self, bytes: &[u8], sector_id: u8) { + fn replace_with(&mut self, bytes: &[u8], sector_id: u8, acquisition_id: usize) { decode::<{ Self::PACKET_SIZE }>(bytes, &mut self.payload); self.sync = decode_u32(&bytes[0..4]); @@ -139,6 +149,7 @@ impl K2Block for K2ISBlock { self.block_size = decode_u32(&bytes[36..40]); self.sector_id = sector_id; self.decode_timestamp = Instant::now(); + self.acquisition_id = acquisition_id; } fn as_array(&self) -> ArrayBase, Dim<[usize; 2]>> { @@ -155,7 +166,7 @@ impl K2Block for K2ISBlock { /// return a dummy block, matching IS specs /// NOTE: only meant for testing! - fn empty(first_frame_id: u32) -> Self { + fn empty(first_frame_id: u32, acquisition_id: usize) -> Self { let payload: Vec = vec![0; Self::DECODED_SIZE]; K2ISBlock { sync: 0xFFFF0055, @@ -173,6 +184,7 @@ impl K2Block for K2ISBlock { payload, sector_id: 0, decode_timestamp: Instant::now(), + acquisition_id, } } @@ -228,4 +240,8 @@ impl K2Block for K2ISBlock { assert_eq!(self.sync, 0xFFFF0055); assert_eq!(self.block_size, 0x5758) } + + fn get_acquisition_id(&self) -> usize { + self.acquisition_id + } } diff --git a/k2o/src/block_summit.rs b/k2o/src/block_summit.rs index c4c03802..9c09d32b 100644 --- a/k2o/src/block_summit.rs +++ b/k2o/src/block_summit.rs @@ -41,10 +41,11 @@ pub struct K2SummitBlock { sector_id: u8, // not part of the actual data received, added as "metadata" decode_timestamp: Instant, // TODO: receive timestamp as an Instant + acquisition_id: usize, } impl K2SummitBlock { - pub fn empty(frame_id: u32) -> Self { + pub fn empty(frame_id: u32, acquisition_id: usize) -> Self { let payload: Vec = vec![0; Self::DECODED_SIZE]; Self { sync: 0xFFFF0055, @@ -62,12 +63,13 @@ impl K2SummitBlock { payload, sector_id: 0, decode_timestamp: Instant::now(), + acquisition_id, } } } 
impl K2Block for K2SummitBlock { - fn from_bytes(bytes: &[u8], sector_id: u8) -> Self { + fn from_bytes(bytes: &[u8], sector_id: u8, acquisition_id: usize) -> Self { // FIXME: we don't really need to initialize the vector, as it will be overwritten by `decode` just below... // FIXME: use MaybeUninit stuff from nightly? let mut payload = vec![0; Self::DECODED_SIZE]; @@ -90,10 +92,11 @@ impl K2Block for K2SummitBlock { payload, sector_id, decode_timestamp: Instant::now(), + acquisition_id, } } - fn replace_with(&mut self, bytes: &[u8], sector_id: u8) { + fn replace_with(&mut self, bytes: &[u8], sector_id: u8, acquisition_id: usize) { decode_u16_vec::<{ Self::PACKET_SIZE }>(bytes, &mut self.payload); self.sync = decode_u32(&bytes[0..4]); @@ -110,6 +113,7 @@ impl K2Block for K2SummitBlock { self.block_size = decode_u32(&bytes[36..40]); self.sector_id = sector_id; self.decode_timestamp = Instant::now(); + self.acquisition_id = acquisition_id; } fn as_array(&self) -> ArrayBase, Dim<[usize; 2]>> { @@ -126,7 +130,7 @@ impl K2Block for K2SummitBlock { /// return a dummy block /// NOTE: only meant for testing! 
- fn empty(first_frame_id: u32) -> Self { + fn empty(first_frame_id: u32, acquisition_id: usize) -> Self { let payload: Vec = vec![0; Self::DECODED_SIZE]; K2SummitBlock { sync: 0xFFFF0055, @@ -144,6 +148,7 @@ impl K2Block for K2SummitBlock { payload, sector_id: 0, decode_timestamp: Instant::now(), + acquisition_id, } } @@ -199,4 +204,8 @@ impl K2Block for K2SummitBlock { assert_eq!(self.sync, 0xFFFF0055); assert_eq!(self.block_size as usize, Self::PACKET_SIZE); } + + fn get_acquisition_id(&self) -> usize { + self.acquisition_id + } } diff --git a/k2o/src/cli_args.rs b/k2o/src/cli_args.rs index 81a6dedd..848e0289 100644 --- a/k2o/src/cli_args.rs +++ b/k2o/src/cli_args.rs @@ -1,5 +1,7 @@ use clap::Parser; +use crate::events::WriterType; + #[derive(clap::ArgEnum, Clone, Copy, Debug)] pub enum Mode { IS, @@ -13,6 +15,19 @@ pub enum WriteMode { HDF5, } +impl Into for WriteMode { + fn into(self) -> WriterType { + match self { + WriteMode::Direct => WriterType::Direct, + WriteMode::MMAP => WriterType::Mmap, + #[cfg(not(feature = "hdf5"))] + WriteMode::HDF5 => panic!("hdf5 not supported"), + #[cfg(feature = "hdf5")] + WriteMode::HDF5 => WriterType::HDF5, + } + } +} + /// Test program - arm and perform a single acquisition #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] diff --git a/k2o/src/control.rs b/k2o/src/control.rs index b7135bc2..8c8e2aa3 100644 --- a/k2o/src/control.rs +++ b/k2o/src/control.rs @@ -1,6 +1,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use crossbeam_channel::TryRecvError; +use log::{debug, info}; use opentelemetry::trace::{TraceContextExt, Tracer}; use crate::{ @@ -18,12 +19,16 @@ pub enum AcquisitionState { Idle, Armed { params: AcquisitionParams, + /// The current acquisition id + acquisition_id: usize, }, /// Current receiving data AcquisitionStarted { params: AcquisitionParams, /// The reference frame id for this acquisition frame_id: u32, + /// The current acquisition id + acquisition_id: usize, }, /// No 
longer receiving new data, but still processing /// (=> there is still data in the queues) @@ -31,6 +36,8 @@ pub enum AcquisitionState { params: AcquisitionParams, /// The reference frame id for this acquisition frame_id: u32, + /// The current acquisition id + acquisition_id: usize, }, Shutdown, } @@ -113,6 +120,7 @@ impl StateTracker { EventMsg::AcquisitionStartedSector { sector_id: _, frame_id: _, + acquisition_id: _, } => Err(StateError::InvalidTransition { from: self.state.clone(), msg: event.clone(), @@ -120,76 +128,103 @@ impl StateTracker { EventMsg::AcquisitionStarted { frame_id: _, params: _, + acquisition_id: _, } => Err(StateError::InvalidTransition { from: self.state.clone(), msg: event.clone(), }), - EventMsg::ArmSectors { params: _ } => Err(StateError::InvalidTransition { + EventMsg::ArmSectors { + params: _, + acquisition_id: _, + } => Err(StateError::InvalidTransition { from: self.state.clone(), msg: event.clone(), }), - EventMsg::AcquisitionEnded => Err(StateError::InvalidTransition { + EventMsg::AcquisitionEnded { .. } => Err(StateError::InvalidTransition { from: self.state.clone(), msg: event.clone(), }), - EventMsg::CancelAcquisition => Err(StateError::InvalidTransition { + EventMsg::CancelAcquisition { .. } => Err(StateError::InvalidTransition { from: self.state.clone(), msg: event.clone(), }), - EventMsg::ProcessingDone => Err(StateError::InvalidTransition { + EventMsg::ProcessingDone { .. 
} => Err(StateError::InvalidTransition { from: self.state.clone(), msg: event.clone(), }), // should only come in AcquisitionFinishing state // valid transitions: - EventMsg::Arm { params } => Ok(AcquisitionState::Armed { + EventMsg::Arm { + params, + acquisition_id, + } => Ok(AcquisitionState::Armed { params: params.clone(), + acquisition_id: *acquisition_id, }), EventMsg::Shutdown => Ok(AcquisitionState::Shutdown), EventMsg::AcquisitionError { msg } => Ok(AcquisitionState::Shutdown), } } - AcquisitionState::Armed { params } => { + AcquisitionState::Armed { + params, + acquisition_id, + } => { match event { // invalid transitions: EventMsg::Init => Err(StateError::InvalidTransition { from: self.state.clone(), msg: event.clone(), }), - EventMsg::Arm { params: _ } => Err(StateError::InvalidTransition { + EventMsg::Arm { + params: _, + acquisition_id: _, + } => Err(StateError::InvalidTransition { from: self.state.clone(), msg: event.clone(), }), - EventMsg::ProcessingDone => Err(StateError::InvalidTransition { + EventMsg::ProcessingDone { .. 
} => Err(StateError::InvalidTransition { from: self.state.clone(), msg: event.clone(), }), // should only come in AcquisitionFinishing state // valid transitions to self: - EventMsg::ArmSectors { params } => Ok(AcquisitionState::Armed { + EventMsg::ArmSectors { + params, + acquisition_id, + } => Ok(AcquisitionState::Armed { params: params.clone(), + acquisition_id: *acquisition_id, }), EventMsg::AcquisitionStartedSector { sector_id: _, frame_id: _, + acquisition_id, } => Ok(AcquisitionState::Armed { params: params.clone(), + acquisition_id: *acquisition_id, }), // valid transitions: - EventMsg::AcquisitionStarted { frame_id, params } => { - Ok(AcquisitionState::AcquisitionStarted { - params: params.clone(), - frame_id: *frame_id, - }) - } - EventMsg::AcquisitionEnded => Ok(AcquisitionState::Idle), - EventMsg::CancelAcquisition => Ok(AcquisitionState::Idle), + EventMsg::AcquisitionStarted { + frame_id, + params, + acquisition_id, + } => Ok(AcquisitionState::AcquisitionStarted { + params: params.clone(), + frame_id: *frame_id, + acquisition_id: *acquisition_id, + }), + EventMsg::AcquisitionEnded { acquisition_id: _ } => Ok(AcquisitionState::Idle), + EventMsg::CancelAcquisition { acquisition_id: _ } => Ok(AcquisitionState::Idle), EventMsg::Shutdown => Ok(AcquisitionState::Shutdown), EventMsg::AcquisitionError { msg } => Ok(AcquisitionState::Shutdown), } } - AcquisitionState::AcquisitionStarted { params, frame_id } => { + AcquisitionState::AcquisitionStarted { + params, + frame_id, + acquisition_id, + } => { match event { // invalid transitions: EventMsg::Init => Err(StateError::InvalidTransition { @@ -199,19 +234,26 @@ impl StateTracker { EventMsg::AcquisitionStarted { frame_id: _, params: _, + acquisition_id: _, } => Err(StateError::InvalidTransition { from: self.state.clone(), msg: event.clone(), }), // should only come in Armed state - EventMsg::Arm { params: _ } => Err(StateError::InvalidTransition { + EventMsg::Arm { + params: _, + acquisition_id: _, + } => 
Err(StateError::InvalidTransition { from: self.state.clone(), msg: event.clone(), }), // should only come in Idle state - EventMsg::ArmSectors { params: _ } => Err(StateError::InvalidTransition { + EventMsg::ArmSectors { + params: _, + acquisition_id: _, + } => Err(StateError::InvalidTransition { from: self.state.clone(), msg: event.clone(), }), // should only come in Armed state - EventMsg::ProcessingDone => Err(StateError::InvalidTransition { + EventMsg::ProcessingDone { .. } => Err(StateError::InvalidTransition { from: self.state.clone(), msg: event.clone(), }), // should only come in AcquisitionFinishing state @@ -220,15 +262,20 @@ impl StateTracker { EventMsg::AcquisitionStartedSector { sector_id: _, frame_id: _, + acquisition_id: _, } => Ok(AcquisitionState::AcquisitionStarted { params: params.clone(), frame_id: *frame_id, + acquisition_id: *acquisition_id, }), - EventMsg::AcquisitionEnded => Ok(AcquisitionState::AcquisitionFinishing { - params: params.clone(), - frame_id: *frame_id, - }), - EventMsg::CancelAcquisition => Ok(AcquisitionState::Idle), + EventMsg::AcquisitionEnded { acquisition_id } => { + Ok(AcquisitionState::AcquisitionFinishing { + params: params.clone(), + frame_id: *frame_id, + acquisition_id: *acquisition_id, + }) + } + EventMsg::CancelAcquisition { acquisition_id } => Ok(AcquisitionState::Idle), EventMsg::Shutdown => Ok(AcquisitionState::Shutdown), EventMsg::AcquisitionError { msg } => Ok(AcquisitionState::Shutdown), } @@ -236,6 +283,7 @@ impl StateTracker { AcquisitionState::AcquisitionFinishing { params: _, frame_id: _, + acquisition_id: acquisition_id_outer, } => { match event { // invalid transitions: @@ -246,6 +294,7 @@ impl StateTracker { EventMsg::AcquisitionStartedSector { sector_id: _, frame_id: _, + acquisition_id: _, } => Err(StateError::InvalidTransition { from: self.state.clone(), msg: event.clone(), @@ -253,27 +302,34 @@ impl StateTracker { EventMsg::AcquisitionStarted { frame_id: _, params: _, + acquisition_id: _, } => 
Err(StateError::InvalidTransition { from: self.state.clone(), msg: event.clone(), }), - EventMsg::Arm { params: _ } => Err(StateError::InvalidTransition { + EventMsg::Arm { + params: _, + acquisition_id: _, + } => Err(StateError::InvalidTransition { from: self.state.clone(), msg: event.clone(), }), - EventMsg::ArmSectors { params: _ } => Err(StateError::InvalidTransition { + EventMsg::ArmSectors { + params: _, + acquisition_id: _, + } => Err(StateError::InvalidTransition { from: self.state.clone(), msg: event.clone(), }), - EventMsg::AcquisitionEnded => Err(StateError::InvalidTransition { + EventMsg::AcquisitionEnded { .. } => Err(StateError::InvalidTransition { from: self.state.clone(), msg: event.clone(), }), // valid transitions: - EventMsg::CancelAcquisition => Ok(AcquisitionState::Idle), + EventMsg::CancelAcquisition { acquisition_id: _ } => Ok(AcquisitionState::Idle), EventMsg::Shutdown => Ok(AcquisitionState::Shutdown), - EventMsg::ProcessingDone => Ok(AcquisitionState::Idle), + EventMsg::ProcessingDone { acquisition_id: _ } => Ok(AcquisitionState::Idle), EventMsg::AcquisitionError { msg } => Ok(AcquisitionState::Shutdown), } } @@ -284,7 +340,7 @@ impl StateTracker { let next_state = self.next_state(msg)?; let ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); if next_state != self.state { - println!( + debug!( "StateTracker:\n old={:?}\n event={:?}\n -> {:?}\n ts={:?}", self.state, msg, next_state, ts, ); diff --git a/k2o/src/events.rs b/k2o/src/events.rs index edf84e3e..52624f9a 100644 --- a/k2o/src/events.rs +++ b/k2o/src/events.rs @@ -5,7 +5,11 @@ use std::{ }; use crossbeam_channel::{unbounded, Receiver, SendError, Sender}; -use log::info; +use log::{debug, info}; + +use crate::write::{DirectWriterBuilder, MMapWriterBuilder, NoopWriterBuilder, WriterBuilder}; +#[cfg(feature = "hdf5")] +use k2o::write::HDF5WriterBuilder; pub trait EventBus { /// Send `msg` to all subscribers @@ -46,7 +50,7 @@ impl EventBus for ChannelEventBus { } fn send(&self, 
msg: &T) { - info!("Event: {:?}", msg); + debug!("Event: {:?}", msg); // FIXME: don't panic for (tx, _) in self.channels.lock().unwrap().iter() { tx.send(msg.clone()).unwrap(); @@ -176,11 +180,70 @@ pub enum Binning { Bin8x, } +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum WriterType { + Direct, + Mmap, + #[cfg(feature = "hdf5")] + HDF5, +} + +pub enum WriterTypeError { + InvalidWriterType, +} + +impl TryFrom<&str> for WriterType { + type Error = WriterTypeError; + + fn try_from(value: &str) -> Result { + match value { + "direct" => Ok(Self::Direct), + "mmap" => Ok(Self::Mmap), + #[cfg(feature = "hdf5")] + "hdf5" => Ok(Self::HDF5), + _ => Err(WriterTypeError::InvalidWriterType), + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum WriterSettings { + Disabled, + Enabled { + method: WriterType, + filename: String, // maybe change to a path type? + }, +} + +impl WriterSettings { + pub fn disabled() -> Self { + Self::Disabled + } + + pub fn new(method: &str, filename: &str) -> Result { + Ok(Self::Enabled { + method: WriterType::try_from(method)?, + filename: filename.to_owned(), + }) + } + + pub fn get_writer_builder(&self) -> Box { + match self { + Self::Disabled => NoopWriterBuilder::new(), + Self::Enabled { method, filename } => match &method { + WriterType::Direct => DirectWriterBuilder::for_filename(filename), + WriterType::Mmap => MMapWriterBuilder::for_filename(filename), + }, + } + } +} + #[derive(Clone, Debug, PartialEq, Eq)] pub struct AcquisitionParams { pub size: AcquisitionSize, pub sync: AcquisitionSync, pub binning: Binning, + pub writer_settings: WriterSettings, } #[derive(Clone, Debug, PartialEq, Eq)] @@ -194,36 +257,46 @@ pub enum EventMsg { AcquisitionStartedSector { sector_id: u8, frame_id: u32, + acquisition_id: usize, }, /// Send when any sector has started the acquisition AcquisitionStarted { frame_id: u32, params: AcquisitionParams, + acquisition_id: usize, }, /// Send when a acquisition should be started Arm { params: 
AcquisitionParams, + acquisition_id: usize, }, /// Send when the sector receiver threads should start acquiring data ArmSectors { params: AcquisitionParams, + acquisition_id: usize, }, /// Send when the acquisition has ended (successfully or with an error) // FIXME: need to distinguish cases! - AcquisitionEnded, + AcquisitionEnded { + acquisition_id: usize, + }, /// Send when the final consumer is done processing all data - ProcessingDone, + ProcessingDone { + acquisition_id: usize, + }, /// Send when the currently running acquisition should be stopped. /// Depending on the `AcquisitionSize`, this can either be before finishing /// acquisition of the fixed number of frames, or for continuous, /// this successfully finishes the acquisition. - CancelAcquisition, + CancelAcquisition { + acquisition_id: usize, + }, /// Generic fatal acquisition error AcquisitionError { diff --git a/k2o/src/frame.rs b/k2o/src/frame.rs index 9ea558c4..aa6c6f44 100644 --- a/k2o/src/frame.rs +++ b/k2o/src/frame.rs @@ -25,17 +25,27 @@ pub trait FrameForWriting: Sized { Self::FRAME_WIDTH * Self::FRAME_HEIGHT * std::mem::size_of::() } - fn empty(frame_id: u32, shm: &mut SharedSlabAllocator) -> Self { - Self::empty_with_timestamp::(frame_id, &Instant::now(), shm) + fn empty(frame_id: u32, shm: &mut SharedSlabAllocator, acquisition_id: usize) -> Self { + Self::empty_with_timestamp::(frame_id, &Instant::now(), shm, acquisition_id) } - fn empty_from_block(block: &B, shm: &mut SharedSlabAllocator) -> Self { - Self::empty_with_timestamp::(block.get_frame_id(), &block.get_decoded_timestamp(), shm) + fn empty_from_block( + block: &B, + shm: &mut SharedSlabAllocator, + acquisition_id: usize, + ) -> Self { + Self::empty_with_timestamp::( + block.get_frame_id(), + &block.get_decoded_timestamp(), + shm, + acquisition_id, + ) } fn empty_with_timestamp( frame_id: u32, ts: &Instant, shm: &mut SharedSlabAllocator, + acquisition_id: usize, ) -> Self; fn writing_done(self, shm: &mut SharedSlabAllocator) -> 
Self::ReadOnlyFrame; @@ -141,6 +151,8 @@ pub trait FrameForWriting: Sized { // update "mtime" self.set_modified_timestamp(&Instant::now()); } + + fn get_acquisition_id(&self) -> usize; } pub trait K2Frame: Send { @@ -168,6 +180,43 @@ pub trait K2Frame: Send { fn get_num_subframes(binning: &Binning) -> u32; fn subframe_indexes(&self, binning: &Binning) -> Range; fn get_subframe(&self, index: u32, binning: &Binning, shm: &SharedSlabAllocator) -> SubFrame; + fn get_acquisition_id(&self) -> usize; + + /// to grab an independent copy of only some metadata of the frame + fn get_meta(&self) -> FrameMeta { + FrameMeta { + acquisition_id: self.get_acquisition_id(), + frame_id: self.get_frame_id(), + created_timestamp: self.get_created_timestamp(), + } + } +} + +pub struct FrameMeta { + acquisition_id: usize, + frame_id: u32, + created_timestamp: Instant, +} + +impl FrameMeta { + pub fn new(acquisition_id: usize, frame_id: u32, created_timestamp: Instant) -> Self { + Self { + acquisition_id, + frame_id, + created_timestamp, + } + } + pub fn get_acquisition_id(&self) -> usize { + self.acquisition_id + } + + pub fn get_created_timestamp(&self) -> Instant { + self.created_timestamp + } + + pub fn get_frame_id(&self) -> u32 { + self.frame_id + } } pub struct SubFrame { @@ -231,6 +280,9 @@ pub struct GenericFrame { /// when the last block was received, to handle dropped packets pub modified_timestamp: Instant, + + /// Acquisition id/generation + pub acquisition_id: usize, } impl GenericFrame { @@ -239,12 +291,14 @@ impl GenericFrame { frame_id: u32, created_timestamp: Instant, modified_timestamp: Instant, + acquisition_id: usize, ) -> GenericFrame { GenericFrame { payload, frame_id, created_timestamp, modified_timestamp, + acquisition_id, } } diff --git a/k2o/src/frame_is.rs b/k2o/src/frame_is.rs index 0cc70a90..205e4222 100644 --- a/k2o/src/frame_is.rs +++ b/k2o/src/frame_is.rs @@ -27,6 +27,9 @@ pub struct K2ISFrameForWriting { /// when the last block was received, to handle 
dropped packets pub modified_timestamp: Instant, + + /// Acquisition id/generation + pub acquisition_id: usize, } impl FrameForWriting for K2ISFrameForWriting { @@ -34,6 +37,7 @@ impl FrameForWriting for K2ISFrameForWriting { frame_id: u32, ts: &Instant, shm: &mut SharedSlabAllocator, + acquisition_id: usize, ) -> Self { let mut payload = shm.get_mut().expect("get free SHM slot"); assert!(payload.size >= Self::FRAME_WIDTH * Self::FRAME_HEIGHT); @@ -58,6 +62,7 @@ impl FrameForWriting for K2ISFrameForWriting { created_timestamp: *ts, modified_timestamp: Instant::now(), subframe_idx: 0, + acquisition_id, } } @@ -94,6 +99,7 @@ impl FrameForWriting for K2ISFrameForWriting { frame_id: self.frame_id, created_timestamp: self.created_timestamp, modified_timestamp: self.modified_timestamp, + acquisition_id: self.acquisition_id, } } @@ -119,6 +125,10 @@ impl FrameForWriting for K2ISFrameForWriting { let slot_as_u16: &[u16] = bytemuck::cast_slice(self.payload.as_slice()); slot_as_u16 } + + fn get_acquisition_id(&self) -> usize { + self.acquisition_id + } } impl K2ISFrameForWriting { @@ -138,6 +148,7 @@ impl K2ISFrameForWriting { frame_id: self.frame_id, created_timestamp: self.created_timestamp, modified_timestamp: self.modified_timestamp, + acquisition_id: self.acquisition_id, } } } @@ -156,6 +167,9 @@ pub struct K2ISFrame { /// when the last block was received, to handle dropped packets pub modified_timestamp: Instant, + + /// Acquisition id/generation + pub acquisition_id: usize, } impl K2Frame for K2ISFrame { @@ -224,6 +238,7 @@ impl K2Frame for K2ISFrame { self.frame_id, self.created_timestamp, self.modified_timestamp, + self.acquisition_id, ) } @@ -239,4 +254,8 @@ impl K2Frame for K2ISFrame { fn get_frame_id(&self) -> u32 { self.frame_id } + + fn get_acquisition_id(&self) -> usize { + self.acquisition_id + } } diff --git a/k2o/src/frame_summit.rs b/k2o/src/frame_summit.rs index 73be9b26..4c58b3f5 100644 --- a/k2o/src/frame_summit.rs +++ b/k2o/src/frame_summit.rs @@ -22,6 
+22,9 @@ pub struct K2SummitFrame { /// when the last block was received, to handle dropped packets pub modified_timestamp: Instant, + + /// Acquisition id/generation + pub acquisition_id: usize, } pub struct K2SummitFrameForWriting { @@ -41,6 +44,9 @@ pub struct K2SummitFrameForWriting { /// when the last block was received, to handle dropped packets pub modified_timestamp: Instant, + + /// Acquisition id/generation + pub acquisition_id: usize, } impl K2SummitFrameForWriting {} @@ -70,6 +76,7 @@ impl FrameForWriting for K2SummitFrameForWriting { frame_id: u32, ts: &Instant, shm: &mut SharedSlabAllocator, + acquisition_id: usize, ) -> Self { let mut payload = shm.get_mut().expect("get free SHM slot"); assert!(payload.size >= Self::FRAME_WIDTH * Self::FRAME_HEIGHT); @@ -93,6 +100,7 @@ impl FrameForWriting for K2SummitFrameForWriting { created_timestamp: *ts, modified_timestamp: Instant::now(), subframe_idx: 0, + acquisition_id, } } @@ -132,8 +140,13 @@ impl FrameForWriting for K2SummitFrameForWriting { frame_id: self.frame_id, created_timestamp: self.created_timestamp, modified_timestamp: self.modified_timestamp, + acquisition_id: self.acquisition_id, } } + + fn get_acquisition_id(&self) -> usize { + self.acquisition_id + } } impl K2Frame for K2SummitFrame { @@ -198,6 +211,11 @@ impl K2Frame for K2SummitFrame { self.frame_id, self.created_timestamp, self.modified_timestamp, + self.acquisition_id, ) } + + fn get_acquisition_id(&self) -> usize { + self.acquisition_id + } } diff --git a/k2o/src/helpers.rs b/k2o/src/helpers.rs index 9904f411..363f00f1 100644 --- a/k2o/src/helpers.rs +++ b/k2o/src/helpers.rs @@ -66,7 +66,7 @@ pub fn recv_single(sector_id: u8) -> B { let mut buf: [u8; PACKET_SIZE] = [0; PACKET_SIZE]; let (number_of_bytes, _src_addr) = socket.recv_from(&mut buf).expect("recv_from failed"); assert_eq!(number_of_bytes, PACKET_SIZE); - B::from_bytes(&buf, sector_id) + B::from_bytes(&buf, sector_id, 0) } /// diff --git a/k2o/src/params.rs b/k2o/src/params.rs 
index 767ed1a1..095e1e6b 100644 --- a/k2o/src/params.rs +++ b/k2o/src/params.rs @@ -1,5 +1,5 @@ #[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum Mode { +pub enum CameraMode { IS, Summit, } diff --git a/k2o/src/recv.rs b/k2o/src/recv.rs index 9bc25baf..29a58cce 100644 --- a/k2o/src/recv.rs +++ b/k2o/src/recv.rs @@ -1,7 +1,7 @@ use std::{io::ErrorKind, time::Duration}; use crossbeam_channel::{Receiver, Sender, TryRecvError}; -use log::{error, info, warn}; +use log::{debug, error, info, warn}; use crate::{ block::{BlockRouteInfo, K2Block}, @@ -17,18 +17,39 @@ enum RecvState { /// Receiving and decoding blocks, but not passing them on downstream to assembly, /// goes to `Receiving` state once a block with sync flag has been received - WaitForSync, + WaitForSync { acquisition_id: usize }, /// Start receiving and decoding at the next block, regardless of sync status - WaitForNext, + WaitForNext { acquisition_id: usize }, /// Recveicing and decoding packets, and sending the decoded packets down /// the pipeline - Receiving, + Receiving { acquisition_id: usize }, +} + +fn block_for_bytes( + buf: &[u8], + chan: &Receiver, + acquisition_id: usize, + sector_id: u8, +) -> B { + let block: B = { + let maybe_block = chan.try_recv(); + match maybe_block { + Err(_) => B::from_bytes(buf, sector_id, acquisition_id), + Ok(mut b) => { + b.replace_with(buf, sector_id, acquisition_id); + b + } + } + }; + block.validate(); + block } /// receive and decode a block from the specified sector, and send it to the /// central assembly thread +#[allow(clippy::too_many_arguments)] pub fn recv_decode_loop( sector_id: u8, port: u32, @@ -61,11 +82,14 @@ pub fn recv_decode_loop( loop { match events_rx.try_recv() { - Ok(EventMsg::ArmSectors { params }) => { - info!("sector {sector_id} waiting for acquisition"); + Ok(EventMsg::ArmSectors { + params, + acquisition_id, + }) => { + debug!("sector {sector_id} waiting for acquisition {acquisition_id}"); state = match params.sync { - 
AcquisitionSync::Immediately => RecvState::WaitForNext, - AcquisitionSync::WaitForSync => RecvState::WaitForSync, + AcquisitionSync::Immediately => RecvState::WaitForNext { acquisition_id }, + AcquisitionSync::WaitForSync => RecvState::WaitForSync { acquisition_id }, }; } Ok(EventMsg::Shutdown {}) => break, @@ -91,34 +115,39 @@ pub fn recv_decode_loop( continue; } - let block: B = { - let maybe_block = recycle_blocks_rx.try_recv(); - match maybe_block { - Err(_) => B::from_bytes(&buf, sector_id), - Ok(mut b) => { - b.replace_with(&buf, sector_id); - b - } - } - }; - block.validate(); - match state { - RecvState::WaitForNext => { + RecvState::WaitForNext { acquisition_id } => { + let block = block_for_bytes(&buf, recycle_blocks_rx, acquisition_id, sector_id); events.send(&EventMsg::AcquisitionStartedSector { sector_id, frame_id: block.get_frame_id(), + acquisition_id, }); - state = RecvState::Receiving; + state = RecvState::Receiving { acquisition_id }; } - RecvState::WaitForSync if block.sync_is_set() => { - events.send(&EventMsg::AcquisitionStartedSector { - sector_id, - frame_id: block.get_frame_id(), - }); - state = RecvState::Receiving; + RecvState::WaitForSync { acquisition_id } => { + let block = block_for_bytes(&buf, recycle_blocks_rx, acquisition_id, sector_id); + if block.sync_is_set() { + events.send(&EventMsg::AcquisitionStartedSector { + sector_id, + frame_id: block.get_frame_id(), + acquisition_id, + }); + state = RecvState::Receiving { acquisition_id }; + } else { + let block = block_for_bytes(&buf, recycle_blocks_rx, acquisition_id, sector_id); + // recycle blocks directly if we don't forward them to the frame + // assembly thread: + recycle_blocks_tx.send(block).unwrap(); + + // FIXME: this is an opportunity to de-allocate + // memory, if we had buffered "too much" beforehand. + // Add a "high watermark" queue fill level and only recycle + // blocks if we are below. 
+ } } - RecvState::Receiving => { + RecvState::Receiving { acquisition_id } => { + let block = block_for_bytes(&buf, recycle_blocks_rx, acquisition_id, sector_id); let route_info = BlockRouteInfo::new(&block); let l = assembly_channel.len(); if l > 400 && l % 100 == 0 { @@ -135,15 +164,10 @@ pub fn recv_decode_loop( break; } } - RecvState::Idle | RecvState::WaitForSync => { - // recycle blocks directly if we don't forward them to the frame - // assembly thread: + RecvState::Idle => { + // we use a "fake" acquisition id here, because there is no acquisition running: + let block = block_for_bytes(&buf, recycle_blocks_rx, 0, sector_id); recycle_blocks_tx.send(block).unwrap(); - - // FIXME: this is an opportunity to de-allocate - // memory, if we had buffered "too much" beforehand. - // Add a "high watermark" queue fill level and only recycle - // blocks if we are below. } } } diff --git a/k2o/src/write.rs b/k2o/src/write.rs index 4d09688f..16927442 100644 --- a/k2o/src/write.rs +++ b/k2o/src/write.rs @@ -13,12 +13,14 @@ use log::{info, trace}; use memmap2::{MmapMut, MmapOptions}; use ndarray::s; use ndarray_npy::{write_zeroed_npy, ViewMutNpyExt}; +use opentelemetry::trace::Tracer; use crate::{ dio::open_direct, events::AcquisitionSize, frame::SubFrame, helpers::{preallocate, AllocateMode, Shape2, Shape3}, + tracing::get_tracer, }; #[derive(Debug)] @@ -107,12 +109,15 @@ impl Writer for DirectWriter { } fn resize(&mut self, num_frames: usize) -> Result<(), WriterError> { - preallocate( - &self.filename, - self.frame_size_bytes, - num_frames, - AllocateMode::AllocateOnly, - ); + let tracer = get_tracer(); + tracer.in_span("DirectWriter::resize", |_cx| { + preallocate( + &self.filename, + self.frame_size_bytes, + num_frames, + AllocateMode::AllocateOnly, + ); + }); Ok(()) } } @@ -185,19 +190,22 @@ impl Writer for MMapWriter { } fn resize(&mut self, num_frames: usize) -> Result<(), WriterError> { - preallocate( - &self.filename, - self.frame_size_bytes, - num_frames, - 
AllocateMode::ZeroFill, - ); - let file = OpenOptions::new() - .read(true) - .write(true) - .open(&self.filename) - .unwrap(); - let mmap = unsafe { MmapOptions::new().map_mut(&file).unwrap() }; - self.mmap = mmap; + let tracer = get_tracer(); + tracer.in_span("MMapWriter::resize", |_cx| { + preallocate( + &self.filename, + self.frame_size_bytes, + num_frames, + AllocateMode::ZeroFill, + ); + let file = OpenOptions::new() + .read(true) + .write(true) + .open(&self.filename) + .unwrap(); + let mmap = unsafe { MmapOptions::new().map_mut(&file).unwrap() }; + self.mmap = mmap; + }); Ok(()) } } @@ -325,14 +333,8 @@ impl Writer for NoopWriter { pub struct NoopWriterBuilder {} impl NoopWriterBuilder { - fn new() -> Self { - Self {} - } -} - -impl Default for NoopWriterBuilder { - fn default() -> Self { - Self::new() + pub fn new() -> Box { + Box::new(Self {}) } } diff --git a/libertem_k2is/examples/fast_acq_integrate.py b/libertem_k2is/examples/fast_acq_integrate.py index 2b98bb14..55c3226f 100644 --- a/libertem_k2is/examples/fast_acq_integrate.py +++ b/libertem_k2is/examples/fast_acq_integrate.py @@ -2,7 +2,7 @@ This example demonstrates fast turnaround of multiple acquisitions, where: 1) Each acquisition is integrating potentially many frames, and -2) There is little down time between acquisitions, and +2) There is little down time between acquisitions, and 3) Each acquisition start point is properly synchronized, meaning it doesn't include data from any point in time before explicitly starting the acquisition. 
@@ -10,8 +10,8 @@ import time +import click import numpy as np -from libertem.common.buffers import zeros_aligned from libertem.common.tracing import maybe_setup_tracing from opentelemetry import trace @@ -23,69 +23,74 @@ tracer = trace.get_tracer("write_and_iterate") -def iterate(aq, cam_client): - frame_arr = zeros_aligned(aq.get_frame_shape(), dtype=np.uint16) +def iterate(aq, cam, cam_client, frame_arr): t0 = time.time() i = 0 try: - while frame := aq.get_next_frame(): + while frame := cam.get_next_frame(): i += 1 if frame.is_dropped(): print(f"dropped frame {frame.get_idx()}") continue - slot = aq.get_frame_slot(frame) + slot = cam.get_frame_slot(frame) frame_ref = cam_client.get_frame_ref(slot) mv = frame_ref.get_memoryview() payload = np.frombuffer(mv, dtype=np.uint16).reshape( - aq.get_frame_shape() + cam.get_frame_shape() ) - - payload.sum() - # print(payload.sum()) - + # frame_arr += payload cam_client.done(slot) - # print(arr.sum(), arr.shape, arr.dtype) finally: t1 = time.time() - print(f"stopping, got {i} frames in {t1-t0:.2f}s...") - aq.stop() + print(f"acquisition {aq} done, got {i} frames in {t1-t0:.2f}s...") + # np.save(f"/cachedata/alex/bar-sum-{aq.get_id()}.npy", frame_arr) -def main(): +@click.command +@click.option('--mode', default="summit", type=str) +@click.option('--num-parts', default=4, type=int) +@click.option('--frames-per-part', default=10, type=int) +def main(mode, num_parts, frames_per_part): maybe_setup_tracing("write_and_iterate") with tracer.start_as_current_span("main"): + if mode.lower() == "summit": + mode = Mode.Summit + elif mode.lower() == "is": + mode = Mode.IS + shm_socket = "/tmp/k2shm.socket" cam = Cam( local_addr_top="192.168.10.99", local_addr_bottom="192.168.10.99", - ) - - # writer = Writer( - # method="direct", - # # method="mmap", - # filename="/cachedata/alex/bar.raw", - # ) - shm_socket = "/tmp/k2shm.socket" - aqp = AcquisitionParams( - size=18, - # sync=Sync.WaitForSync, - sync=Sync.Immediately, - # 
writer=writer, - writer=None, + mode=mode, enable_frame_iterator=True, shm_path=shm_socket, - mode=Mode.Summit, ) - - aq = cam.make_acquisition(aqp) - aq.arm() - cam_client = CamClient(shm_socket) - aq.wait_for_start() - iterate(aq, cam_client) - - aq.wait_for_start() - iterate(aq, cam_client) + frame_arr = np.zeros(cam.get_frame_shape(), dtype=np.float32) + + try: + for i in range(num_parts): + # frame_arr[:] = 0 + writer = Writer( + method="direct", + # method="mmap", + filename=f"/cachedata/alex/bar-{i}.raw", + ) + aqp = AcquisitionParams( + size=frames_per_part, + # sync=Sync.WaitForSync, + sync=Sync.Immediately, + writer=writer, + # writer=None, + ) + aq = cam.make_acquisition(aqp) + print(f"acquisition {aq}") + cam.wait_for_start() + iterate(aq, cam, cam_client, frame_arr) + finally: + # this shuts down the runtime, backgrounds threads an all... + cam.stop() if __name__ == "__main__": diff --git a/libertem_k2is/src/bgthread.rs b/libertem_k2is/src/bgthread.rs index fea60e2a..050f00e7 100644 --- a/libertem_k2is/src/bgthread.rs +++ b/libertem_k2is/src/bgthread.rs @@ -14,10 +14,11 @@ use k2o::frame::{GenericFrame, K2Frame}; use k2o::frame_is::K2ISFrame; use k2o::frame_summit::K2SummitFrame; use k2o::helpers::{set_cpu_affinity, CPU_AFF_WRITER}; -use k2o::params::Mode; +use k2o::params::CameraMode; use k2o::recv::recv_decode_loop; use k2o::tracing::get_tracer; use k2o::write::WriterBuilder; +use log::{debug, info}; use opentelemetry::trace::Tracer; use opentelemetry::{global, Context}; #[derive(Debug, Clone)] @@ -53,7 +54,6 @@ fn k2_bg_thread< B: K2Block, >( events: &Events, - writer_builder: Box, addr_config: &AddrConfig, pump: MessagePump, writer_dest_channel: Sender>, // either going to the consumer, or back to the assembly @@ -142,7 +142,6 @@ fn k2_bg_thread< &writer_dest_channel, &writer_events_rx, events, - writer_builder, acq_shm, ); }) @@ -168,7 +167,6 @@ fn k2_bg_thread< /// pub fn start_bg_thread( events: Events, - writer_builder: Box, addr_config: 
AddrConfig, pump: MessagePump, @@ -186,7 +184,6 @@ pub fn start_bg_thread( let shm = SharedSlabAllocator::connect(&shm_handle.os_handle).expect("connect to shm"); k2_bg_thread::( &events, - writer_builder, &addr_config, pump, tx_from_writer, @@ -196,22 +193,6 @@ pub fn start_bg_thread( .expect("failed to start k2 background thread") } -pub struct AcquisitionRuntime { - // bg_thread is an Option so we are able to join by moving out of it - bg_thread: Option>, - main_events_tx: Sender, - main_events_rx: Receiver, - - /// This is where an "external" frame consumer gets their frames: - rx_writer_to_consumer: Receiver>, - - enable_frame_consumer: bool, - - state_tracker: StateTracker, - - shm: SharedSlabAllocator, -} - #[derive(Debug)] pub enum RuntimeError { Timeout, @@ -234,13 +215,46 @@ impl From for RuntimeError { } } +pub enum UpdateStateResult { + DidUpdate, + NoNewMessage, +} + +/// The `AcquisitionRuntime` starts and communicates with a background thread, +/// keeps track of the state via the `StateTracker`, and owns the shared memory +/// area. +/// +/// This runtime is kept alive over multiple acquisitions, and as such the +/// parameters (like IS/Summit mode, network settings, shm socket path, frame +/// iterator settings) cannot be changed without restarting the runtime. +/// +/// It is possible to change per-acquisition settings, though, like filename and +/// file writer settings, number of frames for the acquisition, and the camera +/// sync mode. 
+pub struct AcquisitionRuntime { + // bg_thread is an Option so we are able to join by moving out of it + bg_thread: Option>, + main_events_tx: Sender, + main_events_rx: Receiver, + + /// This is where an "external" frame consumer gets their frames: + rx_writer_to_consumer: Receiver>, + + enable_frame_consumer: bool, + + state_tracker: StateTracker, + + shm: SharedSlabAllocator, + + current_acquisition_id: usize, +} + impl AcquisitionRuntime { - pub fn new( - writer_builder: Box, + pub fn new( addr_config: &AddrConfig, enable_frame_consumer: bool, shm: SHMHandle, - mode: Mode, + mode: CameraMode, ) -> Self { let events: Events = ChannelEventBus::new(); let pump = MessagePump::new(&events); @@ -259,20 +273,18 @@ impl AcquisitionRuntime { // Frame consumer enabled -> after writing, the writer thread should send the frames // to the `tx_frame_consumer` channel match mode { - Mode::IS => Some(start_bg_thread::( + CameraMode::IS => Some(start_bg_thread::( events, - writer_builder, addr_config.clone(), pump, tx_writer_to_consumer, shm, )), - Mode::Summit => Some(start_bg_thread::< + CameraMode::Summit => Some(start_bg_thread::< K2SummitFrame, { K2SummitBlock::PACKET_SIZE }, >( events, - writer_builder, addr_config.clone(), pump, tx_writer_to_consumer, @@ -290,7 +302,8 @@ impl AcquisitionRuntime { // We currently don't wait for all threads, but that's fine, as it's // most important that the receiver for control messages is already // listening, as events are buffered in a channel: - loop { + let tracer = get_tracer(); + tracer.in_span("AcquisitionRuntime wait_for_init", |_cx| loop { match main_events_rx.recv_timeout(Duration::from_millis(5000)) { Ok(msg) => match state_tracker.set_state_from_msg(&msg) { Ok(AcquisitionState::Idle) => { @@ -307,7 +320,7 @@ impl AcquisitionRuntime { } Err(RecvTimeoutError::Disconnected) => panic!("error while waiting for init event"), } - } + }); let shm = SharedSlabAllocator::connect(&os_handle).expect("connect to shm"); @@ -319,20 +332,21 
@@ impl AcquisitionRuntime { enable_frame_consumer, state_tracker, shm, + current_acquisition_id: 0, } } /// Receive at most one event from the event bus and update the state - pub fn update_state(&mut self) { + pub fn update_state(&mut self) -> UpdateStateResult { match self.main_events_rx.try_recv() { Ok(msg) => match self.state_tracker.set_state_from_msg(&msg) { - Ok(_) => {} + Ok(_) => UpdateStateResult::DidUpdate, Err(StateError::InvalidTransition { from, msg }) => { panic!("invalid state transition: from={from:?} msg={msg:?}") } }, Err(TryRecvError::Disconnected) => panic!("lost connection to background thread"), - Err(TryRecvError::Empty) => {} + Err(TryRecvError::Empty) => UpdateStateResult::NoNewMessage, } } @@ -356,14 +370,22 @@ impl AcquisitionRuntime { .recv_timeout(Duration::from_millis(100))?; // FIXME! frames are not yet ordered by index // so sentinels can come out of order (ugh!) - if matches!( - acquisition_result, - AcquisitionResult::DoneSuccess { .. } - | AcquisitionResult::DoneError { .. } - | AcquisitionResult::DoneAborted { .. 
} - ) { - self.main_events_tx.send(EventMsg::ProcessingDone)?; - } + match acquisition_result { + AcquisitionResult::DoneSuccess { + acquisition_id, + dropped: _, + } + | AcquisitionResult::DoneShuttingDown { acquisition_id } + | AcquisitionResult::DoneAborted { + acquisition_id, + dropped: _, + } => { + self.main_events_tx + .send(EventMsg::ProcessingDone { acquisition_id })?; + debug!("AcquisitionRuntime::get_next_frame: {acquisition_result:?}"); + } + _ => {} + }; Ok(acquisition_result) } @@ -388,11 +410,19 @@ impl AcquisitionRuntime { Some(frame_inner.into_slot(&self.shm).slot_idx) } - pub fn arm(&self, params: AcquisitionParams) -> Result<(), RuntimeError> { - self.main_events_tx.send(EventMsg::Arm { params })?; + pub fn arm(&mut self, params: AcquisitionParams) -> Result<(), RuntimeError> { + self.current_acquisition_id += 1; + self.main_events_tx.send(EventMsg::Arm { + params, + acquisition_id: self.current_acquisition_id, + })?; Ok(()) } + pub fn get_current_acquisition_id(&self) -> usize { + self.current_acquisition_id + } + pub fn stop(&mut self) -> Result<(), RuntimeError> { // FIXME: do we need to do anything special if an acquisition is // currently running? @@ -443,11 +473,15 @@ impl AcquisitionRuntime { { let deadline = Instant::now() + timeout; loop { - self.update_state(); + let update_result = self.update_state(); if pred(self) { return Some(()); } - std::thread::sleep(Duration::from_millis(10)); + // only sleep if there was no new message, so we can handle + // storms of events efficiently here: + if matches!(update_result, UpdateStateResult::NoNewMessage) { + std::thread::sleep(Duration::from_millis(1)); + } if Instant::now() > deadline { return None; } @@ -461,11 +495,14 @@ impl AcquisitionRuntime { /// Wait until the arm command succeeded, returning `()` for success and `None` for timeout. 
pub fn wait_for_arm(&mut self, timeout: Duration) -> Option<()> { - println!("wait_for_arm"); + debug!("wait_for_arm"); self.wait_predicate(timeout, |slf| { matches!( slf.state_tracker.state, - AcquisitionState::Armed { params: _ } + AcquisitionState::Armed { + params: _, + acquisition_id: _ + } ) }) } @@ -476,7 +513,8 @@ impl AcquisitionRuntime { slf.state_tracker.state, AcquisitionState::AcquisitionStarted { params: _, - frame_id: _ + frame_id: _, + acquisition_id: _, } ) }) @@ -490,12 +528,23 @@ impl AcquisitionRuntime { let (params, ref_frame_id) = match &self.state_tracker.state { AcquisitionState::Startup | AcquisitionState::Idle - | AcquisitionState::Armed { params: _ } + | AcquisitionState::Armed { + params: _, + acquisition_id: _, + } | AcquisitionState::Shutdown => { return None; } - AcquisitionState::AcquisitionStarted { params, frame_id } - | AcquisitionState::AcquisitionFinishing { params, frame_id } => (params, frame_id), + AcquisitionState::AcquisitionStarted { + params, + frame_id, + acquisition_id: _, + } + | AcquisitionState::AcquisitionFinishing { + params, + frame_id, + acquisition_id: _, + } => (params, frame_id), }; frame_in_acquisition(frame_id, *ref_frame_id, params) } diff --git a/libertem_k2is/src/lib.rs b/libertem_k2is/src/lib.rs index 030f7049..bff4c2f9 100644 --- a/libertem_k2is/src/lib.rs +++ b/libertem_k2is/src/lib.rs @@ -1,6 +1,7 @@ mod bgthread; mod shm_helpers; +use env_logger::Builder; use ipc_test::SharedSlabAllocator; use shm_helpers::{CamClient, FrameRef}; use std::{ @@ -17,11 +18,11 @@ use k2o::{ acquisition::AcquisitionResult, block::K2Block, block_is::K2ISBlock, - events::{AcquisitionParams, AcquisitionSync}, + events::{AcquisitionParams, AcquisitionSync, WriterSettings, WriterTypeError}, frame::{GenericFrame, K2Frame}, frame_is::K2ISFrame, frame_summit::K2SummitFrame, - params::Mode, + params::CameraMode, tracing::{get_tracer, init_tracer}, write::{DirectWriterBuilder, MMapWriterBuilder, WriterBuilder}, }; @@ -126,7 +127,10 
@@ fn k2opy(_py: Python, m: &PyModule) -> PyResult<()> { let env = env_logger::Env::default() .filter_or("LIBERTEM_K2IS_LOG_LEVEL", "error") .write_style_or("LIBERTEM_K2IS_LOG_STYLE", "always"); - env_logger::init_from_env(env); + Builder::new() + .parse_env(env) + .format_timestamp_micros() + .init(); Ok(()) } @@ -155,20 +159,20 @@ enum PyMode { Summit, } -impl From for PyMode { - fn from(value: Mode) -> Self { +impl From for PyMode { + fn from(value: CameraMode) -> Self { match value { - Mode::IS => PyMode::IS, - Mode::Summit => PyMode::Summit, + CameraMode::IS => PyMode::IS, + CameraMode::Summit => PyMode::Summit, } } } -impl Into for PyMode { - fn into(self) -> Mode { +impl Into for PyMode { + fn into(self) -> CameraMode { match self { - PyMode::IS => Mode::IS, - PyMode::Summit => Mode::Summit, + PyMode::IS => CameraMode::IS, + PyMode::Summit => CameraMode::Summit, } } } @@ -176,35 +180,30 @@ impl Into for PyMode { #[pyclass(name = "Writer")] #[derive(Debug, Clone)] struct PyWriter { - pub filename: String, - pub method: String, + pub settings: WriterSettings, } impl PyWriter { - fn get_writer_builder(&self, filename: &str) -> PyResult> { - let wb: Box = match self.method.as_str() { - "direct" => DirectWriterBuilder::for_filename(filename), - "mmap" => MMapWriterBuilder::for_filename(filename), - #[cfg(feature = "hdf5")] - "hdf5" => HDF5WriterBuilder::for_filename(filename), - _ => { - let meth = &self.method; - let msg = format!("unknown method {meth}, choose one: mmap, direct, hdf5"); - return Err(exceptions::PyValueError::new_err(msg)); - } - }; - Ok(wb) + pub fn get_setttings(&self) -> &WriterSettings { + &self.settings } } #[pymethods] impl PyWriter { #[new] - fn new(filename: &str, method: &str) -> Self { - PyWriter { - filename: filename.to_string(), - method: method.to_string(), - } + fn new(filename: &str, method: &str) -> PyResult { + let settings = match WriterSettings::new(method, filename) { + Ok(s) => s, + Err(WriterTypeError::InvalidWriterType) 
=> { + let msg = format!( + "unknown method {method}, choose one: mmap, direct, hdf5 (optional feature)" + ); + return Err(exceptions::PyValueError::new_err(msg)); + } + }; + + Ok(PyWriter { settings }) } } @@ -213,30 +212,21 @@ impl PyWriter { struct PyAcquisitionParams { pub size: Option, pub sync: SyncFlags, - pub mode: PyMode, - pub writer: Option, - pub enable_frame_iterator: bool, - pub shm_path: String, + pub writer_settings: WriterSettings, } #[pymethods] impl PyAcquisitionParams { #[new] - fn new( - size: Option, - sync: SyncFlags, - writer: Option, - mode: Option, - enable_frame_iterator: bool, - shm_path: String, - ) -> Self { + fn new(size: Option, sync: SyncFlags, writer: Option) -> Self { + let writer_settings = match writer { + None => WriterSettings::disabled(), + Some(py_writer) => py_writer.get_setttings().clone(), + }; PyAcquisitionParams { size, sync, - writer, - mode: mode.unwrap_or_default(), - enable_frame_iterator, - shm_path, + writer_settings, } } } @@ -262,41 +252,36 @@ impl PyFrame { #[pymethods] impl PyFrame { - fn is_dropped(slf: PyRef) -> bool { + fn is_dropped(&self) -> bool { matches!( - &slf.acquisition_result, + &self.acquisition_result, Some(AcquisitionResult::DroppedFrame(..)) | Some(AcquisitionResult::DroppedFrameOutside(..)) ) } - fn get_idx(slf: PyRef) -> u32 { - slf.frame_idx + fn get_idx(&self) -> u32 { + self.frame_idx } } /// -/// An `Acquisition` is an object representing acquisition parameters, +/// An `Acquisition` is an object to perform a single acquisition, that is, +/// acquire a potentially unlimited number of frames and iterate over them or +/// write them to disk. 
#[pyclass] struct Acquisition { params: PyAcquisitionParams, - addr_config: AddrConfig, - - runtime: Option, - shm: Option, + camera_mode: CameraMode, + id: usize, } impl Acquisition { - pub fn new( - params: PyAcquisitionParams, - addr_config: &AddrConfig, - shm: SharedSlabAllocator, - ) -> Self { + pub fn new(params: PyAcquisitionParams, camera_mode: CameraMode, id: usize) -> Self { Acquisition { params, - addr_config: addr_config.clone(), - runtime: None, - shm: Some(shm), + camera_mode, + id, } } } @@ -310,7 +295,68 @@ impl From for PyErr { #[pymethods] impl Acquisition { - fn wait_for_start(mut slf: PyRefMut, py: Python) -> PyResult<()> { + fn get_id(&self) -> usize { + self.id + } + + fn __repr__(&self) -> String { + let id = self.id; + format!("") + } +} + +#[pyclass] +struct Cam { + addr_config: AddrConfig, + camera_mode: PyMode, + shm_path: String, + enable_frame_iterator: bool, + shm: Option, + runtime: Option, +} + +#[pymethods] +impl Cam { + #[new] + fn new( + local_addr_top: &str, + local_addr_bottom: &str, + mode: PyMode, + shm_path: &str, + enable_frame_iterator: bool, + py: Python, + ) -> PyResult { + let _guard = span_from_py(py, "Cam::new")?; + + let path = Path::new(&shm_path); + let (num_slots, slot_size) = match mode { + PyMode::IS => (2000, 2048 * 1860 * 2), + PyMode::Summit => (500, 4096 * 3840 * 2), + }; + let tracer = get_tracer(); + let shm = tracer.in_span("Cam shm_setup", |_cx| { + SharedSlabAllocator::new(num_slots, slot_size, true, path).expect("create shm") + }); + let addr_config = AddrConfig::new(local_addr_top, local_addr_bottom); + + let runtime = AcquisitionRuntime::new( + &addr_config, + enable_frame_iterator, + shm.get_handle(), + mode.into(), + ); + + Ok(Cam { + addr_config, + camera_mode: mode, + shm_path: shm_path.to_owned(), + enable_frame_iterator, + shm: Some(shm), + runtime: Some(runtime), + }) + } + + fn wait_for_start(&mut self, py: Python) -> PyResult<()> { // TODO: total deadline for initialization? 
// TODO: currently the API can be used wrong, i.e. calling // `get_next_frame` before this function means it will throw away @@ -319,8 +365,9 @@ impl Acquisition { // an object from this function which is the actual frame iterator. Or // we should make the wait implicit and integrate it into // `get_next_frame`... hmm. + let _guard = span_from_py(py, "Cam::wait_for_start")?; - if let Some(runtime) = &mut slf.runtime { + if let Some(runtime) = &mut self.runtime { loop { if runtime.wait_for_start(Duration::from_millis(100)).is_some() { break; @@ -336,13 +383,72 @@ impl Acquisition { Ok(()) } - fn get_next_frame(mut slf: PyRefMut, py: Python) -> PyResult> { - if !slf.params.enable_frame_iterator { + /// + /// Wait for the current acquisition to complete. This is only needed + /// if we are only writing to a file, and not consuming the + /// frame iterator, which takes care of this synchronization + /// otherwise. + /// + /// Also succeeds if the runtime is already shut down. + /// + // FIXME: timeout? 
+ fn wait_until_complete(&mut self, py: Python) -> PyResult<()> { + let _guard = span_from_py(py, "Acquisition::wait_until_complete")?; + + if let Some(runtime) = &mut self.runtime { + loop { + match runtime.wait_until_complete(Duration::from_millis(100)) { + Some(_) => return Ok(()), + None => { + py.check_signals()?; + } + } + } + } + Ok(()) + } + + fn stop(&mut self, timeout: Option, py: Python) -> PyResult<()> { + let _guard = span_from_py(py, "Acquisition::stop")?; + + let timeout_float: f32 = timeout.unwrap_or(30_f32); + match &mut self.runtime { + None => Err(exceptions::PyRuntimeError::new_err( + "trying to stop while not running", + )), + Some(runtime) => { + if runtime.stop().is_err() { + return Err(exceptions::PyRuntimeError::new_err( + "connection to background thread lost", + )); + } + let timeout = Duration::from_secs_f32(timeout_float); + let deadline = Instant::now() + timeout; + while Instant::now() < deadline { + if runtime.try_join().is_some() { + self.shm.take(); + return Ok(()); + } + std::thread::sleep(Duration::from_millis(100)); + py.check_signals()?; + } + // deadline exceeded + Err(exceptions::PyRuntimeError::new_err( + "timeout while waiting for background thread to stop", + )) + } + } + } + + fn get_next_frame(&mut self, py: Python) -> PyResult> { + let _guard = span_from_py(py, "Cam::get_next_frame")?; + + if !self.enable_frame_iterator { return Err(exceptions::PyRuntimeError::new_err( "get_next_frame called without enable_frame_iterator", )); } - if let Some(runtime) = &mut slf.runtime { + if let Some(runtime) = &mut self.runtime { loop { runtime.update_state(); match runtime.get_next_frame() { @@ -371,13 +477,20 @@ impl Acquisition { AcquisitionResult::DroppedFrameOutside(_) => { runtime.frame_done(result)?; } - AcquisitionResult::DoneError => { - // FIXME: propagate error as python exception? 
- panic!("WHAT?!"); - return Ok(None); + AcquisitionResult::ShutdownIdle + | AcquisitionResult::DoneShuttingDown { acquisition_id: _ } => { + return Err(exceptions::PyRuntimeError::new_err( + "acquisition runtime is shutting down", + )); + } + AcquisitionResult::DoneAborted { + dropped, + acquisition_id, } - AcquisitionResult::DoneAborted { dropped } - | AcquisitionResult::DoneSuccess { dropped } => { + | AcquisitionResult::DoneSuccess { + dropped, + acquisition_id, + } => { eprintln!("dropped {dropped} frames in this acquisition"); return Ok(None); } @@ -392,8 +505,8 @@ impl Acquisition { } } - fn frame_done(mut slf: PyRefMut, frame: &mut PyFrame) -> PyResult<()> { - if let Some(runtime) = &mut slf.runtime { + fn frame_done(&mut self, frame: &mut PyFrame) -> PyResult<()> { + if let Some(runtime) = &mut self.runtime { runtime.frame_done(frame.consume_frame_data())?; Ok(()) } else { @@ -403,8 +516,8 @@ impl Acquisition { } } - fn get_frame_slot(mut slf: PyRefMut, frame: &mut PyFrame) -> PyResult { - if let Some(runtime) = &mut slf.runtime { + fn get_frame_slot(&mut self, frame: &mut PyFrame) -> PyResult { + if let Some(runtime) = &mut self.runtime { let slot = runtime.get_frame_slot(frame.consume_frame_data()); if let Some(idx) = slot { Ok(idx) @@ -421,147 +534,51 @@ impl Acquisition { } fn get_frame_shape(&self) -> (usize, usize) { - match self.params.mode { + match self.camera_mode { PyMode::IS => (1860, 2048), PyMode::Summit => (3840, 4096), } } - /// - /// Start the receiver. Starts receiver threads and, if configured, - /// instantiates a writer. - /// - /// Immediately returns after initialization, but the frame iterator - /// will block until the data actually arrives. - /// - fn arm(mut slf: PyRefMut, py: Python) -> PyResult<()> { - let _guard = span_from_py(py, "Acquisition::arm")?; - - let wb = if let Some(writer) = &slf.params.writer { - writer.get_writer_builder(&writer.filename)? 
- } else { - Box::::default() - }; - - let sync: AcquisitionSync = slf.params.sync.clone().into(); - let p: AcquisitionParams = AcquisitionParams { - size: slf.params.size.into(), - sync, + /// Arm the runtime for the next acquisition + fn make_acquisition( + &mut self, + py: Python, + params: PyAcquisitionParams, + ) -> PyResult { + let _guard = span_from_py(py, "Cam::make_acquisition")?; + let p = params.clone(); + let acq_params = AcquisitionParams { + size: p.size.into(), + sync: p.sync.into(), binning: k2o::events::Binning::Bin1x, + writer_settings: p.writer_settings, }; - let mode = slf.params.mode; - let mut runtime = if let Some(shm) = &slf.shm { - AcquisitionRuntime::new::<{ ::PACKET_SIZE }>( - wb, - &slf.addr_config, - slf.params.enable_frame_iterator, - shm.get_handle(), - mode.into(), - ) - } else { - return Err(exceptions::PyRuntimeError::new_err( - "invalid state - no shared memory connection available", - )); - }; - if runtime.arm(p).is_err() { - return Err(exceptions::PyRuntimeError::new_err( - "connection to background thread lost", - )); - } - loop { - py.check_signals()?; - if runtime.wait_for_arm(Duration::from_millis(100)).is_some() { - break; + if let Some(runtime) = &mut self.runtime { + if runtime.arm(acq_params).is_err() { + return Err(exceptions::PyRuntimeError::new_err( + "connection to background thread lost", + )); } - } - slf.runtime = Some(runtime); - Ok(()) - } + let tracer = get_tracer(); - fn stop(mut slf: PyRefMut, timeout: Option, py: Python) -> PyResult<()> { - let _guard = span_from_py(py, "Acquisition::stop")?; - - let timeout_float: f32 = timeout.unwrap_or(30_f32); - match &mut slf.runtime { - None => Err(exceptions::PyRuntimeError::new_err( - "trying to stop while not running", - )), - Some(runtime) => { - if runtime.stop().is_err() { - return Err(exceptions::PyRuntimeError::new_err( - "connection to background thread lost", - )); - } - let timeout = Duration::from_secs_f32(timeout_float); - let deadline = Instant::now() + 
timeout; - while Instant::now() < deadline { - if runtime.try_join().is_some() { - slf.shm.take(); - return Ok(()); - } - std::thread::sleep(Duration::from_millis(100)); + tracer.in_span("AcquisitionRuntime::wait_for_arm", |_cx| -> PyResult<()> { + loop { py.check_signals()?; - } - // deadline exceeded - Err(exceptions::PyRuntimeError::new_err( - "timeout while waiting for background thread to stop", - )) - } - } - } - - /// - /// Wait for the acquisition to complete. This is only needed - /// if we are only writing to a file, and not consuming the - /// frame iterator, which takes care of this synchronization - /// otherwise. - /// - /// Also succeeds if the runtime is already shut down. - /// - // FIXME: timeout? - fn wait_until_complete(mut slf: PyRefMut, py: Python) -> PyResult<()> { - let _guard = span_from_py(py, "Acquisition::wait_until_complete")?; - - if let Some(runtime) = &mut slf.runtime { - loop { - match runtime.wait_until_complete(Duration::from_millis(100)) { - Some(_) => return Ok(()), - None => { - py.check_signals()?; + if runtime.wait_for_arm(Duration::from_millis(100)).is_some() { + return Ok(()); } } - } - } - Ok(()) - } -} - -#[pyclass] -struct Cam { - addr_config: AddrConfig, -} - -#[pymethods] -impl Cam { - #[new] - fn new(local_addr_top: &str, local_addr_bottom: &str) -> Self { - Cam { - addr_config: AddrConfig::new(local_addr_top, local_addr_bottom), + })?; + Ok(Acquisition::new( + params, + self.camera_mode.into(), + runtime.get_current_acquisition_id(), + )) + } else { + Err(exceptions::PyRuntimeError::new_err( + "invalid state - acquisition runtime not available", + )) } } - - fn make_acquisition( - slf: PyRef, - py: Python, - params: PyAcquisitionParams, - ) -> PyResult { - let _guard = span_from_py(py, "Cam::make_acquisition")?; - let path = Path::new(¶ms.shm_path); - let (num_slots, slot_size) = match params.mode { - PyMode::IS => (2000, 2048 * 1860 * 2), - PyMode::Summit => (500, 4096 * 3840 * 2), - }; - let shm = 
SharedSlabAllocator::new(num_slots, slot_size, true, path).expect("create shm"); - Ok(Acquisition::new(params, &slf.addr_config, shm)) - } } diff --git a/libertem_k2is/src/shm_helpers.rs b/libertem_k2is/src/shm_helpers.rs index 34bd5545..464b10e6 100644 --- a/libertem_k2is/src/shm_helpers.rs +++ b/libertem_k2is/src/shm_helpers.rs @@ -3,6 +3,8 @@ use std::ffi::c_int; use ipc_test::{SharedSlabAllocator, Slot}; use pyo3::{ffi::PyMemoryView_FromMemory, prelude::*, FromPyPointer}; +use crate::span_from_py; + #[allow(non_upper_case_globals)] const PyBUF_READ: c_int = 0x100; // somehow not exported by pyo3? oh no... @@ -31,9 +33,10 @@ pub struct CamClient { #[pymethods] impl CamClient { #[new] - fn new(socket_path: &str) -> Self { + fn new(socket_path: &str, py: Python) -> PyResult { + let _guard = span_from_py(py, "CamClient::new")?; let shm = SharedSlabAllocator::connect(socket_path).expect("connect to shm"); - CamClient { shm: Some(shm) } + Ok(CamClient { shm: Some(shm) }) } fn get_frame_ref(&self, py: Python, slot: usize) -> FrameRef {