Skip to content

Commit

Permalink
Don't accidentally drop the first block of an acquisition
Browse files Browse the repository at this point in the history
Also some cleanup..
  • Loading branch information
sk1p committed Nov 17, 2023
1 parent de5ac5b commit ad2e43c
Show file tree
Hide file tree
Showing 15 changed files with 103 additions and 108 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[workspace]
resolver = "2"
members = [
"libertem_dectris",
"libertem_asi_tpx3",
Expand Down
18 changes: 9 additions & 9 deletions k2o/benches/decode_benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ fn criterion_benchmark(c: &mut Criterion) {
// let mut out_f32: [f32; DECODED_SIZE * NUM_PACKETS] = [0.0; DECODED_SIZE * NUM_PACKETS];
// (in production code, we don't need to make these large allocations, as we only look at ~single packets at a time)

let input = vec![0 as u8; PACKET_SIZE * NUM_PACKETS].into_boxed_slice();
let mut out = vec![0 as u16; DECODED_SIZE * NUM_PACKETS].into_boxed_slice();
let mut out_f32 = vec![0.0 as f32; DECODED_SIZE * NUM_PACKETS].into_boxed_slice();
let input = vec![0_u8; PACKET_SIZE * NUM_PACKETS].into_boxed_slice();
let mut out = vec![0_u16; DECODED_SIZE * NUM_PACKETS].into_boxed_slice();
let mut out_f32 = vec![0.0_f32; DECODED_SIZE * NUM_PACKETS].into_boxed_slice();

const TOTAL_INPUT_SIZE: usize = NUM_PACKETS * PACKET_SIZE;

Expand All @@ -40,7 +40,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let out_chunks = out.chunks_exact_mut(DECODED_SIZE);

for (chunk, o_chunk) in in_chunks.zip(out_chunks) {
decode::<PACKET_SIZE>(black_box(&chunk), black_box(o_chunk));
decode::<PACKET_SIZE>(black_box(chunk), black_box(o_chunk));
}
})
},
Expand All @@ -56,7 +56,7 @@ fn criterion_benchmark(c: &mut Criterion) {

for (in_chunk, out_chunk) in in_chunks.zip(out_chunks) {
decode_unrolled::<PACKET_SIZE, DECODED_SIZE>(
black_box(&in_chunk),
black_box(in_chunk),
black_box(out_chunk),
);
}
Expand All @@ -74,7 +74,7 @@ fn criterion_benchmark(c: &mut Criterion) {

for (in_chunk, out_chunk) in in_chunks.zip(out_chunks) {
decode_map::<_, _, PACKET_SIZE, DECODED_SIZE>(
black_box(&in_chunk),
black_box(in_chunk),
black_box(out_chunk),
|x| x,
);
Expand All @@ -93,7 +93,7 @@ fn criterion_benchmark(c: &mut Criterion) {

for (in_chunk, out_chunk) in in_chunks.zip(out_chunks) {
decode_map::<_, _, PACKET_SIZE, DECODED_SIZE>(
black_box(&in_chunk),
black_box(in_chunk),
black_box(out_chunk),
|x| x as f32,
);
Expand All @@ -112,7 +112,7 @@ fn criterion_benchmark(c: &mut Criterion) {

for (in_chunk, out_chunk) in in_chunks.zip(out_chunks) {
decode_converted::<_, PACKET_SIZE, DECODED_SIZE>(
black_box(&in_chunk),
black_box(in_chunk),
black_box(out_chunk),
);
}
Expand All @@ -130,7 +130,7 @@ fn criterion_benchmark(c: &mut Criterion) {

for (in_chunk, out_chunk) in in_chunks.zip(out_chunks) {
decode_converted::<_, PACKET_SIZE, DECODED_SIZE>(
black_box(&in_chunk),
black_box(in_chunk),
black_box(out_chunk),
);
}
Expand Down
15 changes: 8 additions & 7 deletions k2o/src/acquisition.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
use std::{
fmt::Debug,
time::{Duration, Instant},
};
use std::time::{Duration, Instant};

use crossbeam_channel::{Receiver, RecvError, Select, SelectedOperation, Sender};
use human_bytes::human_bytes;
use ipc_test::SharedSlabAllocator;
use log::{error, info, warn};
use opentelemetry::{
global,
trace::{self, TraceContextExt, Tracer},
trace::{self, Span, TraceContextExt, Tracer},
Context, Key,
};
use partialdebug::placeholder::PartialDebug;
Expand Down Expand Up @@ -188,7 +185,11 @@ impl<'a, F: K2Frame> FrameHandler<'a, F> {
#[must_use]
fn handle_frames(mut self) -> HandleFramesResult {
let tracer = get_tracer();
let span = tracer.start("handle_frames");
let mut span = tracer.start("handle_frames");
span.add_event(
"start",
vec![Key::new("ref_frame_id").i64(self.ref_frame_id as i64)],
);
let _guard = trace::mark_span_as_active(span);
let mut sel = Select::new();
let op_events = sel.recv(self.events_rx);
Expand All @@ -209,7 +210,7 @@ impl<'a, F: K2Frame> FrameHandler<'a, F> {
match oper.index() {
i if i == op_events => match oper.recv(self.events_rx) {
Ok(EventMsg::Shutdown {}) => return HandleFramesResult::Shutdown,
Ok(EventMsg::CancelAcquisition { acquisition_id }) => {
Ok(EventMsg::CancelAcquisition { acquisition_id: _ }) => {
return HandleFramesResult::Aborted {
dropped: self.dropped,
}
Expand Down
12 changes: 10 additions & 2 deletions k2o/src/assemble.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{

use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, SendError, Sender};
use ipc_test::SharedSlabAllocator;
use log::{debug, error, info};
use log::{debug, error, info, warn};
use opentelemetry::Context;

use crate::{
Expand All @@ -28,7 +28,7 @@ impl<F: K2Frame> PendingFrames<F> {
pub fn new() -> Self {
PendingFrames {
frames: HashMap::new(),
timeout: Duration::from_millis(5),
timeout: Duration::from_millis(15),
}
}

Expand Down Expand Up @@ -94,6 +94,14 @@ impl<F: K2Frame> PendingFrames<F> {
for (_, frame) in self.frames.iter() {
let delta = now - frame.get_modified_timestamp();
if !frame.is_finished() && delta > self.timeout {
let tracker = frame.get_tracker();
let num_missing = tracker.iter().filter(|have_block| !**have_block).count();
warn!(
"dropping frame {}, num_missing: {:?} of {}",
frame.get_frame_id(),
num_missing,
tracker.len()
);
to_remove.push(frame.get_frame_id());
}
}
Expand Down
2 changes: 1 addition & 1 deletion k2o/src/bin/decode_only.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use k2o::decode::decode_unrolled;
use k2o::decode::HEADER_SIZE;
use k2o::decode::{decode, decode_unrolled};

fn main() {
const PACKET_SIZE: usize = 0x5758;
Expand Down
6 changes: 3 additions & 3 deletions k2o/src/cli_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ pub enum WriteMode {
HDF5,
}

impl Into<WriterType> for WriteMode {
fn into(self) -> WriterType {
match self {
impl From<WriteMode> for WriterType {
fn from(val: WriteMode) -> Self {
match val {
WriteMode::Direct => WriterType::Direct,
WriteMode::MMAP => WriterType::Mmap,
#[cfg(not(feature = "hdf5"))]
Expand Down
16 changes: 8 additions & 8 deletions k2o/src/control.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use crossbeam_channel::TryRecvError;
use log::{debug, info};
use log::debug;
use opentelemetry::trace::{TraceContextExt, Tracer};

use crate::{
Expand Down Expand Up @@ -162,12 +162,12 @@ impl StateTracker {
acquisition_id: *acquisition_id,
}),
EventMsg::Shutdown => Ok(AcquisitionState::Shutdown),
EventMsg::AcquisitionError { msg } => Ok(AcquisitionState::Shutdown),
EventMsg::AcquisitionError { msg: _ } => Ok(AcquisitionState::Shutdown),
}
}
AcquisitionState::Armed {
params,
acquisition_id,
acquisition_id: _,
} => {
match event {
// invalid transitions:
Expand Down Expand Up @@ -217,7 +217,7 @@ impl StateTracker {
EventMsg::AcquisitionEnded { acquisition_id: _ } => Ok(AcquisitionState::Idle),
EventMsg::CancelAcquisition { acquisition_id: _ } => Ok(AcquisitionState::Idle),
EventMsg::Shutdown => Ok(AcquisitionState::Shutdown),
EventMsg::AcquisitionError { msg } => Ok(AcquisitionState::Shutdown),
EventMsg::AcquisitionError { msg: _ } => Ok(AcquisitionState::Shutdown),
}
}
AcquisitionState::AcquisitionStarted {
Expand Down Expand Up @@ -275,15 +275,15 @@ impl StateTracker {
acquisition_id: *acquisition_id,
})
}
EventMsg::CancelAcquisition { acquisition_id } => Ok(AcquisitionState::Idle),
EventMsg::CancelAcquisition { acquisition_id: _ } => Ok(AcquisitionState::Idle),
EventMsg::Shutdown => Ok(AcquisitionState::Shutdown),
EventMsg::AcquisitionError { msg } => Ok(AcquisitionState::Shutdown),
EventMsg::AcquisitionError { msg: _ } => Ok(AcquisitionState::Shutdown),
}
}
AcquisitionState::AcquisitionFinishing {
params: _,
frame_id: _,
acquisition_id: acquisition_id_outer,
acquisition_id: _acquisition_id_outer,
} => {
match event {
// invalid transitions:
Expand Down Expand Up @@ -330,7 +330,7 @@ impl StateTracker {
EventMsg::CancelAcquisition { acquisition_id: _ } => Ok(AcquisitionState::Idle),
EventMsg::Shutdown => Ok(AcquisitionState::Shutdown),
EventMsg::ProcessingDone { acquisition_id: _ } => Ok(AcquisitionState::Idle),
EventMsg::AcquisitionError { msg } => Ok(AcquisitionState::Shutdown),
EventMsg::AcquisitionError { msg: _ } => Ok(AcquisitionState::Shutdown),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion k2o/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
};

use crossbeam_channel::{select, unbounded, Receiver, RecvError, SendError, Sender};
use log::{debug, info};
use log::debug;

use crate::write::{DirectWriterBuilder, MMapWriterBuilder, NoopWriterBuilder, WriterBuilder};
#[cfg(feature = "hdf5")]
Expand Down
14 changes: 0 additions & 14 deletions k2o/src/frame_is.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ impl FrameForWriting for K2ISFrameForWriting {
let slot_info = shm.writing_done(self.payload);
K2ISFrame {
payload: slot_info,
subframe_idx: self.subframe_idx,
frame_id: self.frame_id,
created_timestamp: self.created_timestamp,
modified_timestamp: self.modified_timestamp,
Expand Down Expand Up @@ -140,25 +139,12 @@ impl K2ISFrameForWriting {
counts.get(&false).unwrap_or(&0)
);
}

fn into_readonly(self, slot_info: SlotInfo) -> K2ISFrame {
K2ISFrame {
payload: slot_info,
subframe_idx: self.subframe_idx,
frame_id: self.frame_id,
created_timestamp: self.created_timestamp,
modified_timestamp: self.modified_timestamp,
acquisition_id: self.acquisition_id,
}
}
}

pub struct K2ISFrame {
/// a reference to the decoded payload of the whole frame
pub payload: SlotInfo,

subframe_idx: u8,

/// the frame id as received
pub frame_id: u32,

Expand Down
15 changes: 6 additions & 9 deletions k2o/src/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ use std::{
collections::{BinaryHeap, HashSet},
};

use crate::{
acquisition::AcquisitionResult,
frame::{GenericFrame, K2Frame},
};
use crate::{acquisition::AcquisitionResult, frame::GenericFrame};

pub enum FrameOrderingResult {
Buffered,
Expand All @@ -19,9 +16,9 @@ pub enum FrameWithIdx {
DroppedFrame(GenericFrame, u32),
}

impl Into<AcquisitionResult<GenericFrame>> for FrameWithIdx {
fn into(self) -> AcquisitionResult<GenericFrame> {
match self {
impl From<FrameWithIdx> for AcquisitionResult<GenericFrame> {
fn from(val: FrameWithIdx) -> Self {
match val {
FrameWithIdx::Frame(frame, frame_idx) => AcquisitionResult::Frame(frame, frame_idx),
FrameWithIdx::DroppedFrame(frame, frame_idx) => {
AcquisitionResult::DroppedFrame(frame, frame_idx)
Expand Down Expand Up @@ -103,8 +100,8 @@ impl FrameOrdering {
pub fn handle_frame(&mut self, frame_w_idx: FrameWithIdx) -> FrameOrderingResult {
let expected_frame_idx = self.next_expected_frame_idx;

// frame indices can be repeated, in case we drop a frame and later a
// block of said frame arrives, which starts a new frame with the old index
// Frame indices can be repeated, in case we drop a frame and later a
// block of said frame arrives, which starts a new frame with the old index.
// filter these out here:
if self.dropped_idxs.contains(&frame_w_idx.get_idx()) {
return FrameOrderingResult::Dropped;
Expand Down
8 changes: 8 additions & 0 deletions k2o/src/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ pub fn recv_decode_loop<B: K2Block, const PACKET_SIZE: usize>(
acquisition_id,
});
state = RecvState::Receiving { acquisition_id };
// send out the first block:
let route_info = BlockRouteInfo::new(&block);
if assembly_channel.send((block, route_info)).is_err() {
events.send(&EventMsg::AcquisitionError {
msg: "failed to send to assembly threads".to_string(),
});
break;
}
}
}
RecvState::WaitForNext { acquisition_id } => {
Expand Down
2 changes: 1 addition & 1 deletion k2o/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use hdf5::{
types::{IntSize, TypeDescriptor},
Dataset, Extent,
};
use log::{info, trace};
use log::trace;
use memmap2::{MmapMut, MmapOptions};
use ndarray::s;
use ndarray_npy::{write_zeroed_npy, ViewMutNpyExt};
Expand Down
Loading

0 comments on commit ad2e43c

Please sign in to comment.