Skip to content

Commit

Permalink
Implement IWRR schedule on TransmissionPipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
jerry73204 committed Jan 10, 2024
1 parent fbcbf51 commit 8a023d0
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 8 deletions.
1 change: 1 addition & 0 deletions io/zenoh-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ zenoh-result = { workspace = true }
zenoh-shm = { workspace = true, optional = true }
zenoh-sync = { workspace = true }
zenoh-util = { workspace = true }
itertools = "0.12.0"

[dev-dependencies]
env_logger = { workspace = true }
Expand Down
66 changes: 58 additions & 8 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,18 @@ use super::{
};
use async_std::prelude::FutureExt;
use flume::{bounded, Receiver, Sender};
use itertools::{chain, Itertools};
use ringbuffer_spsc::{RingBuffer, RingBufferReader, RingBufferWriter};
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread;
use std::time::Duration;
use std::{
iter,
sync::{Arc, Mutex, MutexGuard},
};
use std::{
iter::Peekable,
sync::atomic::{AtomicBool, AtomicU16, Ordering},
};
use zenoh_buffers::{
reader::{HasReader, Reader},
writer::HasWriter,
Expand Down Expand Up @@ -573,6 +580,11 @@ impl TransmissionPipeline {
}

let active = Arc::new(AtomicBool::new(true));
let prio_iter: Box<dyn Iterator<Item = Vec<usize>> + Send + Sync> = {
let weights = vec![1; stage_out.len()];
Box::new(iwrr_vec_iter(&weights))
};

let producer = TransmissionPipelineProducer {
stage_in: stage_in.into_boxed_slice().into(),
active: active.clone(),
Expand All @@ -581,6 +593,7 @@ impl TransmissionPipeline {
stage_out: stage_out.into_boxed_slice(),
n_out_r,
active,
prio_iter: prio_iter.peekable(),
};

(producer, consumer)
Expand Down Expand Up @@ -642,14 +655,19 @@ pub(crate) struct TransmissionPipelineConsumer {
stage_out: Box<[StageOut]>,
n_out_r: Receiver<()>,
active: Arc<AtomicBool>,
prio_iter: Peekable<Box<dyn Iterator<Item = Vec<usize>> + Send + Sync>>,
}

impl TransmissionPipelineConsumer {
pub(crate) async fn pull(&mut self) -> Option<(WBatch, usize)> {
while self.active.load(Ordering::Relaxed) {
let prios = self.prio_iter.next().unwrap();

// Calculate the backoff maximum
let mut bo = NanoSeconds::MAX;
for (prio, queue) in self.stage_out.iter_mut().enumerate() {

for prio in prios {
let queue = &mut self.stage_out[prio];
match queue.try_pull() {
Pull::Some(batch) => {
return Some((batch, prio));
Expand All @@ -664,11 +682,13 @@ impl TransmissionPipelineConsumer {
}

// Wait for the backoff to expire or for a new message
let _ = self
.n_out_r
.recv_async()
.timeout(Duration::from_nanos(bo as u64))
.await;
if bo != NanoSeconds::MAX {
let _ = self
.n_out_r
.recv_async()
.timeout(Duration::from_nanos(bo as u64))
.await;
}
}
None
}
Expand Down Expand Up @@ -702,6 +722,36 @@ impl TransmissionPipelineConsumer {
}
}

fn iwrr_vec_iter(weights: &[usize]) -> impl Iterator<Item = Vec<usize>> + Send + Sync {
let w_min = weights
.iter()
.cloned()
.min()
.expect("weights must not be empty");
assert!(w_min > 0, "weights must be nonzero");

let mut indices: Vec<usize> = (0..weights.len()).collect();
indices.sort_unstable_by_key(|&idx| (weights[idx], idx));

let sorted_weights: Vec<_> = indices.iter().map(|&idx| weights[idx]).collect();
indices.reverse();

chain!([0], sorted_weights)
.tuple_windows()
.zip((1..=weights.len()).rev())
.map(|((prev, next), slots)| {
let rounds = next - prev;
(slots, rounds)
})
.filter(|&(_, rounds)| rounds > 0)
.flat_map(move |(slots, rounds)| {
let mut selected: Vec<_> = indices[0..slots].to_vec();
selected.sort_unstable();
iter::repeat(selected).take(rounds)
})
.cycle()
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 8a023d0

Please sign in to comment.