From 5199a1c025a0d44dd2013b4dd2a87c925f49e6f9 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 23 Aug 2024 18:46:11 +0200 Subject: [PATCH 01/16] Rework of pipeline backoff --- Cargo.lock | 2 + Cargo.toml | 1 + commons/zenoh-config/src/defaults.rs | 2 +- commons/zenoh-sync/src/event.rs | 300 ++++++++++++++++++++++ commons/zenoh-sync/src/lib.rs | 3 + io/zenoh-transport/Cargo.toml | 2 + io/zenoh-transport/src/common/pipeline.rs | 216 ++++++++-------- 7 files changed, 422 insertions(+), 104 deletions(-) create mode 100644 commons/zenoh-sync/src/event.rs diff --git a/Cargo.lock b/Cargo.lock index 1de420de99..dc5163b1c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6102,9 +6102,11 @@ name = "zenoh-transport" version = "1.0.0-dev" dependencies = [ "async-trait", + "crossbeam-utils", "flume", "futures", "futures-util", + "lazy_static", "lz4_flex", "paste", "rand 0.8.5", diff --git a/Cargo.toml b/Cargo.toml index e2aac0cb40..15b93b9543 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -90,6 +90,7 @@ console-subscriber = "0.3.0" const_format = "0.2.30" crc = "3.0.1" criterion = "0.5" +crossbeam-utils = "0.8.2" derive_more = "0.99.17" derive-new = "0.6.0" tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] } diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index c6e69dd148..a00edbacc4 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -201,7 +201,7 @@ impl Default for QueueConf { Self { size: QueueSizeConf::default(), congestion_control: CongestionControlConf::default(), - backoff: 100, + backoff: 1_000_000, } } } diff --git a/commons/zenoh-sync/src/event.rs b/commons/zenoh-sync/src/event.rs new file mode 100644 index 0000000000..3f13134bcd --- /dev/null +++ b/commons/zenoh-sync/src/event.rs @@ -0,0 +1,300 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use event_listener::{Event as EventLib, Listener}; +use std::{ + sync::{ + atomic::{AtomicU16, AtomicU8, Ordering}, + Arc, + }, + time::{Duration, Instant}, +}; + +// Return types +pub struct EventClosed; + +impl std::fmt::Display for EventClosed { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +impl std::fmt::Debug for EventClosed { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("Event Closed") + } +} + +impl std::error::Error for EventClosed {} + +#[repr(u8)] +pub enum WaitDeadline { + Event, + Deadline, +} + +#[repr(u8)] +pub enum WaitTimeout { + Event, + Timeout, +} + +/// This is a Event Variable similar to that provided by POSIX. +/// As for POSIX condition variables, this assumes that a mutex is +/// properly used to coordinate behaviour. In other terms there should +/// not be race condition on [notify_one](Event::notify_one) or +/// [notify_all](Event::notify_all). +/// +struct EventInner { + event: EventLib, + flag: AtomicU8, + notifiers: AtomicU16, + waiters: AtomicU16, +} + +const UNSET: u8 = 0; +const OK: u8 = 1; +const ERR: u8 = 1 << 1; + +#[repr(u8)] +enum EventCheck { + Unset = UNSET, + Ok = OK, + Err = ERR, +} + +#[repr(u8)] +enum EventSet { + Ok = OK, + Err = ERR, +} + +impl EventInner { + fn check(&self) -> EventCheck { + let f = self.flag.fetch_and(!OK, Ordering::SeqCst); + if f & ERR != 0 { + return EventCheck::Err; + } + if f == OK { + return EventCheck::Ok; + } + EventCheck::Unset + } + + fn set(&self) -> EventSet { + let f = self.flag.fetch_or(OK, Ordering::SeqCst); + if f & ERR != 0 { + return EventSet::Err; + } + EventSet::Ok + } + + fn err(&self) { + self.flag.store(ERR, Ordering::SeqCst); + } +} + +#[repr(transparent)] +pub struct Notifier(Arc); + +impl Clone for Notifier { + fn clone(&self) -> Self { + let n = self.0.notifiers.fetch_add(1, Ordering::SeqCst); + // Panic on overflow + assert!(n != 0); + Self(self.0.clone()) + } +} + +impl Drop for Notifier { + fn drop(&mut self) { + let n = self.0.notifiers.fetch_sub(1, Ordering::SeqCst); + if n == 1 { + // The last Notifier has been dropped, close the event and notify everyone + self.0.err(); + self.0.event.notify(usize::MAX); + } + } +} + +#[repr(transparent)] +pub struct Waiter(Arc); + +impl Clone for Waiter { + fn clone(&self) -> Self { + let n = self.0.waiters.fetch_add(1, Ordering::Relaxed); + // Panic on overflow + assert!(n != 0); + Self(self.0.clone()) + } +} + +impl Drop for Waiter { + fn drop(&mut self) { + let n = self.0.waiters.fetch_sub(1, Ordering::SeqCst); + if n == 1 { + // The last Waiter has been dropped, close the event + self.0.err(); + } + } +} + +/// Creates a new condition variable with a given capacity. +/// The capacity indicates the maximum number of tasks that +/// may be waiting on the condition. +pub fn new() -> (Notifier, Waiter) { + let inner = Arc::new(EventInner { + event: EventLib::new(), + flag: AtomicU8::new(UNSET), + notifiers: AtomicU16::new(1), + waiters: AtomicU16::new(1), + }); + (Notifier(inner.clone()), Waiter(inner)) +} + +impl Waiter { + /// Waits for the condition to be notified + #[inline] + pub async fn wait_async(&self) -> Result<(), EventClosed> { + // Wait until the flag is set. + loop { + // Check the flag. + match self.0.check() { + EventCheck::Ok => break, + EventCheck::Err => return Err(EventClosed), + EventCheck::Unset => {} + } + + // Start listening for events. + let listener = self.0.event.listen(); + + // Check the flag again after creating the listener. + match self.0.check() { + EventCheck::Ok => break, + EventCheck::Err => return Err(EventClosed), + EventCheck::Unset => {} + } + + // Wait for a notification and continue the loop. + listener.await; + } + + Ok(()) + } + + /// Waits for the condition to be notified + #[inline] + pub fn wait(&self) -> Result<(), EventClosed> { + // Wait until the flag is set. + loop { + // Check the flag. + match self.0.check() { + EventCheck::Ok => break, + EventCheck::Err => return Err(EventClosed), + EventCheck::Unset => {} + } + + // Start listening for events. + let listener = self.0.event.listen(); + + // Check the flag again after creating the listener. + match self.0.check() { + EventCheck::Ok => break, + EventCheck::Err => return Err(EventClosed), + EventCheck::Unset => {} + } + + // Wait for a notification and continue the loop. + listener.wait(); + } + + Ok(()) + } + + /// Waits for the condition to be notified + #[inline] + pub fn wait_deadline(&self, deadline: Instant) -> Result { + // Wait until the flag is set. + loop { + // Check the flag. + match self.0.check() { + EventCheck::Ok => break, + EventCheck::Err => return Err(EventClosed), + EventCheck::Unset => {} + } + + // Start listening for events. + let listener = self.0.event.listen(); + + // Check the flag again after creating the listener. + match self.0.check() { + EventCheck::Ok => break, + EventCheck::Err => return Err(EventClosed), + EventCheck::Unset => {} + } + + // Wait for a notification and continue the loop. + if listener.wait_deadline(deadline).is_none() { + return Ok(WaitDeadline::Deadline); + } + } + + Ok(WaitDeadline::Event) + } + + /// Waits for the condition to be notified + #[inline] + pub fn wait_timeout(&self, timeout: Duration) -> Result { + // Wait until the flag is set. + loop { + // Check the flag. + match self.0.check() { + EventCheck::Ok => break, + EventCheck::Err => return Err(EventClosed), + EventCheck::Unset => {} + } + + // Start listening for events. + let listener = self.0.event.listen(); + + // Check the flag again after creating the listener. + match self.0.check() { + EventCheck::Ok => break, + EventCheck::Err => return Err(EventClosed), + EventCheck::Unset => {} + } + + // Wait for a notification and continue the loop. + if listener.wait_timeout(timeout).is_none() { + return Ok(WaitTimeout::Timeout); + } + } + + Ok(WaitTimeout::Event) + } +} + +impl Notifier { + /// Notifies one pending listener + #[inline] + pub fn notify(&self) -> Result<(), EventClosed> { + // Set the flag. + match self.0.set() { + EventSet::Ok => { + self.0.event.notify_additional_relaxed(1); + Ok(()) + } + EventSet::Err => Err(EventClosed), + } + } +} diff --git a/commons/zenoh-sync/src/lib.rs b/commons/zenoh-sync/src/lib.rs index 20e95d2bb8..8289b29fbb 100644 --- a/commons/zenoh-sync/src/lib.rs +++ b/commons/zenoh-sync/src/lib.rs @@ -25,6 +25,9 @@ use std::{ use futures::FutureExt; +pub mod event; +pub use event::*; + pub mod fifo_queue; pub use fifo_queue::*; diff --git a/io/zenoh-transport/Cargo.toml b/io/zenoh-transport/Cargo.toml index c1a2c9b8ae..a3dabbae0e 100644 --- a/io/zenoh-transport/Cargo.toml +++ b/io/zenoh-transport/Cargo.toml @@ -52,6 +52,7 @@ default = ["test", "transport_multilink"] [dependencies] async-trait = { workspace = true } +crossbeam-utils = { workspace = true } tokio = { workspace = true, features = [ "sync", "fs", @@ -61,6 +62,7 @@ tokio = { workspace = true, features = [ "io-util", "net", ] } +lazy_static = { workspace = true } tokio-util = { workspace = true, features = ["rt"]} flume = { workspace = true } tracing = {workspace = true} diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 68a4b87d24..d97cb08efc 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -1,13 +1,31 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use super::{ + batch::{Encode, WBatch}, + priority::{TransportChannelTx, TransportPriorityTx}, +}; +use crate::common::batch::BatchConfig; +use crossbeam_utils::CachePadded; +use ringbuffer_spsc::{RingBuffer, RingBufferReader, RingBufferWriter}; use std::{ sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU32, Ordering}, Arc, Mutex, MutexGuard, }, time::{Duration, Instant}, + u64, }; - -use flume::{bounded, Receiver, Sender}; -use ringbuffer_spsc::{RingBuffer, RingBufferReader, RingBufferWriter}; use zenoh_buffers::{ reader::{HasReader, Reader}, writer::HasWriter, @@ -25,35 +43,13 @@ use zenoh_protocol::{ AtomicBatchSize, BatchSize, TransportMessage, }, }; - -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use super::{ - batch::{Encode, WBatch}, - priority::{TransportChannelTx, TransportPriorityTx}, -}; -use crate::common::batch::BatchConfig; - -// It's faster to work directly with nanoseconds. -// Backoff will never last more the u32::MAX nanoseconds. -type NanoSeconds = u32; +use zenoh_sync::{event, Notifier, Waiter}; const RBLEN: usize = QueueSizeConf::MAX; // Inner structure to reuse serialization batches struct StageInRefill { - n_ref_r: Receiver<()>, + n_ref_r: Waiter, s_ref_r: RingBufferReader, } @@ -63,36 +59,48 @@ impl StageInRefill { } fn wait(&self) -> bool { - self.n_ref_r.recv().is_ok() + self.n_ref_r.wait().is_ok() } fn wait_deadline(&self, instant: Instant) -> bool { - self.n_ref_r.recv_deadline(instant).is_ok() + self.n_ref_r.wait_deadline(instant).is_ok() } } +lazy_static::lazy_static! { + static ref LOCAL_EPOCH: Instant = Instant::now(); +} + +type AtomicMicroSeconds = AtomicU32; +type MicroSeconds = u32; + +struct AtomicBackoff { + active: CachePadded, + bytes: CachePadded, + first_write: CachePadded, +} + // Inner structure to link the initial stage with the final stage of the pipeline struct StageInOut { - n_out_w: Sender<()>, + n_out_w: Notifier, s_out_w: RingBufferWriter, - bytes: Arc, - backoff: Arc, + atomic_backoff: Arc, } impl StageInOut { #[inline] fn notify(&self, bytes: BatchSize) { - self.bytes.store(bytes, Ordering::Relaxed); - if !self.backoff.load(Ordering::Relaxed) { - let _ = self.n_out_w.try_send(()); + self.atomic_backoff.bytes.store(bytes, Ordering::Relaxed); + if !self.atomic_backoff.active.load(Ordering::Relaxed) { + let _ = self.n_out_w.notify(); } } #[inline] fn move_batch(&mut self, batch: WBatch) { let _ = self.s_out_w.push(batch); - self.bytes.store(0, Ordering::Relaxed); - let _ = self.n_out_w.try_send(()); + self.atomic_backoff.bytes.store(0, Ordering::Relaxed); + let _ = self.n_out_w.notify(); } } @@ -145,6 +153,7 @@ impl StageIn { None => match self.s_ref.pull() { Some(mut batch) => { batch.clear(); + self.s_out.atomic_backoff.first_write.store(LOCAL_EPOCH.elapsed().as_micros() as MicroSeconds, Ordering::Relaxed); break batch; } None => { @@ -299,6 +308,10 @@ impl StageIn { None => match self.s_ref.pull() { Some(mut batch) => { batch.clear(); + self.s_out.atomic_backoff.first_write.store( + LOCAL_EPOCH.elapsed().as_micros() as MicroSeconds, + Ordering::Relaxed, + ); break batch; } None => { @@ -333,7 +346,6 @@ impl StageIn { // Get the current serialization batch. let mut batch = zgetbatch_rets!(); // Attempt the serialization on the current batch - // Attempt the serialization on the current batch match batch.encode(&msg) { Ok(_) => zretok!(batch), Err(_) => { @@ -354,54 +366,27 @@ impl StageIn { enum Pull { Some(WBatch), None, - Backoff(NanoSeconds), + Backoff(MicroSeconds), } // Inner structure to keep track and signal backoff operations #[derive(Clone)] struct Backoff { - tslot: NanoSeconds, - retry_time: NanoSeconds, + threshold: Duration, last_bytes: BatchSize, - bytes: Arc, - backoff: Arc, + atomic: Arc, + // active: bool, } impl Backoff { - fn new(tslot: NanoSeconds, bytes: Arc, backoff: Arc) -> Self { + fn new(threshold: Duration, atomic: Arc) -> Self { Self { - tslot, - retry_time: 0, + threshold, last_bytes: 0, - bytes, - backoff, - } - } - - fn next(&mut self) { - if self.retry_time == 0 { - self.retry_time = self.tslot; - self.backoff.store(true, Ordering::Relaxed); - } else { - match self.retry_time.checked_mul(2) { - Some(rt) => { - self.retry_time = rt; - } - None => { - self.retry_time = NanoSeconds::MAX; - tracing::warn!( - "Pipeline pull backoff overflow detected! Retrying in {}ns.", - self.retry_time - ); - } - } + atomic, + // active: false, } } - - fn reset(&mut self) { - self.retry_time = 0; - self.backoff.store(false, Ordering::Relaxed); - } } // Inner structure to link the final stage with the initial stage of the pipeline @@ -422,13 +407,35 @@ impl StageOutIn { } fn try_pull_deep(&mut self) -> Pull { - let new_bytes = self.backoff.bytes.load(Ordering::Relaxed); - let old_bytes = self.backoff.last_bytes; - self.backoff.last_bytes = new_bytes; + // Verify first backoff is not active + let mut pull = !self.backoff.atomic.active.load(Ordering::Relaxed); + + // If backoff is active, verify the current number of bytes is equal to the old number + // of bytes seen in the previous backoff iteration + if !pull { + let new_bytes = self.backoff.atomic.bytes.load(Ordering::Relaxed); + let old_bytes = self.backoff.last_bytes; + self.backoff.last_bytes = new_bytes; + + pull = new_bytes == old_bytes; + } - if new_bytes == old_bytes { + // Verify that we have not been doing backoff for too long + let mut diff = 0; + let mut threshold = 0; + if !pull { + diff = LOCAL_EPOCH.elapsed().as_micros() as MicroSeconds + - self.backoff.atomic.first_write.load(Ordering::Relaxed); + threshold = self.backoff.threshold.as_micros() as MicroSeconds; + + pull = diff > threshold; + } + + if pull { // It seems no new bytes have been written on the batch, try to pull if let Ok(mut g) = self.current.try_lock() { + self.backoff.atomic.active.store(false, Ordering::Relaxed); + // First try to pull from stage OUT to make sure we are not in the case // where new_bytes == old_bytes are because of two identical serializations if let Some(batch) = self.s_out_r.pull() { @@ -445,24 +452,25 @@ impl StageOutIn { } } } - // Go to backoff } + // Activate backoff + self.backoff.atomic.active.store(true, Ordering::Relaxed); + // Do backoff - self.backoff.next(); - Pull::Backoff(self.backoff.retry_time) + Pull::Backoff(threshold - diff) } } struct StageOutRefill { - n_ref_w: Sender<()>, + n_ref_w: Notifier, s_ref_w: RingBufferWriter, } impl StageOutRefill { fn refill(&mut self, batch: WBatch) { assert!(self.s_ref_w.push(batch).is_none()); - let _ = self.n_ref_w.try_send(()); + let _ = self.n_ref_w.notify(); } } @@ -525,7 +533,7 @@ impl TransmissionPipeline { // Create the channel for notifying that new batches are in the out ring buffer // This is a MPSC channel - let (n_out_w, n_out_r) = bounded(1); + let (n_out_w, n_out_r) = event::new(); for (prio, num) in size_iter.enumerate() { assert!(*num != 0 && *num <= RBLEN); @@ -540,22 +548,26 @@ impl TransmissionPipeline { } // Create the channel for notifying that new batches are in the refill ring buffer // This is a SPSC channel - let (n_ref_w, n_ref_r) = bounded(1); + let (n_ref_w, n_ref_r) = event::new(); // Create the refill ring buffer // This is a SPSC ring buffer let (s_out_w, s_out_r) = RingBuffer::::init(); let current = Arc::new(Mutex::new(None)); - let bytes = Arc::new(AtomicBatchSize::new(0)); - let backoff = Arc::new(AtomicBool::new(false)); + let bytes = Arc::new(AtomicBackoff { + active: CachePadded::new(AtomicBool::new(false)), + bytes: CachePadded::new(AtomicBatchSize::new(0)), + first_write: CachePadded::new(AtomicMicroSeconds::new( + LOCAL_EPOCH.elapsed().as_micros() as MicroSeconds, + )), + }); stage_in.push(Mutex::new(StageIn { s_ref: StageInRefill { n_ref_r, s_ref_r }, s_out: StageInOut { n_out_w: n_out_w.clone(), s_out_w, - bytes: bytes.clone(), - backoff: backoff.clone(), + atomic_backoff: bytes.clone(), }, mutex: StageInMutex { current: current.clone(), @@ -570,7 +582,7 @@ impl TransmissionPipeline { s_in: StageOutIn { s_out_r, current, - backoff: Backoff::new(config.backoff.as_nanos() as NanoSeconds, bytes, backoff), + backoff: Backoff::new(config.backoff, bytes), }, s_ref: StageOutRefill { n_ref_w, s_ref_w }, }); @@ -652,28 +664,23 @@ impl TransmissionPipelineProducer { pub(crate) struct TransmissionPipelineConsumer { // A single Mutex for all the priority queues stage_out: Box<[StageOut]>, - n_out_r: Receiver<()>, + n_out_r: Waiter, active: Arc, } impl TransmissionPipelineConsumer { pub(crate) async fn pull(&mut self) -> Option<(WBatch, usize)> { - // Reset backoff before pulling - for queue in self.stage_out.iter_mut() { - queue.s_in.backoff.reset(); - } - while self.active.load(Ordering::Relaxed) { + let mut backoff = MicroSeconds::MAX; // Calculate the backoff maximum - let mut bo = NanoSeconds::MAX; for (prio, queue) in self.stage_out.iter_mut().enumerate() { match queue.try_pull() { Pull::Some(batch) => { return Some((batch, prio)); } - Pull::Backoff(b) => { - if b < bo { - bo = b; + Pull::Backoff(deadline) => { + if deadline < backoff { + backoff = deadline; } } Pull::None => {} @@ -684,12 +691,15 @@ impl TransmissionPipelineConsumer { // While trying to pull from the queue, the stage_in `lock()` will most likely taken, leading to // a spinning behaviour while attempting to take the lock. Yield the current task to avoid // spinning the current task indefinitely. + // std::thread::yield_now(); tokio::task::yield_now().await; // Wait for the backoff to expire or for a new message - let res = - tokio::time::timeout(Duration::from_nanos(bo as u64), self.n_out_r.recv_async()) - .await; + let res = tokio::time::timeout( + Duration::from_micros(backoff as u64), + self.n_out_r.wait_async(), + ) + .await; match res { Ok(Ok(())) => { // We have received a notification from the channel that some bytes are available, retry to pull. From 9fa3c0d7ee667a86817c93b2ca3a3081ed998f35 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 26 Aug 2024 09:48:49 +0200 Subject: [PATCH 02/16] Cargo fmt --- commons/zenoh-sync/src/event.rs | 3 ++- io/zenoh-transport/src/common/pipeline.rs | 16 +++++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/commons/zenoh-sync/src/event.rs b/commons/zenoh-sync/src/event.rs index 3f13134bcd..0146ba8811 100644 --- a/commons/zenoh-sync/src/event.rs +++ b/commons/zenoh-sync/src/event.rs @@ -11,7 +11,6 @@ // Contributors: // ZettaScale Zenoh Team, // -use event_listener::{Event as EventLib, Listener}; use std::{ sync::{ atomic::{AtomicU16, AtomicU8, Ordering}, @@ -20,6 +19,8 @@ use std::{ time::{Duration, Instant}, }; +use event_listener::{Event as EventLib, Listener}; + // Return types pub struct EventClosed; diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index d97cb08efc..dbbaf48532 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -11,13 +11,6 @@ // Contributors: // ZettaScale Zenoh Team, // -use super::{ - batch::{Encode, WBatch}, - priority::{TransportChannelTx, TransportPriorityTx}, -}; -use crate::common::batch::BatchConfig; -use crossbeam_utils::CachePadded; -use ringbuffer_spsc::{RingBuffer, RingBufferReader, RingBufferWriter}; use std::{ sync::{ atomic::{AtomicBool, AtomicU32, Ordering}, @@ -26,6 +19,9 @@ use std::{ time::{Duration, Instant}, u64, }; + +use crossbeam_utils::CachePadded; +use ringbuffer_spsc::{RingBuffer, RingBufferReader, RingBufferWriter}; use zenoh_buffers::{ reader::{HasReader, Reader}, writer::HasWriter, @@ -45,6 +41,12 @@ use zenoh_protocol::{ }; use zenoh_sync::{event, Notifier, Waiter}; +use super::{ + batch::{Encode, WBatch}, + priority::{TransportChannelTx, TransportPriorityTx}, +}; +use crate::common::batch::BatchConfig; + const RBLEN: usize = QueueSizeConf::MAX; // Inner structure to reuse serialization batches From 1ff4fa0555ccbdeac0ce4b7ad1defd8ceba49861 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 26 Aug 2024 10:20:48 +0200 Subject: [PATCH 03/16] Fic backoff calculation --- io/zenoh-transport/src/common/pipeline.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index dbbaf48532..c585f96433 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -423,14 +423,17 @@ impl StageOutIn { } // Verify that we have not been doing backoff for too long - let mut diff = 0; - let mut threshold = 0; + let mut backoff = 0; if !pull { - diff = LOCAL_EPOCH.elapsed().as_micros() as MicroSeconds + let diff = LOCAL_EPOCH.elapsed().as_micros() as MicroSeconds - self.backoff.atomic.first_write.load(Ordering::Relaxed); - threshold = self.backoff.threshold.as_micros() as MicroSeconds; + let threshold = self.backoff.threshold.as_micros() as MicroSeconds; - pull = diff > threshold; + if diff >= threshold { + pull = true; + } else { + backoff = threshold - diff; + } } if pull { @@ -460,7 +463,7 @@ impl StageOutIn { self.backoff.atomic.active.store(true, Ordering::Relaxed); // Do backoff - Pull::Backoff(threshold - diff) + Pull::Backoff(backoff) } } From 2813f75e77f86e9cb34d858775540f7f86b807e2 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 26 Aug 2024 10:21:32 +0200 Subject: [PATCH 04/16] Fix backoff calculation --- commons/zenoh-shm/src/header/storage.rs | 2 +- commons/zenoh-shm/src/header/subscription.rs | 5 ++--- commons/zenoh-shm/src/watchdog/confirmator.rs | 5 ++--- commons/zenoh-shm/src/watchdog/storage.rs | 2 +- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/commons/zenoh-shm/src/header/storage.rs b/commons/zenoh-shm/src/header/storage.rs index 7d4c06cd2a..db556937d0 100644 --- a/commons/zenoh-shm/src/header/storage.rs +++ b/commons/zenoh-shm/src/header/storage.rs @@ -25,7 +25,7 @@ use super::{ segment::HeaderSegment, }; -#[dynamic(lazy,drop)] +#[dynamic(lazy, drop)] pub static mut GLOBAL_HEADER_STORAGE: HeaderStorage = HeaderStorage::new(32768usize).unwrap(); pub struct HeaderStorage { diff --git a/commons/zenoh-shm/src/header/subscription.rs b/commons/zenoh-shm/src/header/subscription.rs index 6259877302..6f92960aaa 100644 --- a/commons/zenoh-shm/src/header/subscription.rs +++ b/commons/zenoh-shm/src/header/subscription.rs @@ -24,9 +24,8 @@ use super::{ segment::HeaderSegment, }; -#[dynamic(lazy,drop)] - pub static mut GLOBAL_HEADER_SUBSCRIPTION: Subscription = Subscription::new(); - +#[dynamic(lazy, drop)] +pub static mut GLOBAL_HEADER_SUBSCRIPTION: Subscription = Subscription::new(); pub struct Subscription { linked_table: Mutex>>, diff --git a/commons/zenoh-shm/src/watchdog/confirmator.rs b/commons/zenoh-shm/src/watchdog/confirmator.rs index 9d87adfb97..1a9ac0f04f 100644 --- a/commons/zenoh-shm/src/watchdog/confirmator.rs +++ b/commons/zenoh-shm/src/watchdog/confirmator.rs @@ -27,10 +27,9 @@ use super::{ segment::Segment, }; -#[dynamic(lazy,drop)] +#[dynamic(lazy, drop)] pub static mut GLOBAL_CONFIRMATOR: WatchdogConfirmator = - WatchdogConfirmator::new(Duration::from_millis(50)); - + WatchdogConfirmator::new(Duration::from_millis(50)); pub struct ConfirmedDescriptor { pub owned: OwnedDescriptor, diff --git a/commons/zenoh-shm/src/watchdog/storage.rs b/commons/zenoh-shm/src/watchdog/storage.rs index 48fa4cde40..ff9772961c 100644 --- a/commons/zenoh-shm/src/watchdog/storage.rs +++ b/commons/zenoh-shm/src/watchdog/storage.rs @@ -21,7 +21,7 @@ use zenoh_result::{zerror, ZResult}; use super::{allocated_watchdog::AllocatedWatchdog, descriptor::OwnedDescriptor, segment::Segment}; -#[dynamic(lazy,drop)] +#[dynamic(lazy, drop)] pub static mut GLOBAL_STORAGE: WatchdogStorage = WatchdogStorage::new(32768usize).unwrap(); pub struct WatchdogStorage { From 03d4c2b690103d61e5cd5a614590dac75d80bab2 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 26 Aug 2024 10:55:05 +0200 Subject: [PATCH 05/16] Fix lint --- io/zenoh-transport/src/common/pipeline.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index c585f96433..530a6c0ced 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -17,7 +17,6 @@ use std::{ Arc, Mutex, MutexGuard, }, time::{Duration, Instant}, - u64, }; use crossbeam_utils::CachePadded; From 2899f06dc1a303f9302c56eb631628bad750ac93 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 26 Aug 2024 11:26:38 +0200 Subject: [PATCH 06/16] Add event tests --- commons/zenoh-sync/src/event.rs | 124 +++++++++++++++++++++++++++++--- 1 file changed, 113 insertions(+), 11 deletions(-) diff --git a/commons/zenoh-sync/src/event.rs b/commons/zenoh-sync/src/event.rs index 0146ba8811..1cd26cbd7c 100644 --- a/commons/zenoh-sync/src/event.rs +++ b/commons/zenoh-sync/src/event.rs @@ -150,9 +150,9 @@ impl Drop for Waiter { } } -/// Creates a new condition variable with a given capacity. -/// The capacity indicates the maximum number of tasks that -/// may be waiting on the condition. +/// Creates a new lock-free event variable. Every time a [`Notifier`] calls ['Notifier::notify`], one [`Waiter`] will be waken-up. +/// If no waiter is waiting when the `notify` is called, the notification will not be lost. That means the next waiter will return +/// immediately when calling `wait`. pub fn new() -> (Notifier, Waiter) { let inner = Arc::new(EventInner { event: EventLib::new(), @@ -172,8 +172,8 @@ impl Waiter { // Check the flag. match self.0.check() { EventCheck::Ok => break, - EventCheck::Err => return Err(EventClosed), EventCheck::Unset => {} + EventCheck::Err => return Err(EventClosed), } // Start listening for events. @@ -182,8 +182,8 @@ impl Waiter { // Check the flag again after creating the listener. match self.0.check() { EventCheck::Ok => break, - EventCheck::Err => return Err(EventClosed), EventCheck::Unset => {} + EventCheck::Err => return Err(EventClosed), } // Wait for a notification and continue the loop. @@ -201,8 +201,8 @@ impl Waiter { // Check the flag. match self.0.check() { EventCheck::Ok => break, - EventCheck::Err => return Err(EventClosed), EventCheck::Unset => {} + EventCheck::Err => return Err(EventClosed), } // Start listening for events. @@ -211,8 +211,8 @@ impl Waiter { // Check the flag again after creating the listener. match self.0.check() { EventCheck::Ok => break, - EventCheck::Err => return Err(EventClosed), EventCheck::Unset => {} + EventCheck::Err => return Err(EventClosed), } // Wait for a notification and continue the loop. @@ -230,8 +230,8 @@ impl Waiter { // Check the flag. match self.0.check() { EventCheck::Ok => break, - EventCheck::Err => return Err(EventClosed), EventCheck::Unset => {} + EventCheck::Err => return Err(EventClosed), } // Start listening for events. @@ -240,8 +240,8 @@ impl Waiter { // Check the flag again after creating the listener. match self.0.check() { EventCheck::Ok => break, - EventCheck::Err => return Err(EventClosed), EventCheck::Unset => {} + EventCheck::Err => return Err(EventClosed), } // Wait for a notification and continue the loop. @@ -261,8 +261,8 @@ impl Waiter { // Check the flag. match self.0.check() { EventCheck::Ok => break, - EventCheck::Err => return Err(EventClosed), EventCheck::Unset => {} + EventCheck::Err => return Err(EventClosed), } // Start listening for events. @@ -271,8 +271,8 @@ impl Waiter { // Check the flag again after creating the listener. match self.0.check() { EventCheck::Ok => break, - EventCheck::Err => return Err(EventClosed), EventCheck::Unset => {} + EventCheck::Err => return Err(EventClosed), } // Wait for a notification and continue the loop. @@ -299,3 +299,105 @@ impl Notifier { } } } + +mod tests { + #[test] + fn event_steps() { + use crate::{EventClosed, WaitTimeout}; + use std::sync::{Arc, Barrier}; + use std::time::Duration; + + let barrier = Arc::new(Barrier::new(2)); + + let (notifier, waiter) = super::new(); + + let tslot = Duration::from_secs(1); + + let bs = barrier.clone(); + let s = std::thread::spawn(move || { + // 1 + match waiter.wait_timeout(tslot) { + Ok(WaitTimeout::Event) => {} + Ok(WaitTimeout::Timeout) => panic!("Timeout {:#?}", tslot), + Err(EventClosed) => panic!("Event closed"), + } + + bs.wait(); + + // 2 + std::thread::sleep(tslot); + + match waiter.wait_timeout(tslot) { + Ok(WaitTimeout::Event) => {} + Ok(WaitTimeout::Timeout) => panic!("Timeout {:#?}", tslot), + Err(EventClosed) => panic!("Event closed"), + } + + match waiter.wait_timeout(tslot) { + Ok(WaitTimeout::Event) => panic!("Event Ok but it should be Timeout"), + Ok(WaitTimeout::Timeout) => {} + Err(EventClosed) => panic!("Event closed"), + } + + bs.wait(); + }); + + let bp = barrier.clone(); + let p = std::thread::spawn(move || { + // 1 + notifier.notify().unwrap(); + + bp.wait(); + + // 2 + notifier.notify().unwrap(); + notifier.notify().unwrap(); + + bp.wait(); + }); + + s.join().unwrap(); + p.join().unwrap(); + } + + #[test] + fn event_loop() { + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::{Duration, Instant}; + + const N: usize = 1_000; + static COUNTER: AtomicUsize = AtomicUsize::new(0); + + let (notifier, waiter) = super::new(); + + let s = std::thread::spawn(move || { + for _ in 0..N { + waiter.wait().unwrap(); + COUNTER.fetch_add(1, Ordering::Relaxed); + } + }); + let p = std::thread::spawn(move || { + for _ in 0..N { + notifier.notify().unwrap(); + std::thread::sleep(Duration::from_millis(1)); + } + }); + + let start = Instant::now(); + let tout = Duration::from_secs(60); + loop { + let n = COUNTER.load(Ordering::Relaxed); + if n == N { + break; + } + if start.elapsed() > tout { + panic!("Timeout {:#?}. Counter: {n}/{N}", tout); + } + + std::thread::sleep(Duration::from_secs(1)); + } + + s.join().unwrap(); + p.join().unwrap(); + } +} From 02858669b3b211358833b64d0580cf4dceebe827 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 26 Aug 2024 12:30:24 +0200 Subject: [PATCH 07/16] Improve event tests --- commons/zenoh-sync/src/event.rs | 48 ++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/commons/zenoh-sync/src/event.rs b/commons/zenoh-sync/src/event.rs index 1cd26cbd7c..6fd05ca214 100644 --- a/commons/zenoh-sync/src/event.rs +++ b/commons/zenoh-sync/src/event.rs @@ -11,7 +11,9 @@ // Contributors: // ZettaScale Zenoh Team, // +use event_listener::{Event as EventLib, Listener}; use std::{ + fmt, sync::{ atomic::{AtomicU16, AtomicU8, Ordering}, Arc, @@ -19,19 +21,17 @@ use std::{ time::{Duration, Instant}, }; -use event_listener::{Event as EventLib, Listener}; - // Return types pub struct EventClosed; -impl std::fmt::Display for EventClosed { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Display for EventClosed { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{:?}", self) } } -impl std::fmt::Debug for EventClosed { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Debug for EventClosed { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("Event Closed") } } @@ -304,8 +304,10 @@ mod tests { #[test] fn event_steps() { use crate::{EventClosed, WaitTimeout}; - use std::sync::{Arc, Barrier}; - use std::time::Duration; + use std::{ + sync::{Arc, Barrier}, + time::Duration, + }; let barrier = Arc::new(Barrier::new(2)); @@ -315,7 +317,7 @@ mod tests { let bs = barrier.clone(); let s = std::thread::spawn(move || { - // 1 + // 1 - Wait one notification match waiter.wait_timeout(tslot) { Ok(WaitTimeout::Event) => {} Ok(WaitTimeout::Timeout) => panic!("Timeout {:#?}", tslot), @@ -324,8 +326,8 @@ mod tests { bs.wait(); - // 2 - std::thread::sleep(tslot); + // 2 - Being notified twice but waiting only once + bs.wait(); match waiter.wait_timeout(tslot) { Ok(WaitTimeout::Event) => {} @@ -340,20 +342,34 @@ mod tests { } bs.wait(); + + // 3 - Notifier has been dropped + bs.wait(); + + waiter.wait().unwrap_err(); + + bs.wait(); }); let bp = barrier.clone(); let p = std::thread::spawn(move || { - // 1 + // 1 - Notify once notifier.notify().unwrap(); bp.wait(); - // 2 + // 2 - Notify twice notifier.notify().unwrap(); notifier.notify().unwrap(); bp.wait(); + bp.wait(); + + // 3 - Drop notifier yielding an error in the waiter + drop(notifier); + + bp.wait(); + bp.wait(); }); s.join().unwrap(); @@ -362,8 +378,10 @@ mod tests { #[test] fn event_loop() { - use std::sync::atomic::{AtomicUsize, Ordering}; - use std::time::{Duration, Instant}; + use std::{ + sync::atomic::{AtomicUsize, Ordering}, + time::{Duration, Instant}, + }; const N: usize = 1_000; static COUNTER: AtomicUsize = AtomicUsize::new(0); From a6a001761faee4389addfd2af7b9bdb1bca6b1a3 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 26 Aug 2024 12:43:35 +0200 Subject: [PATCH 08/16] Update event API --- commons/zenoh-sync/src/event.rs | 108 +++++++++++++++++++++----------- 1 file changed, 71 insertions(+), 37 deletions(-) diff --git a/commons/zenoh-sync/src/event.rs b/commons/zenoh-sync/src/event.rs index 6fd05ca214..615f6c6aa1 100644 --- a/commons/zenoh-sync/src/event.rs +++ b/commons/zenoh-sync/src/event.rs @@ -21,35 +21,69 @@ use std::{ time::{Duration, Instant}, }; -// Return types -pub struct EventClosed; +// Error types +pub struct WaitError; -impl fmt::Display for EventClosed { +impl fmt::Display for WaitError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{:?}", self) } } -impl fmt::Debug for EventClosed { +impl fmt::Debug for WaitError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("Event Closed") } } -impl std::error::Error for EventClosed {} +impl std::error::Error for WaitError {} #[repr(u8)] -pub enum WaitDeadline { - Event, +pub enum WaitDeadlineError { Deadline, + WaitError, } +impl fmt::Display for WaitDeadlineError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +impl fmt::Debug for WaitDeadlineError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Deadline => f.write_str("Deadline reached"), + Self::WaitError => f.write_str("Event Closed"), + } + } +} + +impl std::error::Error for WaitDeadlineError {} + #[repr(u8)] -pub enum WaitTimeout { - Event, +pub enum WaitTimeoutError { Timeout, + WaitError, +} + +impl fmt::Display for WaitTimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +impl fmt::Debug for WaitTimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Timeout => f.write_str("Timeout expired"), + Self::WaitError => f.write_str("Event Closed"), + } + } } +impl std::error::Error for WaitTimeoutError {} + /// This is a Event Variable similar to that provided by POSIX. /// As for POSIX condition variables, this assumes that a mutex is /// properly used to coordinate behaviour. In other terms there should @@ -166,14 +200,14 @@ pub fn new() -> (Notifier, Waiter) { impl Waiter { /// Waits for the condition to be notified #[inline] - pub async fn wait_async(&self) -> Result<(), EventClosed> { + pub async fn wait_async(&self) -> Result<(), WaitError> { // Wait until the flag is set. loop { // Check the flag. match self.0.check() { EventCheck::Ok => break, EventCheck::Unset => {} - EventCheck::Err => return Err(EventClosed), + EventCheck::Err => return Err(WaitError), } // Start listening for events. @@ -183,7 +217,7 @@ impl Waiter { match self.0.check() { EventCheck::Ok => break, EventCheck::Unset => {} - EventCheck::Err => return Err(EventClosed), + EventCheck::Err => return Err(WaitError), } // Wait for a notification and continue the loop. @@ -195,14 +229,14 @@ impl Waiter { /// Waits for the condition to be notified #[inline] - pub fn wait(&self) -> Result<(), EventClosed> { + pub fn wait(&self) -> Result<(), WaitError> { // Wait until the flag is set. loop { // Check the flag. match self.0.check() { EventCheck::Ok => break, EventCheck::Unset => {} - EventCheck::Err => return Err(EventClosed), + EventCheck::Err => return Err(WaitError), } // Start listening for events. @@ -212,7 +246,7 @@ impl Waiter { match self.0.check() { EventCheck::Ok => break, EventCheck::Unset => {} - EventCheck::Err => return Err(EventClosed), + EventCheck::Err => return Err(WaitError), } // Wait for a notification and continue the loop. @@ -224,14 +258,14 @@ impl Waiter { /// Waits for the condition to be notified #[inline] - pub fn wait_deadline(&self, deadline: Instant) -> Result { + pub fn wait_deadline(&self, deadline: Instant) -> Result<(), WaitDeadlineError> { // Wait until the flag is set. loop { // Check the flag. match self.0.check() { EventCheck::Ok => break, EventCheck::Unset => {} - EventCheck::Err => return Err(EventClosed), + EventCheck::Err => return Err(WaitDeadlineError::WaitError), } // Start listening for events. @@ -241,28 +275,28 @@ impl Waiter { match self.0.check() { EventCheck::Ok => break, EventCheck::Unset => {} - EventCheck::Err => return Err(EventClosed), + EventCheck::Err => return Err(WaitDeadlineError::WaitError), } // Wait for a notification and continue the loop. if listener.wait_deadline(deadline).is_none() { - return Ok(WaitDeadline::Deadline); + return Ok(()); } } - Ok(WaitDeadline::Event) + Ok(()) } /// Waits for the condition to be notified #[inline] - pub fn wait_timeout(&self, timeout: Duration) -> Result { + pub fn wait_timeout(&self, timeout: Duration) -> Result<(), WaitTimeoutError> { // Wait until the flag is set. loop { // Check the flag. match self.0.check() { EventCheck::Ok => break, EventCheck::Unset => {} - EventCheck::Err => return Err(EventClosed), + EventCheck::Err => return Err(WaitTimeoutError::WaitError), } // Start listening for events. @@ -272,30 +306,30 @@ impl Waiter { match self.0.check() { EventCheck::Ok => break, EventCheck::Unset => {} - EventCheck::Err => return Err(EventClosed), + EventCheck::Err => return Err(WaitTimeoutError::WaitError), } // Wait for a notification and continue the loop. if listener.wait_timeout(timeout).is_none() { - return Ok(WaitTimeout::Timeout); + return Ok(()); } } - Ok(WaitTimeout::Event) + Ok(()) } } impl Notifier { /// Notifies one pending listener #[inline] - pub fn notify(&self) -> Result<(), EventClosed> { + pub fn notify(&self) -> Result<(), WaitError> { // Set the flag. match self.0.set() { EventSet::Ok => { self.0.event.notify_additional_relaxed(1); Ok(()) } - EventSet::Err => Err(EventClosed), + EventSet::Err => Err(WaitError), } } } @@ -303,7 +337,7 @@ impl Notifier { mod tests { #[test] fn event_steps() { - use crate::{EventClosed, WaitTimeout}; + use crate::WaitTimeoutError; use std::{ sync::{Arc, Barrier}, time::Duration, @@ -319,9 +353,9 @@ mod tests { let s = std::thread::spawn(move || { // 1 - Wait one notification match waiter.wait_timeout(tslot) { - Ok(WaitTimeout::Event) => {} - Ok(WaitTimeout::Timeout) => panic!("Timeout {:#?}", tslot), - Err(EventClosed) => panic!("Event closed"), + Ok(()) => {} + Err(WaitTimeoutError::Timeout) => panic!("Timeout {:#?}", tslot), + Err(WaitTimeoutError::WaitError) => panic!("Event closed"), } bs.wait(); @@ -330,15 +364,15 @@ mod tests { bs.wait(); match waiter.wait_timeout(tslot) { - Ok(WaitTimeout::Event) => {} - Ok(WaitTimeout::Timeout) => panic!("Timeout {:#?}", tslot), - Err(EventClosed) => panic!("Event closed"), + Ok(()) => {} + Err(WaitTimeoutError::Timeout) => panic!("Timeout {:#?}", tslot), + Err(WaitTimeoutError::WaitError) => panic!("Event closed"), } match waiter.wait_timeout(tslot) { - Ok(WaitTimeout::Event) => panic!("Event Ok but it should be Timeout"), - Ok(WaitTimeout::Timeout) => {} - Err(EventClosed) => panic!("Event closed"), + Ok(()) => panic!("Event Ok but it should be Timeout"), + Err(WaitTimeoutError::Timeout) => {} + Err(WaitTimeoutError::WaitError) => panic!("Event closed"), } bs.wait(); From ebdee432ebe9e363c410ac20bfdc64dd18e9e447 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 26 Aug 2024 15:13:43 +0200 Subject: [PATCH 09/16] Improve event tests --- commons/zenoh-sync/src/event.rs | 115 +++++++++++++++++++++++++++++--- 1 file changed, 105 insertions(+), 10 deletions(-) diff --git a/commons/zenoh-sync/src/event.rs b/commons/zenoh-sync/src/event.rs index 615f6c6aa1..9c2380aa35 100644 --- a/commons/zenoh-sync/src/event.rs +++ b/commons/zenoh-sync/src/event.rs @@ -22,6 +22,7 @@ use std::{ }; // Error types +const WAIT_ERR_STR: &str = "No notifier available"; pub struct WaitError; impl fmt::Display for WaitError { @@ -32,7 +33,7 @@ impl fmt::Display for WaitError { impl fmt::Debug for WaitError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("Event Closed") + f.write_str(WAIT_ERR_STR) } } @@ -54,7 +55,7 @@ impl fmt::Debug for WaitDeadlineError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Deadline => f.write_str("Deadline reached"), - Self::WaitError => f.write_str("Event Closed"), + Self::WaitError => f.write_str(WAIT_ERR_STR), } } } @@ -77,13 +78,30 @@ impl fmt::Debug for WaitTimeoutError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Timeout => f.write_str("Timeout expired"), - Self::WaitError => f.write_str("Event Closed"), + Self::WaitError => f.write_str(WAIT_ERR_STR), } } } impl std::error::Error for WaitTimeoutError {} +const NOTIFY_ERR_STR: &str = "No waiter available"; +pub struct NotifyError; + +impl fmt::Display for NotifyError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +impl fmt::Debug for NotifyError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(NOTIFY_ERR_STR) + } +} + +impl std::error::Error for NotifyError {} + /// This is a Event Variable similar to that provided by POSIX. /// As for POSIX condition variables, this assumes that a mutex is /// properly used to coordinate behaviour. In other terms there should @@ -322,14 +340,14 @@ impl Waiter { impl Notifier { /// Notifies one pending listener #[inline] - pub fn notify(&self) -> Result<(), WaitError> { + pub fn notify(&self) -> Result<(), NotifyError> { // Set the flag. match self.0.set() { EventSet::Ok => { self.0.event.notify_additional_relaxed(1); Ok(()) } - EventSet::Err => Err(WaitError), + EventSet::Err => Err(NotifyError), } } } @@ -344,9 +362,7 @@ mod tests { }; let barrier = Arc::new(Barrier::new(2)); - let (notifier, waiter) = super::new(); - let tslot = Duration::from_secs(1); let bs = barrier.clone(); @@ -413,7 +429,10 @@ mod tests { #[test] fn event_loop() { use std::{ - sync::atomic::{AtomicUsize, Ordering}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Barrier, + }, time::{Duration, Instant}, }; @@ -421,17 +440,20 @@ mod tests { static COUNTER: AtomicUsize = AtomicUsize::new(0); let (notifier, waiter) = super::new(); + let barrier = Arc::new(Barrier::new(2)); + let bs = barrier.clone(); let s = std::thread::spawn(move || { for _ in 0..N { waiter.wait().unwrap(); COUNTER.fetch_add(1, Ordering::Relaxed); + bs.wait(); } }); let p = std::thread::spawn(move || { for _ in 0..N { notifier.notify().unwrap(); - std::thread::sleep(Duration::from_millis(1)); + barrier.wait(); } }); @@ -446,10 +468,83 @@ mod tests { panic!("Timeout {:#?}. Counter: {n}/{N}", tout); } - std::thread::sleep(Duration::from_secs(1)); + std::thread::sleep(Duration::from_millis(100)); } s.join().unwrap(); p.join().unwrap(); } + + #[test] + fn event_multiple() { + use std::{ + sync::atomic::{AtomicUsize, Ordering}, + time::{Duration, Instant}, + }; + + const N: usize = 1_000; + static COUNTER: AtomicUsize = AtomicUsize::new(0); + + let (notifier, waiter) = super::new(); + + let w1 = waiter.clone(); + let s1 = std::thread::spawn(move || { + let mut n = 0; + while COUNTER.fetch_add(1, Ordering::Relaxed) < N - 2 { + w1.wait().unwrap(); + n += 1; + } + println!("S1: {}", n); + }); + let s2 = std::thread::spawn(move || { + let mut n = 0; + while COUNTER.fetch_add(1, Ordering::Relaxed) < N - 2 { + waiter.wait().unwrap(); + n += 1; + } + println!("S2: {}", n); + }); + + let n1 = notifier.clone(); + let p1 = std::thread::spawn(move || { + let mut n = 0; + while COUNTER.load(Ordering::Relaxed) < N { + n1.notify().unwrap(); + n += 1; + std::thread::sleep(Duration::from_millis(1)); + } + println!("P1: {}", n); + }); + let p2 = std::thread::spawn(move || { + let mut n = 0; + while COUNTER.load(Ordering::Relaxed) < N { + notifier.notify().unwrap(); + n += 1; + std::thread::sleep(Duration::from_millis(1)); + } + println!("P2: {}", n); + }); + + std::thread::spawn(move || { + let start = Instant::now(); + let tout = Duration::from_secs(60); + loop { + let n = COUNTER.load(Ordering::Relaxed); + if n == N { + break; + } + if start.elapsed() > tout { + panic!("Timeout {:#?}. Counter: {n}/{N}", tout); + } + + std::thread::sleep(Duration::from_millis(100)); + } + }); + + p1.join().unwrap(); + p2.join().unwrap(); + + s1.join().unwrap(); + s2.join().unwrap(); + } } From 76b3d789d18aeefd79ad2618bcf9e57f34571baf Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 26 Aug 2024 15:24:16 +0200 Subject: [PATCH 10/16] Precommit --- commons/zenoh-sync/src/event.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/commons/zenoh-sync/src/event.rs b/commons/zenoh-sync/src/event.rs index 9c2380aa35..1db54abc25 100644 --- a/commons/zenoh-sync/src/event.rs +++ b/commons/zenoh-sync/src/event.rs @@ -11,7 +11,6 @@ // Contributors: // ZettaScale Zenoh Team, // -use event_listener::{Event as EventLib, Listener}; use std::{ fmt, sync::{ @@ -21,6 +20,8 @@ use std::{ time::{Duration, Instant}, }; +use event_listener::{Event as EventLib, Listener}; + // Error types const WAIT_ERR_STR: &str = "No notifier available"; pub struct WaitError; @@ -355,12 +356,13 @@ impl Notifier { mod tests { #[test] fn event_steps() { - use crate::WaitTimeoutError; use std::{ sync::{Arc, Barrier}, time::Duration, }; + use crate::WaitTimeoutError; + let barrier = Arc::new(Barrier::new(2)); let (notifier, waiter) = super::new(); let tslot = Duration::from_secs(1); From 687d7615d4c7aa9ccdf62fc4cf32014cb40bb917 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 26 Aug 2024 16:02:53 +0200 Subject: [PATCH 11/16] Fix event wait_timeout and wait_deadline impls --- commons/zenoh-sync/src/event.rs | 98 ++++++++++++++++----------------- 1 file changed, 47 insertions(+), 51 deletions(-) diff --git a/commons/zenoh-sync/src/event.rs b/commons/zenoh-sync/src/event.rs index 1db54abc25..adeb5fc0b0 100644 --- a/commons/zenoh-sync/src/event.rs +++ b/commons/zenoh-sync/src/event.rs @@ -103,12 +103,7 @@ impl fmt::Debug for NotifyError { impl std::error::Error for NotifyError {} -/// This is a Event Variable similar to that provided by POSIX. -/// As for POSIX condition variables, this assumes that a mutex is -/// properly used to coordinate behaviour. In other terms there should -/// not be race condition on [notify_one](Event::notify_one) or -/// [notify_all](Event::notify_all). -/// +// Inner struct EventInner { event: EventLib, flag: AtomicU8, @@ -158,9 +153,38 @@ impl EventInner { } } +/// Creates a new lock-free event variable. Every time a [`Notifier`] calls ['Notifier::notify`], one [`Waiter`] will be waken-up. +/// If no waiter is waiting when the `notify` is called, the notification will not be lost. That means the next waiter will return +/// immediately when calling `wait`. +pub fn new() -> (Notifier, Waiter) { + let inner = Arc::new(EventInner { + event: EventLib::new(), + flag: AtomicU8::new(UNSET), + notifiers: AtomicU16::new(1), + waiters: AtomicU16::new(1), + }); + (Notifier(inner.clone()), Waiter(inner)) +} + +/// A [`Notifier`] is used to notify and wake up one and only one [`Waiter`]. #[repr(transparent)] pub struct Notifier(Arc); +impl Notifier { + /// Notifies one pending listener + #[inline] + pub fn notify(&self) -> Result<(), NotifyError> { + // Set the flag. + match self.0.set() { + EventSet::Ok => { + self.0.event.notify_additional_relaxed(1); + Ok(()) + } + EventSet::Err => Err(NotifyError), + } + } +} + impl Clone for Notifier { fn clone(&self) -> Self { let n = self.0.notifiers.fetch_add(1, Ordering::SeqCst); @@ -184,38 +208,6 @@ impl Drop for Notifier { #[repr(transparent)] pub struct Waiter(Arc); -impl Clone for Waiter { - fn clone(&self) -> Self { - let n = self.0.waiters.fetch_add(1, Ordering::Relaxed); - // Panic on overflow - assert!(n != 0); - Self(self.0.clone()) - } -} - -impl Drop for Waiter { - fn drop(&mut self) { - let n = self.0.waiters.fetch_sub(1, Ordering::SeqCst); - if n == 1 { - // The last Waiter has been dropped, close the event - self.0.err(); - } - } -} - -/// Creates a new lock-free event variable. Every time a [`Notifier`] calls ['Notifier::notify`], one [`Waiter`] will be waken-up. -/// If no waiter is waiting when the `notify` is called, the notification will not be lost. That means the next waiter will return -/// immediately when calling `wait`. -pub fn new() -> (Notifier, Waiter) { - let inner = Arc::new(EventInner { - event: EventLib::new(), - flag: AtomicU8::new(UNSET), - notifiers: AtomicU16::new(1), - waiters: AtomicU16::new(1), - }); - (Notifier(inner.clone()), Waiter(inner)) -} - impl Waiter { /// Waits for the condition to be notified #[inline] @@ -299,7 +291,7 @@ impl Waiter { // Wait for a notification and continue the loop. if listener.wait_deadline(deadline).is_none() { - return Ok(()); + return Err(WaitDeadlineError::Deadline); } } @@ -330,7 +322,7 @@ impl Waiter { // Wait for a notification and continue the loop. if listener.wait_timeout(timeout).is_none() { - return Ok(()); + return Err(WaitTimeoutError::Timeout); } } @@ -338,17 +330,21 @@ impl Waiter { } } -impl Notifier { - /// Notifies one pending listener - #[inline] - pub fn notify(&self) -> Result<(), NotifyError> { - // Set the flag. - match self.0.set() { - EventSet::Ok => { - self.0.event.notify_additional_relaxed(1); - Ok(()) - } - EventSet::Err => Err(NotifyError), +impl Clone for Waiter { + fn clone(&self) -> Self { + let n = self.0.waiters.fetch_add(1, Ordering::Relaxed); + // Panic on overflow + assert!(n != 0); + Self(self.0.clone()) + } +} + +impl Drop for Waiter { + fn drop(&mut self) { + let n = self.0.waiters.fetch_sub(1, Ordering::SeqCst); + if n == 1 { + // The last Waiter has been dropped, close the event + self.0.err(); } } } From 033074f3933068f923264010c6f6afdc2418bf08 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 26 Aug 2024 16:41:13 +0200 Subject: [PATCH 12/16] Add event_deadline tests --- commons/zenoh-sync/src/event.rs | 79 +++++++++++++++++++++++++++++++-- 1 file changed, 76 insertions(+), 3 deletions(-) diff --git a/commons/zenoh-sync/src/event.rs b/commons/zenoh-sync/src/event.rs index adeb5fc0b0..bf711b43f4 100644 --- a/commons/zenoh-sync/src/event.rs +++ b/commons/zenoh-sync/src/event.rs @@ -267,7 +267,7 @@ impl Waiter { Ok(()) } - /// Waits for the condition to be notified + /// Waits for the condition to be notified or returns an error when the deadline is reached #[inline] pub fn wait_deadline(&self, deadline: Instant) -> Result<(), WaitDeadlineError> { // Wait until the flag is set. @@ -298,7 +298,7 @@ impl Waiter { Ok(()) } - /// Waits for the condition to be notified + /// Waits for the condition to be notified or returns an error when the timeout is expired #[inline] pub fn wait_timeout(&self, timeout: Duration) -> Result<(), WaitTimeoutError> { // Wait until the flag is set. @@ -351,7 +351,7 @@ impl Drop for Waiter { mod tests { #[test] - fn event_steps() { + fn event_timeout() { use std::{ sync::{Arc, Barrier}, time::Duration, @@ -424,6 +424,79 @@ mod tests { p.join().unwrap(); } + #[test] + fn event_deadline() { + use crate::WaitDeadlineError; + use std::{ + sync::{Arc, Barrier}, + time::{Duration, Instant}, + }; + + let barrier = Arc::new(Barrier::new(2)); + let (notifier, waiter) = super::new(); + let tslot = Duration::from_secs(1); + + let bs = barrier.clone(); + let s = std::thread::spawn(move || { + // 1 - Wait one notification + match waiter.wait_deadline(Instant::now() + tslot) { + Ok(()) => {} + Err(WaitDeadlineError::Deadline) => panic!("Timeout {:#?}", tslot), + Err(WaitDeadlineError::WaitError) => panic!("Event closed"), + } + + bs.wait(); + + // 2 - Being notified twice but waiting only once + bs.wait(); + + match waiter.wait_deadline(Instant::now() + tslot) { + Ok(()) => {} + Err(WaitDeadlineError::Deadline) => panic!("Timeout {:#?}", tslot), + Err(WaitDeadlineError::WaitError) => panic!("Event closed"), + } + + match waiter.wait_deadline(Instant::now() + tslot) { + Ok(()) => panic!("Event Ok but it should be Timeout"), + Err(WaitDeadlineError::Deadline) => {} + Err(WaitDeadlineError::WaitError) => panic!("Event closed"), + } + + bs.wait(); + + // 3 - Notifier has been dropped + bs.wait(); + + waiter.wait().unwrap_err(); + + bs.wait(); + }); + + let bp = barrier.clone(); + let p = std::thread::spawn(move || { + // 1 - Notify once + notifier.notify().unwrap(); + + bp.wait(); + + // 2 - Notify twice + notifier.notify().unwrap(); + notifier.notify().unwrap(); + + bp.wait(); + bp.wait(); + + // 3 - Drop notifier yielding an error in the waiter + drop(notifier); + + bp.wait(); + bp.wait(); + }); + + s.join().unwrap(); + p.join().unwrap(); + } + #[test] fn event_loop() { use std::{ From 81a3e6863f86efafc9cfc57314cc55a69974e946 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 26 Aug 2024 17:31:32 +0200 Subject: [PATCH 13/16] Pre-commit --- commons/zenoh-sync/src/event.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/commons/zenoh-sync/src/event.rs b/commons/zenoh-sync/src/event.rs index bf711b43f4..e6ec70c1a3 100644 --- a/commons/zenoh-sync/src/event.rs +++ b/commons/zenoh-sync/src/event.rs @@ -426,12 +426,13 @@ mod tests { #[test] fn event_deadline() { - use crate::WaitDeadlineError; use std::{ sync::{Arc, Barrier}, time::{Duration, Instant}, }; + use crate::WaitDeadlineError; + let barrier = Arc::new(Barrier::new(2)); let (notifier, waiter) = super::new(); let tslot = Duration::from_secs(1); From 519057bb21507a4b1cc39be11ec847c11b524294 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Wed, 28 Aug 2024 10:03:31 +0200 Subject: [PATCH 14/16] Update batching config --- DEFAULT_CONFIG.json5 | 15 +++++--- commons/zenoh-config/src/defaults.rs | 20 +++++------ commons/zenoh-config/src/lib.rs | 17 ++++++---- io/zenoh-transport/src/common/pipeline.rs | 16 ++++----- io/zenoh-transport/src/manager.rs | 34 ++++++++++--------- io/zenoh-transport/src/multicast/link.rs | 4 +-- .../src/unicast/universal/link.rs | 4 +-- 7 files changed, 60 insertions(+), 50 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 27af64ef93..babdaa354b 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -353,8 +353,6 @@ /// Therefore, the maximum batch size is 2^16-1 (i.e. 65535). /// The default batch size value is the maximum batch size: 65535. batch_size: 65535, - /// Perform batching of messages if they are smaller of the batch_size - batching: true, /// Each zenoh link has a transmission queue that can be configured queue: { /// The size of each priority queue indicates the number of batches a given queue can contain. @@ -380,9 +378,16 @@ /// The maximum time in microseconds to wait for an available batch before dropping the message if still no batch is available. wait_before_drop: 1000, }, - /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress. - /// Higher values lead to a more aggressive batching but it will introduce additional latency. - backoff: 100, + /// Perform batching of messages if they are smaller of the batch_size + batching: { + /// Perform adaptive batching of messages if they are smaller of the batch_size. + /// When the network is detected to not be fast enough to transmit every message individually, many small messages may be + /// batched together and sent all at once on the wire reducing the overall network overhead. This is typically of a high-throughput + /// scenario mainly composed of small messages. In other words, batching is actived by the network back-pressure. + enabled: true, + /// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens. + time_limit: 1, + } }, }, /// Configure the zenoh RX parameters of a link diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index a00edbacc4..cc6bf5854a 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -191,17 +191,6 @@ impl Default for LinkTxConf { batch_size: BatchSize::MAX, queue: QueueConf::default(), threads: num, - batching: true, - } - } -} - -impl Default for QueueConf { - fn default() -> Self { - Self { - size: QueueSizeConf::default(), - congestion_control: CongestionControlConf::default(), - backoff: 1_000_000, } } } @@ -234,6 +223,15 @@ impl Default for CongestionControlConf { } } +impl Default for BatchingConf { + fn default() -> Self { + BatchingConf { + enabled: true, + time_limit: 1, + } + } +} + impl Default for LinkRxConf { fn default() -> Self { Self { diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index b7b63e1602..2d361b8d96 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -407,9 +407,8 @@ validated_struct::validator! { keep_alive: usize, /// Zenoh's MTU equivalent (default: 2^16-1) (max: 2^16-1) batch_size: BatchSize, - /// Perform batching of messages if they are smaller of the batch_size - batching: bool, - pub queue: QueueConf { + pub queue: #[derive(Default)] + QueueConf { /// The size of each priority queue indicates the number of batches a given queue can contain. /// The amount of memory being allocated for each queue is then SIZE_XXX * BATCH_SIZE. /// In the case of the transport link MTU being smaller than the ZN_BATCH_SIZE, @@ -432,9 +431,15 @@ validated_struct::validator! { /// The maximum time in microseconds to wait for an available batch before dropping the message if still no batch is available. pub wait_before_drop: u64, }, - /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress. - /// Higher values lead to a more aggressive batching but it will introduce additional latency. - backoff: u64, + pub batching: BatchingConf { + /// Perform adaptive batching of messages if they are smaller of the batch_size. + /// When the network is detected to not be fast enough to transmit every message individually, many small messages may be + /// batched together and sent all at once on the wire reducing the overall network overhead. This is typically of a high-throughput + /// scenario mainly composed of small messages. In other words, batching is actived by the network back-pressure. + enabled: bool, + /// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens. + time_limit: u64, + }, }, // Number of threads used for TX threads: usize, diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 530a6c0ced..6dc2fe9f23 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -513,8 +513,8 @@ pub(crate) struct TransmissionPipelineConf { pub(crate) batch: BatchConfig, pub(crate) queue_size: [usize; Priority::NUM], pub(crate) wait_before_drop: Duration, - pub(crate) batching: bool, - pub(crate) backoff: Duration, + pub(crate) batching_enabled: bool, + pub(crate) batching_time_limit: Duration, } // A 2-stage transmission pipeline @@ -578,7 +578,7 @@ impl TransmissionPipeline { priority: priority[prio].clone(), }, fragbuf: ZBuf::empty(), - batching: config.batching, + batching: config.batching_enabled, })); // The stage out for this priority @@ -586,7 +586,7 @@ impl TransmissionPipeline { s_in: StageOutIn { s_out_r, current, - backoff: Backoff::new(config.backoff, bytes), + backoff: Backoff::new(config.batching_time_limit, bytes), }, s_ref: StageOutRefill { n_ref_w, s_ref_w }, }); @@ -788,9 +788,9 @@ mod tests { is_compression: true, }, queue_size: [1; Priority::NUM], - batching: true, + batching_enabled: true, wait_before_drop: Duration::from_millis(1), - backoff: Duration::from_micros(1), + batching_time_limit: Duration::from_micros(1), }; const CONFIG_NOT_STREAMED: TransmissionPipelineConf = TransmissionPipelineConf { @@ -801,9 +801,9 @@ mod tests { is_compression: false, }, queue_size: [1; Priority::NUM], - batching: true, + batching_enabled: true, wait_before_drop: Duration::from_millis(1), - backoff: Duration::from_micros(1), + batching_time_limit: Duration::from_micros(1), }; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index 669744838f..305ccab574 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -130,10 +130,10 @@ pub struct TransportManagerBuilder { whatami: WhatAmI, resolution: Resolution, batch_size: BatchSize, - batching: bool, + batching_enabled: bool, + batching_time_limit: Duration, wait_before_drop: Duration, queue_size: QueueSizeConf, - queue_backoff: Duration, defrag_buff_size: usize, link_rx_buffer_size: usize, unicast: TransportManagerBuilderUnicast, @@ -172,8 +172,13 @@ impl TransportManagerBuilder { self } - pub fn batching(mut self, batching: bool) -> Self { - self.batching = batching; + pub fn batching_enabled(mut self, batching_enabled: bool) -> Self { + self.batching_enabled = batching_enabled; + self + } + + pub fn batching_time_limit(mut self, batching_time_limit: Duration) -> Self { + self.batching_time_limit = batching_time_limit; self } @@ -187,11 +192,6 @@ impl TransportManagerBuilder { self } - pub fn queue_backoff(mut self, queue_backoff: Duration) -> Self { - self.queue_backoff = queue_backoff; - self - } - pub fn defrag_buff_size(mut self, defrag_buff_size: usize) -> Self { self.defrag_buff_size = defrag_buff_size; self @@ -238,14 +238,16 @@ impl TransportManagerBuilder { resolution.set(Field::FrameSN, *link.tx().sequence_number_resolution()); self = self.resolution(resolution); self = self.batch_size(*link.tx().batch_size()); - self = self.batching(*link.tx().batching()); + self = self.batching_enabled(*link.tx().queue().batching().enabled()); + self = self.batching_time_limit(Duration::from_millis( + *link.tx().queue().batching().time_limit(), + )); self = self.defrag_buff_size(*link.rx().max_message_size()); self = self.link_rx_buffer_size(*link.rx().buffer_size()); self = self.wait_before_drop(Duration::from_micros( *link.tx().queue().congestion_control().wait_before_drop(), )); self = self.queue_size(link.tx().queue().size().clone()); - self = self.queue_backoff(Duration::from_nanos(*link.tx().queue().backoff())); self = self.tx_threads(*link.tx().threads()); self = self.protocols(link.protocols().clone()); @@ -301,10 +303,10 @@ impl TransportManagerBuilder { whatami: self.whatami, resolution: self.resolution, batch_size: self.batch_size, - batching: self.batching, + batching: self.batching_enabled, wait_before_drop: self.wait_before_drop, queue_size, - queue_backoff: self.queue_backoff, + queue_backoff: self.batching_time_limit, defrag_buff_size: self.defrag_buff_size, link_rx_buffer_size: self.link_rx_buffer_size, unicast: unicast.config, @@ -340,7 +342,7 @@ impl Default for TransportManagerBuilder { fn default() -> Self { let link_rx = LinkRxConf::default(); let queue = QueueConf::default(); - let backoff = *queue.backoff(); + let backoff = *queue.batching().time_limit(); let wait_before_drop = *queue.congestion_control().wait_before_drop(); Self { version: VERSION, @@ -348,10 +350,10 @@ impl Default for TransportManagerBuilder { whatami: zenoh_config::defaults::mode, resolution: Resolution::default(), batch_size: BatchSize::MAX, - batching: true, + batching_enabled: true, wait_before_drop: Duration::from_micros(wait_before_drop), queue_size: queue.size, - queue_backoff: Duration::from_nanos(backoff), + batching_time_limit: Duration::from_millis(backoff), defrag_buff_size: *link_rx.max_message_size(), link_rx_buffer_size: *link_rx.buffer_size(), endpoints: HashMap::new(), diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs index 90999d32ce..d0d5ef4fb0 100644 --- a/io/zenoh-transport/src/multicast/link.rs +++ b/io/zenoh-transport/src/multicast/link.rs @@ -321,8 +321,8 @@ impl TransportLinkMulticastUniversal { batch: self.link.config.batch, queue_size: self.transport.manager.config.queue_size, wait_before_drop: self.transport.manager.config.wait_before_drop, - batching: self.transport.manager.config.batching, - backoff: self.transport.manager.config.queue_backoff, + batching_enabled: self.transport.manager.config.batching, + batching_time_limit: self.transport.manager.config.queue_backoff, }; // The pipeline let (producer, consumer) = TransmissionPipeline::make(tpc, &priority_tx); diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index cc3afc06e5..fff842c255 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -62,8 +62,8 @@ impl TransportLinkUnicastUniversal { }, queue_size: transport.manager.config.queue_size, wait_before_drop: transport.manager.config.wait_before_drop, - batching: transport.manager.config.batching, - backoff: transport.manager.config.queue_backoff, + batching_enabled: transport.manager.config.batching, + batching_time_limit: transport.manager.config.queue_backoff, }; // The pipeline From 582d59b9138c75123b4087a94cb9fe999b555d6a Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Wed, 28 Aug 2024 10:26:20 +0200 Subject: [PATCH 15/16] Fix typos --- DEFAULT_CONFIG.json5 | 2 +- commons/zenoh-config/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index babdaa354b..dd4a1bf0d9 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -383,7 +383,7 @@ /// Perform adaptive batching of messages if they are smaller of the batch_size. /// When the network is detected to not be fast enough to transmit every message individually, many small messages may be /// batched together and sent all at once on the wire reducing the overall network overhead. This is typically of a high-throughput - /// scenario mainly composed of small messages. In other words, batching is actived by the network back-pressure. + /// scenario mainly composed of small messages. In other words, batching is activated by the network back-pressure. enabled: true, /// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens. time_limit: 1, diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 2d361b8d96..e227d1c8e0 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -435,7 +435,7 @@ validated_struct::validator! { /// Perform adaptive batching of messages if they are smaller of the batch_size. /// When the network is detected to not be fast enough to transmit every message individually, many small messages may be /// batched together and sent all at once on the wire reducing the overall network overhead. This is typically of a high-throughput - /// scenario mainly composed of small messages. In other words, batching is actived by the network back-pressure. + /// scenario mainly composed of small messages. In other words, batching is activated by the network back-pressure. enabled: bool, /// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens. time_limit: u64, From 28b9dd8d6c1f7228e49c8f5fc237ba1e99bb2ddf Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Wed, 28 Aug 2024 14:45:39 +0200 Subject: [PATCH 16/16] Address review comments --- commons/zenoh-sync/src/event.rs | 2 +- io/zenoh-transport/src/common/pipeline.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/commons/zenoh-sync/src/event.rs b/commons/zenoh-sync/src/event.rs index e6ec70c1a3..f1aa5b5b69 100644 --- a/commons/zenoh-sync/src/event.rs +++ b/commons/zenoh-sync/src/event.rs @@ -1,5 +1,5 @@ // -// Copyright (c) 2023 ZettaScale Technology +// Copyright (c) 2024 ZettaScale Technology // // This program and the accompanying materials are made available under the // terms of the Eclipse Public License 2.0 which is available at diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 6dc2fe9f23..60ea3b215d 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -695,7 +695,6 @@ impl TransmissionPipelineConsumer { // While trying to pull from the queue, the stage_in `lock()` will most likely taken, leading to // a spinning behaviour while attempting to take the lock. Yield the current task to avoid // spinning the current task indefinitely. - // std::thread::yield_now(); tokio::task::yield_now().await; // Wait for the backoff to expire or for a new message