diff --git a/k2o/src/acquisition.rs b/k2o/src/acquisition.rs index 508792d2..10306700 100644 --- a/k2o/src/acquisition.rs +++ b/k2o/src/acquisition.rs @@ -162,6 +162,7 @@ struct FrameHandler<'a, F: K2Frame> { } impl<'a, F: K2Frame> FrameHandler<'a, F> { + #[allow(clippy::too_many_arguments)] fn new( acquisition_id: usize, channel: &'a Receiver>, @@ -578,86 +579,3 @@ pub fn acquisition_loop( }); global::force_flush_tracer_provider(); } - -// /// Track until which index we have received all frames, and the possibly -// /// "non-contiguous" set of other frames "on the right" which we have received. -// struct TrackFrames { -// /// Marker dividing the index space of the acquisition into frames that we have -// /// finished processing, and those that are still in flight or to be received. -// /// -// /// This points at the first frame in the todo-part, so a `0` at the beginning -// /// means everything still needs to be received. -// /// -// /// Finished processing also includes dropped frames, where we have waited long -// /// enough and didn't get all the data for the full frame. -// dense_until: usize, -// -// /// Set of all indices `i` that have been successfully received, but where -// /// another index `j` exists, such that the frame `j` has not been received fully. -// leading: HashSet, -// -// /// Separate tracker for dropped frames -// dropped: HashSet, -// } -// -// impl TrackFrames { -// pub fn new() -> Self { -// TrackFrames { -// dense_until: 0, -// leading: HashSet::new(), -// dropped: HashSet::new(), -// } -// } -// -// /// After changing `leading` or `dense_until`, call this function to check -// /// if we can move `dense_until` even further, and remove items from -// /// `leading`. -// fn maybe_move_marker(&mut self) { -// if self.leading.len() == 0 { -// return; // no need to adjust if `self.leading` is empty -// } -// let max_leading = self -// .leading -// .iter() -// .max() -// .expect("`leading` is not empty, so should have a maximum"); -// for idx in self.dense_until..=*max_leading { -// if self.leading.contains(&idx) { -// self.leading.remove(&idx); -// self.dense_until = idx + 1; -// } else { -// // first frame which is not done yet encountered, so we keep this and -// // the following in the `leading` set. -// return; -// } -// } -// } -// -// fn track_frame(&mut self, frame: &F, rel_idx: usize) { -// if rel_idx == self.dense_until { -// // fast path: the frame is exactly the next "expected" frame: -// self.dense_until += 1; -// self.maybe_move_marker(); -// return; -// } else if rel_idx > self.dense_until { -// // anything else -// self.leading.insert(rel_idx); -// self.maybe_move_marker(); -// return; -// } else { -// panic!( -// "cannot track a frame with idx {} < {}", -// rel_idx, self.dense_until -// ); -// } -// } -// -// pub fn track_frame_done(&mut self, frame: &F, rel_idx: usize) { -// self.track_frame(frame, rel_idx); -// } -// -// pub fn track_frame_dropped(&mut self, frame: &F, rel_idx: usize) { -// self.track_frame(frame, rel_idx); -// self.dropped.insert(rel_idx); -// } -// } diff --git a/k2o/src/ordering.rs b/k2o/src/ordering.rs index ce9b10ee..361c4894 100644 --- a/k2o/src/ordering.rs +++ b/k2o/src/ordering.rs @@ -167,14 +167,14 @@ impl FrameOrdering { } } -#[cfg(nope)] #[cfg(test)] mod tests { use ipc_test::SharedSlabAllocator; use tempfile::tempdir; use crate::{ - frame::{K2Frame, K2ISFrame}, + frame::{FrameForWriting, K2Frame}, + frame_is::{K2ISFrame, K2ISFrameForWriting}, ordering::FrameWithIdx, }; @@ -183,10 +183,9 @@ mod tests { #[test] fn test_direct_return() { let socket_dir = tempdir().unwrap(); - let socket_as_path = socket_dir.into_path(); - let handle_path = socket_as_path.to_str().unwrap(); + let socket_as_path = socket_dir.into_path().join("test.socket"); - let mut ordering: FrameOrdering = FrameOrdering::new(0); + let mut ordering: FrameOrdering = FrameOrdering::new(0); let mut ssa = SharedSlabAllocator::new( 10, @@ -196,16 +195,25 @@ mod tests { ) .expect("create SHM area for testing"); - let f1 = K2ISFrame::empty(1, &mut ssa); - let f2 = K2ISFrame::empty(2, &mut ssa); - let f3 = K2ISFrame::empty(3, &mut ssa); + let f1w: K2ISFrameForWriting = K2ISFrameForWriting::empty(1, &mut ssa, 0); + let f1 = f1w.writing_done(&mut ssa); + let f2w: K2ISFrameForWriting = K2ISFrameForWriting::empty(2, &mut ssa, 0); + let f2 = f2w.writing_done(&mut ssa); + let f3w: K2ISFrameForWriting = K2ISFrameForWriting::empty(3, &mut ssa, 0); + let f3 = f3w.writing_done(&mut ssa); assert_eq!(ordering.frame_buffer.len(), 0); - assert!(ordering.handle_frame(FrameWithIdx::Frame(f1, 0)).is_frame()); + assert!(ordering + .handle_frame(FrameWithIdx::Frame(f1.into_generic(), 0)) + .is_frame()); assert_eq!(ordering.frame_buffer.len(), 0); - assert!(ordering.handle_frame(FrameWithIdx::Frame(f2, 1)).is_frame()); + assert!(ordering + .handle_frame(FrameWithIdx::Frame(f2.into_generic(), 1)) + .is_frame()); assert_eq!(ordering.frame_buffer.len(), 0); - assert!(ordering.handle_frame(FrameWithIdx::Frame(f3, 2)).is_frame()); + assert!(ordering + .handle_frame(FrameWithIdx::Frame(f3.into_generic(), 2)) + .is_frame()); assert_eq!(ordering.next_expected_frame_idx, 3); @@ -215,32 +223,42 @@ mod tests { #[test] fn test_one_missing() { - let mut ordering: FrameOrdering = FrameOrdering::new(0); + let socket_dir = tempdir().unwrap(); + let socket_as_path = socket_dir.into_path().join("test.socket"); + let mut ordering: FrameOrdering = FrameOrdering::new(0); let mut ssa = SharedSlabAllocator::new( 10, K2ISFrame::FRAME_HEIGHT * K2ISFrame::FRAME_WIDTH * std::mem::size_of::(), false, + &socket_as_path, ) .expect("create SHM area for testing"); - let f1 = K2ISFrame::empty(1, &mut ssa); - let f2 = K2ISFrame::empty(2, &mut ssa); - let f3 = K2ISFrame::empty(3, &mut ssa); + let f1w: K2ISFrameForWriting = K2ISFrameForWriting::empty(1, &mut ssa, 0); + let f1 = f1w.writing_done(&mut ssa); + let f2w: K2ISFrameForWriting = K2ISFrameForWriting::empty(2, &mut ssa, 0); + let f2 = f2w.writing_done(&mut ssa); + let f3w: K2ISFrameForWriting = K2ISFrameForWriting::empty(3, &mut ssa, 0); + let f3 = f3w.writing_done(&mut ssa); - assert!(ordering.handle_frame(FrameWithIdx::Frame(f1, 0)).is_frame()); assert!(ordering - .handle_frame(FrameWithIdx::Frame(f3, 2)) + .handle_frame(FrameWithIdx::Frame(f1.into_generic(), 0)) + .is_frame()); + assert!(ordering + .handle_frame(FrameWithIdx::Frame(f3.into_generic(), 2)) .is_buffered()); // f1 was the last emitted frame, f3 is buffered, f2 is expected: assert_eq!(ordering.next_expected_frame_idx, 1); assert_eq!(ordering.frame_buffer.len(), 1); - assert!(matches!(ordering.maybe_get_next_frame(), None)); + assert!(ordering.maybe_get_next_frame().is_none()); // now, we push in f2, which is directly emitted, because its frame id // matches the currently expected id (f1 + 1): - assert!(ordering.handle_frame(FrameWithIdx::Frame(f2, 1)).is_frame()); + assert!(ordering + .handle_frame(FrameWithIdx::Frame(f2.into_generic(), 1)) + .is_frame()); assert_eq!(ordering.next_expected_frame_idx, 2); assert_eq!(ordering.frame_buffer.len(), 1); @@ -257,36 +275,47 @@ mod tests { #[test] fn test_multiple_buffered() { - let mut ordering: FrameOrdering = FrameOrdering::new(0); + let socket_dir = tempdir().unwrap(); + let socket_as_path = socket_dir.into_path().join("test.socket"); + let mut ordering: FrameOrdering = FrameOrdering::new(0); let mut ssa = SharedSlabAllocator::new( 10, K2ISFrame::FRAME_HEIGHT * K2ISFrame::FRAME_WIDTH * std::mem::size_of::(), false, + &socket_as_path, ) .expect("create SHM area for testing"); - let f1 = K2ISFrame::empty(1, &mut ssa); - let f2 = K2ISFrame::empty(2, &mut ssa); - let f3 = K2ISFrame::empty(3, &mut ssa); - let f4 = K2ISFrame::empty(4, &mut ssa); + let f1w: K2ISFrameForWriting = K2ISFrameForWriting::empty(1, &mut ssa, 0); + let f1 = f1w.writing_done(&mut ssa); + let f2w: K2ISFrameForWriting = K2ISFrameForWriting::empty(2, &mut ssa, 0); + let f2 = f2w.writing_done(&mut ssa); + let f3w: K2ISFrameForWriting = K2ISFrameForWriting::empty(3, &mut ssa, 0); + let f3 = f3w.writing_done(&mut ssa); + let f4w: K2ISFrameForWriting = K2ISFrameForWriting::empty(4, &mut ssa, 0); + let f4 = f4w.writing_done(&mut ssa); - assert!(ordering.handle_frame(FrameWithIdx::Frame(f1, 0)).is_frame()); assert!(ordering - .handle_frame(FrameWithIdx::Frame(f3, 2)) + .handle_frame(FrameWithIdx::Frame(f1.into_generic(), 0)) + .is_frame()); + assert!(ordering + .handle_frame(FrameWithIdx::Frame(f3.into_generic(), 2)) .is_buffered()); assert!(ordering - .handle_frame(FrameWithIdx::Frame(f4, 3)) + .handle_frame(FrameWithIdx::Frame(f4.into_generic(), 3)) .is_buffered()); // f1 was the last emitted frame, f3 and f4 are buffered: assert_eq!(ordering.next_expected_frame_idx, 1); assert_eq!(ordering.frame_buffer.len(), 2); - assert!(matches!(ordering.maybe_get_next_frame(), None)); + assert!(ordering.maybe_get_next_frame().is_none()); // now, we push in f2, which is directly emitted, because its frame id // matches the currently expected id (f1 + 1): - assert!(ordering.handle_frame(FrameWithIdx::Frame(f2, 1)).is_frame()); + assert!(ordering + .handle_frame(FrameWithIdx::Frame(f2.into_generic(), 1)) + .is_frame()); assert_eq!(ordering.next_expected_frame_idx, 2); assert_eq!(ordering.frame_buffer.len(), 2); @@ -313,44 +342,56 @@ mod tests { #[test] fn test_multiple_holes() { + let socket_dir = tempdir().unwrap(); + let socket_as_path = socket_dir.into_path().join("test.socket"); // something like this: f1 _ f3 _ f5 - let mut ordering: FrameOrdering = FrameOrdering::new(0); + let mut ordering: FrameOrdering = FrameOrdering::new(0); let mut ssa = SharedSlabAllocator::new( 10, K2ISFrame::FRAME_HEIGHT * K2ISFrame::FRAME_WIDTH * std::mem::size_of::(), false, + &socket_as_path, ) .expect("create SHM area for testing"); - let f1 = K2ISFrame::empty(1, &mut ssa); - let f2 = K2ISFrame::empty(2, &mut ssa); - let f3 = K2ISFrame::empty(3, &mut ssa); - let f4 = K2ISFrame::empty(4, &mut ssa); - let f5 = K2ISFrame::empty(5, &mut ssa); + let f1w: K2ISFrameForWriting = K2ISFrameForWriting::empty(1, &mut ssa, 0); + let f1 = f1w.writing_done(&mut ssa); + let f2w: K2ISFrameForWriting = K2ISFrameForWriting::empty(2, &mut ssa, 0); + let f2 = f2w.writing_done(&mut ssa); + let f3w: K2ISFrameForWriting = K2ISFrameForWriting::empty(3, &mut ssa, 0); + let f3 = f3w.writing_done(&mut ssa); + let f4w: K2ISFrameForWriting = K2ISFrameForWriting::empty(4, &mut ssa, 0); + let f4 = f4w.writing_done(&mut ssa); + let f5w: K2ISFrameForWriting = K2ISFrameForWriting::empty(5, &mut ssa, 0); + let f5 = f5w.writing_done(&mut ssa); - assert!(ordering.handle_frame(FrameWithIdx::Frame(f1, 0)).is_frame()); assert!(ordering - .handle_frame(FrameWithIdx::Frame(f3, 2)) + .handle_frame(FrameWithIdx::Frame(f1.into_generic(), 0)) + .is_frame()); + assert!(ordering + .handle_frame(FrameWithIdx::Frame(f3.into_generic(), 2)) .is_buffered()); assert!(ordering - .handle_frame(FrameWithIdx::Frame(f5, 4)) + .handle_frame(FrameWithIdx::Frame(f5.into_generic(), 4)) .is_buffered()); // f1 was the last emitted frame, f3 and f5 are buffered: assert_eq!(ordering.next_expected_frame_idx, 1); assert_eq!(ordering.frame_buffer.len(), 2); - assert!(matches!(ordering.maybe_get_next_frame(), None)); + assert!(ordering.maybe_get_next_frame().is_none()); // now, we push in f2, which is directly emitted, because its frame id // matches the currently expected id (f1 + 1): - assert!(ordering.handle_frame(FrameWithIdx::Frame(f2, 1)).is_frame()); + assert!(ordering + .handle_frame(FrameWithIdx::Frame(f2.into_generic(), 1)) + .is_frame()); assert_eq!(ordering.next_expected_frame_idx, 2); assert_eq!(ordering.frame_buffer.len(), 2); // now, we push in f4, which is buffered: assert!(ordering - .handle_frame(FrameWithIdx::Frame(f4, 3)) + .handle_frame(FrameWithIdx::Frame(f4.into_generic(), 3)) .is_buffered()); assert_eq!(ordering.next_expected_frame_idx, 2); assert_eq!(ordering.frame_buffer.len(), 3); @@ -377,32 +418,43 @@ mod tests { None => false, }); assert_eq!(ordering.frame_buffer.len(), 0); - assert!(matches!(ordering.maybe_get_next_frame(), None)); + assert!(ordering.maybe_get_next_frame().is_none()); } #[test] fn test_drop_duplicates() { - let mut ordering: FrameOrdering = FrameOrdering::new(0); + let socket_dir = tempdir().unwrap(); + let socket_as_path = socket_dir.into_path().join("test.socket"); + let mut ordering: FrameOrdering = FrameOrdering::new(0); let mut ssa = SharedSlabAllocator::new( 10, K2ISFrame::FRAME_HEIGHT * K2ISFrame::FRAME_WIDTH * std::mem::size_of::(), false, + &socket_as_path, ) .expect("create SHM area for testing"); - let f1 = K2ISFrame::empty(1, &mut ssa); - let f2 = K2ISFrame::empty(2, &mut ssa); - let f3 = K2ISFrame::empty(1, &mut ssa); - let f4 = K2ISFrame::empty(4, &mut ssa); + let f1w: K2ISFrameForWriting = K2ISFrameForWriting::empty(1, &mut ssa, 0); + let f1 = f1w.writing_done(&mut ssa); + let f2w: K2ISFrameForWriting = K2ISFrameForWriting::empty(2, &mut ssa, 0); + let f2 = f2w.writing_done(&mut ssa); + let f3w: K2ISFrameForWriting = K2ISFrameForWriting::empty(3, &mut ssa, 0); + let f3 = f3w.writing_done(&mut ssa); + let f4w: K2ISFrameForWriting = K2ISFrameForWriting::empty(4, &mut ssa, 0); + let f4 = f4w.writing_done(&mut ssa); assert!(ordering - .handle_frame(FrameWithIdx::DroppedFrame(f1, 0)) + .handle_frame(FrameWithIdx::DroppedFrame(f1.into_generic(), 0)) .is_frame()); - assert!(ordering.handle_frame(FrameWithIdx::Frame(f2, 1)).is_frame()); assert!(ordering - .handle_frame(FrameWithIdx::DroppedFrame(f3, 0)) + .handle_frame(FrameWithIdx::Frame(f2.into_generic(), 1)) + .is_frame()); + assert!(ordering + .handle_frame(FrameWithIdx::DroppedFrame(f3.into_generic(), 0)) .is_dropped()); // note duplicate index! - assert!(ordering.handle_frame(FrameWithIdx::Frame(f4, 2)).is_frame()); + assert!(ordering + .handle_frame(FrameWithIdx::Frame(f4.into_generic(), 2)) + .is_frame()); } }