diff --git a/io/zenoh-transport/Cargo.toml b/io/zenoh-transport/Cargo.toml index 0921f4e1ee..5e3393f8f0 100644 --- a/io/zenoh-transport/Cargo.toml +++ b/io/zenoh-transport/Cargo.toml @@ -74,6 +74,7 @@ zenoh-result = { workspace = true } zenoh-shm = { workspace = true, optional = true } zenoh-sync = { workspace = true } zenoh-util = { workspace = true } +itertools = "0.12.0" [dev-dependencies] env_logger = { workspace = true } diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 954c656280..f341220a77 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -19,11 +19,18 @@ use super::{ }; use async_std::prelude::FutureExt; use flume::{bounded, Receiver, Sender}; +use itertools::{chain, Itertools}; use ringbuffer_spsc::{RingBuffer, RingBufferReader, RingBufferWriter}; -use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; -use std::sync::{Arc, Mutex, MutexGuard}; use std::thread; use std::time::Duration; +use std::{ + iter, + sync::{Arc, Mutex, MutexGuard}, +}; +use std::{ + iter::Peekable, + sync::atomic::{AtomicBool, AtomicU16, Ordering}, +}; use zenoh_buffers::{ reader::{HasReader, Reader}, writer::HasWriter, @@ -573,6 +580,11 @@ impl TransmissionPipeline { } let active = Arc::new(AtomicBool::new(true)); + let prio_iter: Box> + Send + Sync> = { + let weights = vec![1; stage_out.len()]; + Box::new(iwrr_vec_iter(&weights)) + }; + let producer = TransmissionPipelineProducer { stage_in: stage_in.into_boxed_slice().into(), active: active.clone(), @@ -581,6 +593,7 @@ impl TransmissionPipeline { stage_out: stage_out.into_boxed_slice(), n_out_r, active, + prio_iter: prio_iter.peekable(), }; (producer, consumer) @@ -642,14 +655,19 @@ pub(crate) struct TransmissionPipelineConsumer { stage_out: Box<[StageOut]>, n_out_r: Receiver<()>, active: Arc, + prio_iter: Peekable> + Send + Sync>>, } impl TransmissionPipelineConsumer { pub(crate) async fn pull(&mut self) -> Option<(WBatch, usize)> { while self.active.load(Ordering::Relaxed) { + let prios = self.prio_iter.next().unwrap(); + // Calculate the backoff maximum let mut bo = NanoSeconds::MAX; - for (prio, queue) in self.stage_out.iter_mut().enumerate() { + + for prio in prios { + let queue = &mut self.stage_out[prio]; match queue.try_pull() { Pull::Some(batch) => { return Some((batch, prio)); @@ -664,11 +682,13 @@ impl TransmissionPipelineConsumer { } // Wait for the backoff to expire or for a new message - let _ = self - .n_out_r - .recv_async() - .timeout(Duration::from_nanos(bo as u64)) - .await; + if bo != NanoSeconds::MAX { + let _ = self + .n_out_r + .recv_async() + .timeout(Duration::from_nanos(bo as u64)) + .await; + } } None } @@ -702,6 +722,36 @@ impl TransmissionPipelineConsumer { } } +fn iwrr_vec_iter(weights: &[usize]) -> impl Iterator> + Send + Sync { + let w_min = weights + .iter() + .cloned() + .min() + .expect("weights must not be empty"); + assert!(w_min > 0, "weights must be nonzero"); + + let mut indices: Vec = (0..weights.len()).collect(); + indices.sort_unstable_by_key(|&idx| (weights[idx], idx)); + + let sorted_weights: Vec<_> = indices.iter().map(|&idx| weights[idx]).collect(); + indices.reverse(); + + chain!([0], sorted_weights) + .tuple_windows() + .zip((1..=weights.len()).rev()) + .map(|((prev, next), slots)| { + let rounds = next - prev; + (slots, rounds) + }) + .filter(|&(_, rounds)| rounds > 0) + .flat_map(move |(slots, rounds)| { + let mut selected: Vec<_> = indices[0..slots].to_vec(); + selected.sort_unstable(); + iter::repeat(selected).take(rounds) + }) + .cycle() +} + #[cfg(test)] mod tests { use super::*;