Skip to content

Commit

Permalink
Update ordering test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
sk1p committed Apr 30, 2024
1 parent c250ff9 commit bd9cc1c
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 134 deletions.
84 changes: 1 addition & 83 deletions k2o/src/acquisition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ struct FrameHandler<'a, F: K2Frame> {
}

impl<'a, F: K2Frame> FrameHandler<'a, F> {
#[allow(clippy::too_many_arguments)]
fn new(
acquisition_id: usize,
channel: &'a Receiver<AssemblyResult<F>>,
Expand Down Expand Up @@ -578,86 +579,3 @@ pub fn acquisition_loop<F: K2Frame>(
});
global::force_flush_tracer_provider();
}

// /// Track until which index we have received all frames, and the possibly
// /// "non-contiguous" set of other frames "on the right" which we have received.
// struct TrackFrames {
// /// Marker dividing the index space of the acquisition into frames that we have
// /// finished processing, and those that are still in flight or to be received.
// ///
// /// This points at the first frame in the todo-part, so a `0` at the beginning
// /// means everything still needs to be received.
// ///
// /// Finished processing also includes dropped frames, where we have waited long
// /// enough and didn't get all the data for the full frame.
// dense_until: usize,
//
// /// Set of all indices `i` that have been successfully received, but where
// /// another index `j` exists, such that the frame `j` has not been received fully.
// leading: HashSet<usize>,
//
// /// Separate tracker for dropped frames
// dropped: HashSet<usize>,
// }
//
// impl TrackFrames {
// pub fn new() -> Self {
// TrackFrames {
// dense_until: 0,
// leading: HashSet::new(),
// dropped: HashSet::new(),
// }
// }
//
// /// After changing `leading` or `dense_until`, call this function to check
// /// if we can move `dense_until` even further, and remove items from
// /// `leading`.
// fn maybe_move_marker(&mut self) {
// if self.leading.len() == 0 {
// return; // no need to adjust if `self.leading` is empty
// }
// let max_leading = self
// .leading
// .iter()
// .max()
// .expect("`leading` is not empty, so should have a maximum");
// for idx in self.dense_until..=*max_leading {
// if self.leading.contains(&idx) {
// self.leading.remove(&idx);
// self.dense_until = idx + 1;
// } else {
// // first frame which is not done yet encountered, so we keep this and
// // the following in the `leading` set.
// return;
// }
// }
// }
//
// fn track_frame<F: K2Frame>(&mut self, frame: &F, rel_idx: usize) {
// if rel_idx == self.dense_until {
// // fast path: the frame is exactly the next "expected" frame:
// self.dense_until += 1;
// self.maybe_move_marker();
// return;
// } else if rel_idx > self.dense_until {
// // anything else
// self.leading.insert(rel_idx);
// self.maybe_move_marker();
// return;
// } else {
// panic!(
// "cannot track a frame with idx {} < {}",
// rel_idx, self.dense_until
// );
// }
// }
//
// pub fn track_frame_done<F: K2Frame>(&mut self, frame: &F, rel_idx: usize) {
// self.track_frame(frame, rel_idx);
// }
//
// pub fn track_frame_dropped<F: K2Frame>(&mut self, frame: &F, rel_idx: usize) {
// self.track_frame(frame, rel_idx);
// self.dropped.insert(rel_idx);
// }
// }
154 changes: 103 additions & 51 deletions k2o/src/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,14 @@ impl FrameOrdering {
}
}

#[cfg(nope)]
#[cfg(test)]
mod tests {
use ipc_test::SharedSlabAllocator;
use tempfile::tempdir;

use crate::{
frame::{K2Frame, K2ISFrame},
frame::{FrameForWriting, K2Frame},
frame_is::{K2ISFrame, K2ISFrameForWriting},
ordering::FrameWithIdx,
};

Expand All @@ -183,10 +183,9 @@ mod tests {
#[test]
fn test_direct_return() {
let socket_dir = tempdir().unwrap();
let socket_as_path = socket_dir.into_path();
let handle_path = socket_as_path.to_str().unwrap();
let socket_as_path = socket_dir.into_path().join("test.socket");

let mut ordering: FrameOrdering<K2ISFrame> = FrameOrdering::new(0);
let mut ordering: FrameOrdering = FrameOrdering::new(0);

let mut ssa = SharedSlabAllocator::new(
10,
Expand All @@ -196,16 +195,25 @@ mod tests {
)
.expect("create SHM area for testing");

let f1 = K2ISFrame::empty(1, &mut ssa);
let f2 = K2ISFrame::empty(2, &mut ssa);
let f3 = K2ISFrame::empty(3, &mut ssa);
let f1w: K2ISFrameForWriting = K2ISFrameForWriting::empty(1, &mut ssa, 0);
let f1 = f1w.writing_done(&mut ssa);
let f2w: K2ISFrameForWriting = K2ISFrameForWriting::empty(2, &mut ssa, 0);
let f2 = f2w.writing_done(&mut ssa);
let f3w: K2ISFrameForWriting = K2ISFrameForWriting::empty(3, &mut ssa, 0);
let f3 = f3w.writing_done(&mut ssa);

assert_eq!(ordering.frame_buffer.len(), 0);
assert!(ordering.handle_frame(FrameWithIdx::Frame(f1, 0)).is_frame());
assert!(ordering
.handle_frame(FrameWithIdx::Frame(f1.into_generic(), 0))
.is_frame());
assert_eq!(ordering.frame_buffer.len(), 0);
assert!(ordering.handle_frame(FrameWithIdx::Frame(f2, 1)).is_frame());
assert!(ordering
.handle_frame(FrameWithIdx::Frame(f2.into_generic(), 1))
.is_frame());
assert_eq!(ordering.frame_buffer.len(), 0);
assert!(ordering.handle_frame(FrameWithIdx::Frame(f3, 2)).is_frame());
assert!(ordering
.handle_frame(FrameWithIdx::Frame(f3.into_generic(), 2))
.is_frame());

assert_eq!(ordering.next_expected_frame_idx, 3);

Expand All @@ -215,32 +223,42 @@ mod tests {

#[test]
fn test_one_missing() {
let mut ordering: FrameOrdering<K2ISFrame> = FrameOrdering::new(0);
let socket_dir = tempdir().unwrap();
let socket_as_path = socket_dir.into_path().join("test.socket");
let mut ordering: FrameOrdering = FrameOrdering::new(0);

let mut ssa = SharedSlabAllocator::new(
10,
K2ISFrame::FRAME_HEIGHT * K2ISFrame::FRAME_WIDTH * std::mem::size_of::<u16>(),
false,
&socket_as_path,
)
.expect("create SHM area for testing");

let f1 = K2ISFrame::empty(1, &mut ssa);
let f2 = K2ISFrame::empty(2, &mut ssa);
let f3 = K2ISFrame::empty(3, &mut ssa);
let f1w: K2ISFrameForWriting = K2ISFrameForWriting::empty(1, &mut ssa, 0);
let f1 = f1w.writing_done(&mut ssa);
let f2w: K2ISFrameForWriting = K2ISFrameForWriting::empty(2, &mut ssa, 0);
let f2 = f2w.writing_done(&mut ssa);
let f3w: K2ISFrameForWriting = K2ISFrameForWriting::empty(3, &mut ssa, 0);
let f3 = f3w.writing_done(&mut ssa);

assert!(ordering.handle_frame(FrameWithIdx::Frame(f1, 0)).is_frame());
assert!(ordering
.handle_frame(FrameWithIdx::Frame(f3, 2))
.handle_frame(FrameWithIdx::Frame(f1.into_generic(), 0))
.is_frame());
assert!(ordering
.handle_frame(FrameWithIdx::Frame(f3.into_generic(), 2))
.is_buffered());

// f1 was the last emitted frame, f3 is buffered, f2 is expected:
assert_eq!(ordering.next_expected_frame_idx, 1);
assert_eq!(ordering.frame_buffer.len(), 1);
assert!(matches!(ordering.maybe_get_next_frame(), None));
assert!(ordering.maybe_get_next_frame().is_none());

// now, we push in f2, which is directly emitted, because its frame id
// matches the currently expected id (f1 + 1):
assert!(ordering.handle_frame(FrameWithIdx::Frame(f2, 1)).is_frame());
assert!(ordering
.handle_frame(FrameWithIdx::Frame(f2.into_generic(), 1))
.is_frame());
assert_eq!(ordering.next_expected_frame_idx, 2);
assert_eq!(ordering.frame_buffer.len(), 1);

Expand All @@ -257,36 +275,47 @@ mod tests {

#[test]
fn test_multiple_buffered() {
let mut ordering: FrameOrdering<K2ISFrame> = FrameOrdering::new(0);
let socket_dir = tempdir().unwrap();
let socket_as_path = socket_dir.into_path().join("test.socket");
let mut ordering: FrameOrdering = FrameOrdering::new(0);

let mut ssa = SharedSlabAllocator::new(
10,
K2ISFrame::FRAME_HEIGHT * K2ISFrame::FRAME_WIDTH * std::mem::size_of::<u16>(),
false,
&socket_as_path,
)
.expect("create SHM area for testing");

let f1 = K2ISFrame::empty(1, &mut ssa);
let f2 = K2ISFrame::empty(2, &mut ssa);
let f3 = K2ISFrame::empty(3, &mut ssa);
let f4 = K2ISFrame::empty(4, &mut ssa);
let f1w: K2ISFrameForWriting = K2ISFrameForWriting::empty(1, &mut ssa, 0);
let f1 = f1w.writing_done(&mut ssa);
let f2w: K2ISFrameForWriting = K2ISFrameForWriting::empty(2, &mut ssa, 0);
let f2 = f2w.writing_done(&mut ssa);
let f3w: K2ISFrameForWriting = K2ISFrameForWriting::empty(3, &mut ssa, 0);
let f3 = f3w.writing_done(&mut ssa);
let f4w: K2ISFrameForWriting = K2ISFrameForWriting::empty(4, &mut ssa, 0);
let f4 = f4w.writing_done(&mut ssa);

assert!(ordering.handle_frame(FrameWithIdx::Frame(f1, 0)).is_frame());
assert!(ordering
.handle_frame(FrameWithIdx::Frame(f3, 2))
.handle_frame(FrameWithIdx::Frame(f1.into_generic(), 0))
.is_frame());
assert!(ordering
.handle_frame(FrameWithIdx::Frame(f3.into_generic(), 2))
.is_buffered());
assert!(ordering
.handle_frame(FrameWithIdx::Frame(f4, 3))
.handle_frame(FrameWithIdx::Frame(f4.into_generic(), 3))
.is_buffered());

// f1 was the last emitted frame, f3 and f4 are buffered:
assert_eq!(ordering.next_expected_frame_idx, 1);
assert_eq!(ordering.frame_buffer.len(), 2);
assert!(matches!(ordering.maybe_get_next_frame(), None));
assert!(ordering.maybe_get_next_frame().is_none());

// now, we push in f2, which is directly emitted, because its frame id
// matches the currently expected id (f1 + 1):
assert!(ordering.handle_frame(FrameWithIdx::Frame(f2, 1)).is_frame());
assert!(ordering
.handle_frame(FrameWithIdx::Frame(f2.into_generic(), 1))
.is_frame());
assert_eq!(ordering.next_expected_frame_idx, 2);
assert_eq!(ordering.frame_buffer.len(), 2);

Expand All @@ -313,44 +342,56 @@ mod tests {

#[test]
fn test_multiple_holes() {
let socket_dir = tempdir().unwrap();
let socket_as_path = socket_dir.into_path().join("test.socket");
// something like this: f1 _ f3 _ f5
let mut ordering: FrameOrdering<K2ISFrame> = FrameOrdering::new(0);
let mut ordering: FrameOrdering = FrameOrdering::new(0);

let mut ssa = SharedSlabAllocator::new(
10,
K2ISFrame::FRAME_HEIGHT * K2ISFrame::FRAME_WIDTH * std::mem::size_of::<u16>(),
false,
&socket_as_path,
)
.expect("create SHM area for testing");

let f1 = K2ISFrame::empty(1, &mut ssa);
let f2 = K2ISFrame::empty(2, &mut ssa);
let f3 = K2ISFrame::empty(3, &mut ssa);
let f4 = K2ISFrame::empty(4, &mut ssa);
let f5 = K2ISFrame::empty(5, &mut ssa);
let f1w: K2ISFrameForWriting = K2ISFrameForWriting::empty(1, &mut ssa, 0);
let f1 = f1w.writing_done(&mut ssa);
let f2w: K2ISFrameForWriting = K2ISFrameForWriting::empty(2, &mut ssa, 0);
let f2 = f2w.writing_done(&mut ssa);
let f3w: K2ISFrameForWriting = K2ISFrameForWriting::empty(3, &mut ssa, 0);
let f3 = f3w.writing_done(&mut ssa);
let f4w: K2ISFrameForWriting = K2ISFrameForWriting::empty(4, &mut ssa, 0);
let f4 = f4w.writing_done(&mut ssa);
let f5w: K2ISFrameForWriting = K2ISFrameForWriting::empty(5, &mut ssa, 0);
let f5 = f5w.writing_done(&mut ssa);

assert!(ordering.handle_frame(FrameWithIdx::Frame(f1, 0)).is_frame());
assert!(ordering
.handle_frame(FrameWithIdx::Frame(f3, 2))
.handle_frame(FrameWithIdx::Frame(f1.into_generic(), 0))
.is_frame());
assert!(ordering
.handle_frame(FrameWithIdx::Frame(f3.into_generic(), 2))
.is_buffered());
assert!(ordering
.handle_frame(FrameWithIdx::Frame(f5, 4))
.handle_frame(FrameWithIdx::Frame(f5.into_generic(), 4))
.is_buffered());

// f1 was the last emitted frame, f3 and f5 are buffered:
assert_eq!(ordering.next_expected_frame_idx, 1);
assert_eq!(ordering.frame_buffer.len(), 2);
assert!(matches!(ordering.maybe_get_next_frame(), None));
assert!(ordering.maybe_get_next_frame().is_none());

// now, we push in f2, which is directly emitted, because its frame id
// matches the currently expected id (f1 + 1):
assert!(ordering.handle_frame(FrameWithIdx::Frame(f2, 1)).is_frame());
assert!(ordering
.handle_frame(FrameWithIdx::Frame(f2.into_generic(), 1))
.is_frame());
assert_eq!(ordering.next_expected_frame_idx, 2);
assert_eq!(ordering.frame_buffer.len(), 2);

// now, we push in f4, which is buffered:
assert!(ordering
.handle_frame(FrameWithIdx::Frame(f4, 3))
.handle_frame(FrameWithIdx::Frame(f4.into_generic(), 3))
.is_buffered());
assert_eq!(ordering.next_expected_frame_idx, 2);
assert_eq!(ordering.frame_buffer.len(), 3);
Expand All @@ -377,32 +418,43 @@ mod tests {
None => false,
});
assert_eq!(ordering.frame_buffer.len(), 0);
assert!(matches!(ordering.maybe_get_next_frame(), None));
assert!(ordering.maybe_get_next_frame().is_none());
}

#[test]
fn test_drop_duplicates() {
let mut ordering: FrameOrdering<K2ISFrame> = FrameOrdering::new(0);
let socket_dir = tempdir().unwrap();
let socket_as_path = socket_dir.into_path().join("test.socket");
let mut ordering: FrameOrdering = FrameOrdering::new(0);

let mut ssa = SharedSlabAllocator::new(
10,
K2ISFrame::FRAME_HEIGHT * K2ISFrame::FRAME_WIDTH * std::mem::size_of::<u16>(),
false,
&socket_as_path,
)
.expect("create SHM area for testing");

let f1 = K2ISFrame::empty(1, &mut ssa);
let f2 = K2ISFrame::empty(2, &mut ssa);
let f3 = K2ISFrame::empty(1, &mut ssa);
let f4 = K2ISFrame::empty(4, &mut ssa);
let f1w: K2ISFrameForWriting = K2ISFrameForWriting::empty(1, &mut ssa, 0);
let f1 = f1w.writing_done(&mut ssa);
let f2w: K2ISFrameForWriting = K2ISFrameForWriting::empty(2, &mut ssa, 0);
let f2 = f2w.writing_done(&mut ssa);
let f3w: K2ISFrameForWriting = K2ISFrameForWriting::empty(3, &mut ssa, 0);
let f3 = f3w.writing_done(&mut ssa);
let f4w: K2ISFrameForWriting = K2ISFrameForWriting::empty(4, &mut ssa, 0);
let f4 = f4w.writing_done(&mut ssa);

assert!(ordering
.handle_frame(FrameWithIdx::DroppedFrame(f1, 0))
.handle_frame(FrameWithIdx::DroppedFrame(f1.into_generic(), 0))
.is_frame());
assert!(ordering.handle_frame(FrameWithIdx::Frame(f2, 1)).is_frame());
assert!(ordering
.handle_frame(FrameWithIdx::DroppedFrame(f3, 0))
.handle_frame(FrameWithIdx::Frame(f2.into_generic(), 1))
.is_frame());
assert!(ordering
.handle_frame(FrameWithIdx::DroppedFrame(f3.into_generic(), 0))
.is_dropped()); // note duplicate index!
assert!(ordering.handle_frame(FrameWithIdx::Frame(f4, 2)).is_frame());
assert!(ordering
.handle_frame(FrameWithIdx::Frame(f4.into_generic(), 2))
.is_frame());
}
}

0 comments on commit bd9cc1c

Please sign in to comment.