Skip to content

Commit

Permalink
Make realtime for recv/asm a runtime configuration
Browse files Browse the repository at this point in the history
Extract some configuration into `AcquisitionRuntimeConfig`
  • Loading branch information
sk1p committed Apr 30, 2024
1 parent 71de2f1 commit c250ff9
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 48 deletions.
21 changes: 19 additions & 2 deletions k2o/benches/assemble_benchmarks.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,38 @@
use std::time::Duration;
use std::time::Instant;

use criterion::BenchmarkId;
use criterion::Throughput;
use criterion::{black_box, criterion_group, criterion_main, Criterion};

use ipc_test::SharedSlabAllocator;
use k2o::block::K2Block;
use k2o::block_is::K2ISBlock;
use k2o::frame::FrameForWriting;
use k2o::frame::K2Frame;
use k2o::frame_is::K2ISFrame;
use k2o::frame_is::K2ISFrameForWriting;
use tempfile::tempdir;

fn criterion_benchmark(c: &mut Criterion) {
const PACKET_SIZE: usize = 0x5758;

const TOTAL_INPUT_SIZE: usize = 1 * PACKET_SIZE;
const TOTAL_INPUT_SIZE: usize = PACKET_SIZE;

let block = K2ISBlock::empty(42, 0);
let mut frame: K2ISFrame = K2ISFrame::empty::<K2ISBlock>(42, 0);

let socket_dir = tempdir().unwrap();
let socket_as_path = socket_dir.into_path().join("stuff.socket");

const FRAME_ID: u32 = 42;
let mut ssa = SharedSlabAllocator::new(
10,
K2ISFrame::FRAME_HEIGHT * K2ISFrame::FRAME_WIDTH * std::mem::size_of::<u16>(),
false,
&socket_as_path,
)
.expect("create SHM area for testing");
let mut frame: K2ISFrameForWriting = K2ISFrameForWriting::empty(FRAME_ID, &mut ssa, 0);

let mut assign_block = c.benchmark_group("assign_block* functions");
assign_block.measurement_time(Duration::new(10, 0));
Expand Down
22 changes: 17 additions & 5 deletions k2o/src/assemble.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,17 @@ fn assembly_worker<F: K2Frame, B: K2Block>(
stop_event: Arc<AtomicBool>,
handle_path: &str,
timeout: &Duration,
realtime: bool,
) -> Result<(), AssemblyError> {
let mut pending: PendingFrames<F> = PendingFrames::new(timeout.clone());
let mut pending: PendingFrames<F> = PendingFrames::new(*timeout);
let mut shm = SharedSlabAllocator::connect(handle_path).expect("connect to SHM");

// match make_realtime(5) {
// Ok(prio) => info!("successfully enabled realtime priority {prio}"),
// Err(e) => error!("failed to set realtime priority: {e:?}"),
// }
if realtime {
match make_realtime(5) {
Ok(prio) => info!("successfully enabled realtime priority {prio}"),
Err(e) => error!("failed to set realtime priority: {e:?}"),
}
}

loop {
match blocks_rx.recv_timeout(Duration::from_millis(100)) {
Expand Down Expand Up @@ -206,6 +209,7 @@ pub fn assembler_main<F: K2Frame, B: K2Block>(
events_rx: EventReceiver,
shm: SharedSlabAllocator,
timeout: &Duration,
realtime: bool,
) {
let pool_size = 5;
let mut worker_channels: Vec<Sender<B>> = Vec::with_capacity(pool_size);
Expand All @@ -219,6 +223,13 @@ pub fn assembler_main<F: K2Frame, B: K2Block>(
recycle_blocks_tx.send(B::empty(0, 0)).unwrap();
}

if realtime {
match make_realtime(5) {
Ok(prio) => info!("successfully enabled realtime priority {prio}"),
Err(e) => error!("failed to set realtime priority: {e:?}"),
}
}

crossbeam::scope(|s| {
for idx in 0..pool_size {
let asm_worker_ctx = ctx.clone();
Expand All @@ -240,6 +251,7 @@ pub fn assembler_main<F: K2Frame, B: K2Block>(
this_stop_event,
&shm_handle.os_handle,
timeout,
realtime,
)
.is_err()
{
Expand Down
4 changes: 3 additions & 1 deletion k2o/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use k2o::frame_is::K2ISFrame;
use k2o::frame_summit::K2SummitFrame;
use k2o::helpers::CPU_AFF_WRITER;
use k2o::helpers::{recv_and_get_init, set_cpu_affinity};
use k2o::recv::recv_decode_loop;
use k2o::recv::{recv_decode_loop, RecvConfig};
use k2o::tracing::init_tracer;
use log::info;
use tokio::runtime::Runtime;
Expand Down Expand Up @@ -80,6 +80,7 @@ fn start_threads<
events,
local_addr,
None,
&RecvConfig::new(true),
);
})
.expect("could not spawn recv+decode thread");
Expand Down Expand Up @@ -108,6 +109,7 @@ fn start_threads<
asm_events_rx,
asm_shm,
&Duration::from_millis(100),
true,
);
}
})
Expand Down
3 changes: 2 additions & 1 deletion k2o/src/bin/timestamp_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use k2o::{
AcquisitionParams, AcquisitionSize, AcquisitionSync, ChannelEventBus, EventBus, EventMsg,
Events, WriterSettings,
},
recv::recv_decode_loop,
recv::{recv_decode_loop, RecvConfig},
};

fn mean(data: &[u128]) -> Option<f32> {
Expand Down Expand Up @@ -83,6 +83,7 @@ fn start_threads<
events,
local_addr,
None,
&RecvConfig::new(true),
);
})
.expect("could not spawn recv+decode thread");
Expand Down
25 changes: 18 additions & 7 deletions k2o/src/recv.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::{
io::ErrorKind,
sync::{
atomic::{AtomicU8, Ordering},
Arc, Barrier, Condvar, Mutex,
},
sync::{Arc, Condvar, Mutex},
time::Duration,
};

Expand All @@ -17,6 +14,17 @@ use crate::{
net::create_mcast_socket,
};

#[derive(Debug, Clone)]
pub struct RecvConfig {
pub realtime: bool,
}

impl RecvConfig {
pub fn new(realtime: bool) -> Self {
Self { realtime }
}
}

#[derive(PartialEq)]
enum RecvState {
Initializing,
Expand Down Expand Up @@ -82,13 +90,16 @@ pub fn recv_decode_loop<B: K2Block, const PACKET_SIZE: usize>(
events: &Events,
local_addr: String,
first_block_counter: Option<Arc<(Mutex<u8>, Condvar)>>,
config: &RecvConfig,
) {
let socket = create_mcast_socket(port, "225.1.1.1", &local_addr);
let mut buf: [u8; PACKET_SIZE] = [0; PACKET_SIZE];

match make_realtime(50) {
Ok(prio) => info!("successfully enabled realtime priority {prio}"),
Err(e) => error!("failed to set realtime priority: {e:?}"),
if config.realtime {
match make_realtime(50) {
Ok(prio) => info!("successfully enabled realtime priority {prio}"),
Err(e) => error!("failed to set realtime priority: {e:?}"),
}
}

socket
Expand Down
Loading

0 comments on commit c250ff9

Please sign in to comment.