From c250ff9d0090e119c9c930d7a53f0c1c803368a2 Mon Sep 17 00:00:00 2001 From: Alexander Clausen Date: Wed, 22 Nov 2023 14:28:45 +0100 Subject: [PATCH] Make realtime for recv/asm a runtime configuration Extract some configuration into `AcquisitionRuntimeConfig` --- k2o/benches/assemble_benchmarks.rs | 21 +++++++- k2o/src/assemble.rs | 22 ++++++-- k2o/src/bin/main.rs | 4 +- k2o/src/bin/timestamp_recorder.rs | 3 +- k2o/src/recv.rs | 25 ++++++--- k2o/src/runtime.rs | 86 +++++++++++++++++++++--------- libertem_k2is/src/lib.rs | 20 ++++--- 7 files changed, 133 insertions(+), 48 deletions(-) diff --git a/k2o/benches/assemble_benchmarks.rs b/k2o/benches/assemble_benchmarks.rs index bed5a016..9bd3361e 100644 --- a/k2o/benches/assemble_benchmarks.rs +++ b/k2o/benches/assemble_benchmarks.rs @@ -1,21 +1,38 @@ use std::time::Duration; +use std::time::Instant; use criterion::BenchmarkId; use criterion::Throughput; use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use ipc_test::SharedSlabAllocator; use k2o::block::K2Block; use k2o::block_is::K2ISBlock; +use k2o::frame::FrameForWriting; use k2o::frame::K2Frame; use k2o::frame_is::K2ISFrame; +use k2o::frame_is::K2ISFrameForWriting; +use tempfile::tempdir; fn criterion_benchmark(c: &mut Criterion) { const PACKET_SIZE: usize = 0x5758; - const TOTAL_INPUT_SIZE: usize = 1 * PACKET_SIZE; + const TOTAL_INPUT_SIZE: usize = PACKET_SIZE; let block = K2ISBlock::empty(42, 0); - let mut frame: K2ISFrame = K2ISFrame::empty::(42, 0); + + let socket_dir = tempdir().unwrap(); + let socket_as_path = socket_dir.into_path().join("stuff.socket"); + + const FRAME_ID: u32 = 42; + let mut ssa = SharedSlabAllocator::new( + 10, + K2ISFrame::FRAME_HEIGHT * K2ISFrame::FRAME_WIDTH * std::mem::size_of::(), + false, + &socket_as_path, + ) + .expect("create SHM area for testing"); + let mut frame: K2ISFrameForWriting = K2ISFrameForWriting::empty(FRAME_ID, &mut ssa, 0); let mut assign_block = c.benchmark_group("assign_block* functions"); assign_block.measurement_time(Duration::new(10, 0)); diff --git a/k2o/src/assemble.rs b/k2o/src/assemble.rs index 9b53e2ba..c87f6dee 100644 --- a/k2o/src/assemble.rs +++ b/k2o/src/assemble.rs @@ -149,14 +149,17 @@ fn assembly_worker( stop_event: Arc, handle_path: &str, timeout: &Duration, + realtime: bool, ) -> Result<(), AssemblyError> { - let mut pending: PendingFrames = PendingFrames::new(timeout.clone()); + let mut pending: PendingFrames = PendingFrames::new(*timeout); let mut shm = SharedSlabAllocator::connect(handle_path).expect("connect to SHM"); - // match make_realtime(5) { - // Ok(prio) => info!("successfully enabled realtime priority {prio}"), - // Err(e) => error!("failed to set realtime priority: {e:?}"), - // } + if realtime { + match make_realtime(5) { + Ok(prio) => info!("successfully enabled realtime priority {prio}"), + Err(e) => error!("failed to set realtime priority: {e:?}"), + } + } loop { match blocks_rx.recv_timeout(Duration::from_millis(100)) { @@ -206,6 +209,7 @@ pub fn assembler_main( events_rx: EventReceiver, shm: SharedSlabAllocator, timeout: &Duration, + realtime: bool, ) { let pool_size = 5; let mut worker_channels: Vec> = Vec::with_capacity(pool_size); @@ -219,6 +223,13 @@ pub fn assembler_main( recycle_blocks_tx.send(B::empty(0, 0)).unwrap(); } + if realtime { + match make_realtime(5) { + Ok(prio) => info!("successfully enabled realtime priority {prio}"), + Err(e) => error!("failed to set realtime priority: {e:?}"), + } + } + crossbeam::scope(|s| { for idx in 0..pool_size { let asm_worker_ctx = ctx.clone(); @@ -240,6 +251,7 @@ pub fn assembler_main( this_stop_event, &shm_handle.os_handle, timeout, + realtime, ) .is_err() { diff --git a/k2o/src/bin/main.rs b/k2o/src/bin/main.rs index e30cbf31..3149d2f9 100644 --- a/k2o/src/bin/main.rs +++ b/k2o/src/bin/main.rs @@ -27,7 +27,7 @@ use k2o::frame_is::K2ISFrame; use k2o::frame_summit::K2SummitFrame; use k2o::helpers::CPU_AFF_WRITER; use k2o::helpers::{recv_and_get_init, set_cpu_affinity}; -use k2o::recv::recv_decode_loop; +use k2o::recv::{recv_decode_loop, RecvConfig}; use k2o::tracing::init_tracer; use log::info; use tokio::runtime::Runtime; @@ -80,6 +80,7 @@ fn start_threads< events, local_addr, None, + &RecvConfig::new(true), ); }) .expect("could not spawn recv+decode thread"); @@ -108,6 +109,7 @@ fn start_threads< asm_events_rx, asm_shm, &Duration::from_millis(100), + true, ); } }) diff --git a/k2o/src/bin/timestamp_recorder.rs b/k2o/src/bin/timestamp_recorder.rs index 77c008fc..b01fae82 100644 --- a/k2o/src/bin/timestamp_recorder.rs +++ b/k2o/src/bin/timestamp_recorder.rs @@ -13,7 +13,7 @@ use k2o::{ AcquisitionParams, AcquisitionSize, AcquisitionSync, ChannelEventBus, EventBus, EventMsg, Events, WriterSettings, }, - recv::recv_decode_loop, + recv::{recv_decode_loop, RecvConfig}, }; fn mean(data: &[u128]) -> Option { @@ -83,6 +83,7 @@ fn start_threads< events, local_addr, None, + &RecvConfig::new(true), ); }) .expect("could not spawn recv+decode thread"); diff --git a/k2o/src/recv.rs b/k2o/src/recv.rs index 9170dbe3..20eb6a35 100644 --- a/k2o/src/recv.rs +++ b/k2o/src/recv.rs @@ -1,9 +1,6 @@ use std::{ io::ErrorKind, - sync::{ - atomic::{AtomicU8, Ordering}, - Arc, Barrier, Condvar, Mutex, - }, + sync::{Arc, Condvar, Mutex}, time::Duration, }; @@ -17,6 +14,17 @@ use crate::{ net::create_mcast_socket, }; +#[derive(Debug, Clone)] +pub struct RecvConfig { + pub realtime: bool, +} + +impl RecvConfig { + pub fn new(realtime: bool) -> Self { + Self { realtime } + } +} + #[derive(PartialEq)] enum RecvState { Initializing, @@ -82,13 +90,16 @@ pub fn recv_decode_loop( events: &Events, local_addr: String, first_block_counter: Option, Condvar)>>, + config: &RecvConfig, ) { let socket = create_mcast_socket(port, "225.1.1.1", &local_addr); let mut buf: [u8; PACKET_SIZE] = [0; PACKET_SIZE]; - match make_realtime(50) { - Ok(prio) => info!("successfully enabled realtime priority {prio}"), - Err(e) => error!("failed to set realtime priority: {e:?}"), + if config.realtime { + match make_realtime(50) { + Ok(prio) => info!("successfully enabled realtime priority {prio}"), + Err(e) => error!("failed to set realtime priority: {e:?}"), + } } socket diff --git a/k2o/src/runtime.rs b/k2o/src/runtime.rs index 99670fb4..9c8d22ac 100644 --- a/k2o/src/runtime.rs +++ b/k2o/src/runtime.rs @@ -10,12 +10,11 @@ use crate::frame_is::K2ISFrame; use crate::frame_summit::K2SummitFrame; use crate::helpers::{set_cpu_affinity, CPU_AFF_WRITER}; use crate::params::CameraMode; -use crate::recv::recv_decode_loop; +use crate::recv::{recv_decode_loop, RecvConfig}; use crate::tracing::get_tracer; use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, SendError, Sender, TryRecvError}; use ipc_test::{SHMHandle, SharedSlabAllocator}; -use std::sync::atomic::{AtomicU8, Ordering}; -use std::sync::{Arc, Barrier, Condvar, Mutex}; +use std::sync::{Arc, Condvar, Mutex}; use std::thread::JoinHandle; use std::time::{Duration, Instant}; @@ -32,11 +31,12 @@ pub struct AddrConfig { #[derive(Debug, Clone)] pub struct AssemblyConfig { pub timeout: Duration, + pub realtime: bool, } impl AssemblyConfig { - fn new(timeout: Duration) -> Self { - Self { timeout } + fn new(timeout: Duration, realtime: bool) -> Self { + Self { timeout, realtime } } } @@ -44,6 +44,7 @@ impl Default for AssemblyConfig { fn default() -> Self { Self { timeout: Duration::from_millis(100), + realtime: true, } } } @@ -80,6 +81,7 @@ fn k2_bg_thread< writer_dest_channel: Sender>, // either going to the consumer, or back to the assembly shm: SharedSlabAllocator, asm_config: AssemblyConfig, + recv_config: RecvConfig, ) { let tracer = get_tracer(); tracer.in_span("start_threads", |_cx| { @@ -102,6 +104,7 @@ fn k2_bg_thread< let port = addr_config.port_for_sector(sector_id); let decode_ctx = ctx.clone(); let sector_counter = Arc::clone(&first_block_counter); + let recv_config = recv_config.clone(); s.builder() .name(format!("recv-decode-{}", sector_id)) .spawn(move |_| { @@ -116,6 +119,7 @@ fn k2_bg_thread< events, addr, Some(sector_counter), + &recv_config, ); }) .expect("spawn recv+decode thread"); @@ -145,6 +149,7 @@ fn k2_bg_thread< asm_events_rx, asm_shm, &asm_config.timeout, + asm_config.realtime, ); }) .expect("could not spawn assembly thread"); @@ -176,10 +181,12 @@ fn k2_bg_thread< let (lock, cvar) = &*first_block_counter; - cvar.wait_while(lock.lock().unwrap(), |counter| { - (*counter as usize) < ids.len() - }) - .unwrap(); + drop( + cvar.wait_while(lock.lock().unwrap(), |counter| { + (*counter as usize) < ids.len() + }) + .unwrap(), + ); events.send(&EventMsg::Init {}); control_loop(events, &Some(pump)); @@ -208,10 +215,12 @@ pub fn start_bg_thread( shm_handle: SHMHandle, asm_config: &AssemblyConfig, + recv_config: &RecvConfig, ) -> JoinHandle<()> { let thread_builder = std::thread::Builder::new(); let ctx = Context::current(); let asm_config = asm_config.clone(); + let recv_config = recv_config.clone(); thread_builder .name("k2_bg_thread".to_string()) .spawn(move || { @@ -224,6 +233,7 @@ pub fn start_bg_thread( tx_from_writer, shm, asm_config, + recv_config, ); }) .expect("failed to start k2 background thread") @@ -267,6 +277,33 @@ impl WaitResult { } } +#[derive(Debug, Clone)] +pub struct AcquisitionRuntimeConfig { + pub enable_frame_iterator: bool, + pub recv_realtime: bool, + pub assembly_realtime: bool, + pub mode: CameraMode, + pub addr_config: AddrConfig, +} + +impl AcquisitionRuntimeConfig { + pub fn new( + enable_frame_iterator: bool, + recv_realtime: bool, + assembly_realtime: bool, + mode: CameraMode, + addr_config: AddrConfig, + ) -> Self { + Self { + enable_frame_iterator, + recv_realtime, + assembly_realtime, + mode, + addr_config, + } + } +} + /// The `AcquisitionRuntime` starts and communicates with a background thread, /// keeps track of the state via the `StateTracker`, and owns the shared memory /// area. @@ -287,22 +324,17 @@ pub struct AcquisitionRuntime { /// This is where an "external" frame consumer gets their frames: rx_writer_to_consumer: Receiver>, - enable_frame_consumer: bool, - state_tracker: StateTracker, shm: SharedSlabAllocator, current_acquisition_id: usize, + + config: AcquisitionRuntimeConfig, } impl AcquisitionRuntime { - pub fn new( - addr_config: &AddrConfig, - enable_frame_consumer: bool, - shm: SHMHandle, - mode: CameraMode, - ) -> Self { + pub fn new(config: &AcquisitionRuntimeConfig, shm: SHMHandle) -> Self { let events: Events = ChannelEventBus::new(); let pump = MessagePump::new(&events); let (main_events_tx, main_events_rx) = pump.get_ext_channels(); @@ -316,29 +348,33 @@ impl AcquisitionRuntime { // // They are configured by wiring up channels in the correct way. let os_handle = shm.os_handle.clone(); - let bg_thread = if enable_frame_consumer { + let bg_thread = if config.enable_frame_iterator { // Frame consumer enabled -> after writing, the writer thread should send the frames // to the `tx_frame_consumer` channel - let asm_config = AssemblyConfig::new(Duration::from_millis(25)); - match mode { + let asm_config = + AssemblyConfig::new(Duration::from_millis(25), config.assembly_realtime); + let recv_config = RecvConfig::new(config.recv_realtime); + match config.mode { CameraMode::IS => Some(start_bg_thread::( events, - addr_config.clone(), + config.addr_config.clone(), pump, tx_writer_to_consumer, shm, &asm_config, + &recv_config, )), CameraMode::Summit => Some(start_bg_thread::< K2SummitFrame, { K2SummitBlock::PACKET_SIZE }, >( events, - addr_config.clone(), + config.addr_config.clone(), pump, tx_writer_to_consumer, shm, &asm_config, + &recv_config, )), } } else { @@ -379,10 +415,10 @@ impl AcquisitionRuntime { main_events_tx, main_events_rx, rx_writer_to_consumer, - enable_frame_consumer, state_tracker, shm, current_acquisition_id: 0, + config: config.clone(), } } @@ -412,7 +448,7 @@ impl AcquisitionRuntime { pub fn get_next_frame(&self) -> Result, RuntimeError> { // FIXME: can we make this a non-issue somehow? - if !self.enable_frame_consumer { + if !self.config.enable_frame_iterator { return Err(RuntimeError::ConfigurationError); } let acquisition_result = self @@ -443,7 +479,7 @@ impl AcquisitionRuntime { ) -> Result<(), RuntimeError> { // TODO: keep track of which frames we have seen here, and once we have // seen all of them, send `EventMsg::ProcesingDone` - if !self.enable_frame_consumer { + if !self.config.enable_frame_iterator { return Err(RuntimeError::ConfigurationError); } let inner = frame.unpack(); diff --git a/libertem_k2is/src/lib.rs b/libertem_k2is/src/lib.rs index 03c49b7f..613c588d 100644 --- a/libertem_k2is/src/lib.rs +++ b/libertem_k2is/src/lib.rs @@ -11,7 +11,7 @@ use std::{ }; use tokio::runtime::Runtime; -use k2o::runtime::{AddrConfig, RuntimeError, WaitResult}; +use k2o::runtime::{AcquisitionRuntimeConfig, AddrConfig, RuntimeError, WaitResult}; use k2o::{ acquisition::AcquisitionResult, @@ -332,13 +332,16 @@ struct Cam { #[pymethods] impl Cam { #[new] + #[allow(clippy::too_many_arguments)] fn new( + py: Python, local_addr_top: &str, local_addr_bottom: &str, mode: PyMode, shm_path: &str, - enable_frame_iterator: bool, - py: Python, + enable_frame_iterator: Option, + recv_realtime: Option, + assembly_realtime: Option, ) -> PyResult { let _guard = span_from_py(py, "Cam::new")?; @@ -352,14 +355,17 @@ impl Cam { SharedSlabAllocator::new(num_slots, slot_size, true, path).expect("create shm") }); let addr_config = AddrConfig::new(local_addr_top, local_addr_bottom); - - let runtime = AcquisitionRuntime::new( - &addr_config, + let enable_frame_iterator = enable_frame_iterator.unwrap_or(true); + let runtime_config = AcquisitionRuntimeConfig::new( enable_frame_iterator, - shm.get_handle(), + recv_realtime.unwrap_or(true), + assembly_realtime.unwrap_or(true), mode.into(), + addr_config, ); + let runtime = AcquisitionRuntime::new(&runtime_config, shm.get_handle()); + Ok(Cam { camera_mode: mode, enable_frame_iterator,