diff --git a/Cargo.lock b/Cargo.lock
index 1de420de99..dc5163b1c6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6102,9 +6102,11 @@ name = "zenoh-transport"
 version = "1.0.0-dev"
 dependencies = [
  "async-trait",
+ "crossbeam-utils",
  "flume",
  "futures",
  "futures-util",
+ "lazy_static",
  "lz4_flex",
  "paste",
  "rand 0.8.5",
diff --git a/Cargo.toml b/Cargo.toml
index e2aac0cb40..15b93b9543 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -90,6 +90,7 @@ console-subscriber = "0.3.0"
 const_format = "0.2.30"
 crc = "3.0.1"
 criterion = "0.5"
+crossbeam-utils = "0.8.2"
 derive_more = "0.99.17"
 derive-new = "0.6.0"
 tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5
index 27af64ef93..dd4a1bf0d9 100644
--- a/DEFAULT_CONFIG.json5
+++ b/DEFAULT_CONFIG.json5
@@ -353,8 +353,6 @@
     /// Therefore, the maximum batch size is 2^16-1 (i.e. 65535).
     /// The default batch size value is the maximum batch size: 65535.
     batch_size: 65535,
-    /// Perform batching of messages if they are smaller of the batch_size
-    batching: true,
     /// Each zenoh link has a transmission queue that can be configured
     queue: {
       /// The size of each priority queue indicates the number of batches a given queue can contain.
@@ -380,9 +378,16 @@
         /// The maximum time in microseconds to wait for an available batch before dropping the message if still no batch is available.
         wait_before_drop: 1000,
       },
-      /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress.
-      /// Higher values lead to a more aggressive batching but it will introduce additional latency.
-      backoff: 100,
+      /// Perform batching of messages if they are smaller than the batch_size
+      batching: {
+        /// Perform adaptive batching of messages if they are smaller than the batch_size.
+        /// When the network is detected not to be fast enough to transmit every message individually, many small messages may be
+        /// batched together and sent all at once on the wire, reducing the overall network overhead. This is typical of a high-throughput
+        /// scenario mainly composed of small messages. In other words, batching is activated by the network back-pressure.
+        enabled: true,
+        /// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
+        time_limit: 1,
+      }
     },
   },
   /// Configure the zenoh RX parameters of a link
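Reviewer note: the old `tx.batching` flag and `queue.backoff` knob are folded into a single `queue.batching` block. A minimal user-facing override, using the same paths as the default config above (the values here are illustrative, not the defaults):

```json5
{
  transport: {
    link: {
      tx: {
        queue: {
          batching: {
            // Disable adaptive batching: send every message as soon as possible.
            enabled: false,
            // Or keep it enabled and retain messages for at most 5 ms under back-pressure.
            time_limit: 5,
          },
        },
      },
    },
  },
}
```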
diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs
index c6e69dd148..cc6bf5854a 100644
--- a/commons/zenoh-config/src/defaults.rs
+++ b/commons/zenoh-config/src/defaults.rs
@@ -191,17 +191,6 @@ impl Default for LinkTxConf {
             batch_size: BatchSize::MAX,
             queue: QueueConf::default(),
             threads: num,
-            batching: true,
-        }
-    }
-}
-
-impl Default for QueueConf {
-    fn default() -> Self {
-        Self {
-            size: QueueSizeConf::default(),
-            congestion_control: CongestionControlConf::default(),
-            backoff: 100,
         }
     }
 }
@@ -234,6 +223,15 @@ impl Default for CongestionControlConf {
     }
 }
 
+impl Default for BatchingConf {
+    fn default() -> Self {
+        BatchingConf {
+            enabled: true,
+            time_limit: 1,
+        }
+    }
+}
+
 impl Default for LinkRxConf {
     fn default() -> Self {
         Self {
diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs
index b7b63e1602..e227d1c8e0 100644
--- a/commons/zenoh-config/src/lib.rs
+++ b/commons/zenoh-config/src/lib.rs
@@ -407,9 +407,8 @@ validated_struct::validator! {
         keep_alive: usize,
         /// Zenoh's MTU equivalent (default: 2^16-1) (max: 2^16-1)
         batch_size: BatchSize,
-        /// Perform batching of messages if they are smaller of the batch_size
-        batching: bool,
-        pub queue: QueueConf {
+        pub queue: #[derive(Default)]
+        QueueConf {
             /// The size of each priority queue indicates the number of batches a given queue can contain.
             /// The amount of memory being allocated for each queue is then SIZE_XXX * BATCH_SIZE.
             /// In the case of the transport link MTU being smaller than the ZN_BATCH_SIZE,
@@ -432,9 +431,15 @@ validated_struct::validator! {
                 /// The maximum time in microseconds to wait for an available batch before dropping the message if still no batch is available.
                 pub wait_before_drop: u64,
             },
-            /// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress.
-            /// Higher values lead to a more aggressive batching but it will introduce additional latency.
-            backoff: u64,
+            pub batching: BatchingConf {
+                /// Perform adaptive batching of messages if they are smaller than the batch_size.
+                /// When the network is detected not to be fast enough to transmit every message individually, many small messages may be
+                /// batched together and sent all at once on the wire, reducing the overall network overhead. This is typical of a high-throughput
+                /// scenario mainly composed of small messages. In other words, batching is activated by the network back-pressure.
+                enabled: bool,
+                /// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
+                time_limit: u64,
+            },
         },
         // Number of threads used for TX
         threads: usize,
diff --git a/commons/zenoh-shm/src/header/storage.rs b/commons/zenoh-shm/src/header/storage.rs
index 7d4c06cd2a..db556937d0 100644
--- a/commons/zenoh-shm/src/header/storage.rs
+++ b/commons/zenoh-shm/src/header/storage.rs
@@ -25,7 +25,7 @@ use super::{
     segment::HeaderSegment,
 };
 
-#[dynamic(lazy,drop)]
+#[dynamic(lazy, drop)]
 pub static mut GLOBAL_HEADER_STORAGE: HeaderStorage = HeaderStorage::new(32768usize).unwrap();
 
 pub struct HeaderStorage {
diff --git a/commons/zenoh-shm/src/header/subscription.rs b/commons/zenoh-shm/src/header/subscription.rs
index 6259877302..6f92960aaa 100644
--- a/commons/zenoh-shm/src/header/subscription.rs
+++ b/commons/zenoh-shm/src/header/subscription.rs
@@ -24,9 +24,8 @@ use super::{
    segment::HeaderSegment,
 };
 
-#[dynamic(lazy,drop)]
- pub static mut GLOBAL_HEADER_SUBSCRIPTION: Subscription = Subscription::new();
-
+#[dynamic(lazy, drop)]
+pub static mut GLOBAL_HEADER_SUBSCRIPTION: Subscription = Subscription::new();
 
 pub struct Subscription {
     linked_table: Mutex<BTreeMap<SegmentID, Arc<HeaderSegment>>>,
diff --git a/commons/zenoh-shm/src/watchdog/confirmator.rs b/commons/zenoh-shm/src/watchdog/confirmator.rs
index 9d87adfb97..1a9ac0f04f 100644
--- a/commons/zenoh-shm/src/watchdog/confirmator.rs
+++ b/commons/zenoh-shm/src/watchdog/confirmator.rs
@@ -27,10 +27,9 @@ use super::{
     segment::Segment,
 };
 
-#[dynamic(lazy,drop)]
+#[dynamic(lazy, drop)]
 pub static mut GLOBAL_CONFIRMATOR: WatchdogConfirmator =
-    WatchdogConfirmator::new(Duration::from_millis(50));
-
+    WatchdogConfirmator::new(Duration::from_millis(50));
 
 pub struct ConfirmedDescriptor {
     pub owned: OwnedDescriptor,
diff --git a/commons/zenoh-shm/src/watchdog/storage.rs b/commons/zenoh-shm/src/watchdog/storage.rs
index 48fa4cde40..ff9772961c 100644
--- a/commons/zenoh-shm/src/watchdog/storage.rs
+++ b/commons/zenoh-shm/src/watchdog/storage.rs
@@ -21,7 +21,7 @@ use zenoh_result::{zerror, ZResult};
 
 use super::{allocated_watchdog::AllocatedWatchdog, descriptor::OwnedDescriptor, segment::Segment};
 
-#[dynamic(lazy,drop)]
+#[dynamic(lazy, drop)]
 pub static mut GLOBAL_STORAGE: WatchdogStorage = WatchdogStorage::new(32768usize).unwrap();
 
 pub struct WatchdogStorage {
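The four zenoh-shm hunks above are pure formatting of the same `static_init` attribute. For context, a minimal standalone sketch of what `#[dynamic(lazy, drop)]` provides (the `static_init` dependency and the `read()` guard access are assumptions based on how these globals are used elsewhere in zenoh-shm, not part of this diff):

```rust
use static_init::dynamic;

// `lazy` defers construction to first access; `drop` registers the value for
// destruction at program exit, which matters for process-shared resources.
#[dynamic(lazy, drop)]
static mut GLOBAL_TABLE: Vec<u32> = vec![1, 2, 3];

fn main() {
    // Mutable lazy statics are accessed through generated lock guards.
    assert_eq!(GLOBAL_TABLE.read().len(), 3);
}
```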
diff --git a/commons/zenoh-sync/src/event.rs b/commons/zenoh-sync/src/event.rs
new file mode 100644
index 0000000000..f1aa5b5b69
--- /dev/null
+++ b/commons/zenoh-sync/src/event.rs
@@ -0,0 +1,622 @@
+//
+// Copyright (c) 2024 ZettaScale Technology
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License 2.0 which is available at
+// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+//
+// Contributors:
+//   ZettaScale Zenoh Team,
+//
+use std::{
+    fmt,
+    sync::{
+        atomic::{AtomicU16, AtomicU8, Ordering},
+        Arc,
+    },
+    time::{Duration, Instant},
+};
+
+use event_listener::{Event as EventLib, Listener};
+
+// Error types
+const WAIT_ERR_STR: &str = "No notifier available";
+pub struct WaitError;
+
+impl fmt::Display for WaitError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+impl fmt::Debug for WaitError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.write_str(WAIT_ERR_STR)
+    }
+}
+
+impl std::error::Error for WaitError {}
+
+#[repr(u8)]
+pub enum WaitDeadlineError {
+    Deadline,
+    WaitError,
+}
+
+impl fmt::Display for WaitDeadlineError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+impl fmt::Debug for WaitDeadlineError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::Deadline => f.write_str("Deadline reached"),
+            Self::WaitError => f.write_str(WAIT_ERR_STR),
+        }
+    }
+}
+
+impl std::error::Error for WaitDeadlineError {}
+
+#[repr(u8)]
+pub enum WaitTimeoutError {
+    Timeout,
+    WaitError,
+}
+
+impl fmt::Display for WaitTimeoutError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+impl fmt::Debug for WaitTimeoutError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::Timeout => f.write_str("Timeout expired"),
+            Self::WaitError => f.write_str(WAIT_ERR_STR),
+        }
+    }
+}
+
+impl std::error::Error for WaitTimeoutError {}
+
+const NOTIFY_ERR_STR: &str = "No waiter available";
+pub struct NotifyError;
+
+impl fmt::Display for NotifyError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+impl fmt::Debug for NotifyError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.write_str(NOTIFY_ERR_STR)
+    }
+}
+
+impl std::error::Error for NotifyError {}
+
+// Inner
+struct EventInner {
+    event: EventLib,
+    flag: AtomicU8,
+    notifiers: AtomicU16,
+    waiters: AtomicU16,
+}
+
+const UNSET: u8 = 0;
+const OK: u8 = 1;
+const ERR: u8 = 1 << 1;
+
+#[repr(u8)]
+enum EventCheck {
+    Unset = UNSET,
+    Ok = OK,
+    Err = ERR,
+}
+
+#[repr(u8)]
+enum EventSet {
+    Ok = OK,
+    Err = ERR,
+}
+
+impl EventInner {
+    fn check(&self) -> EventCheck {
+        let f = self.flag.fetch_and(!OK, Ordering::SeqCst);
+        if f & ERR != 0 {
+            return EventCheck::Err;
+        }
+        if f == OK {
+            return EventCheck::Ok;
+        }
+        EventCheck::Unset
+    }
+
+    fn set(&self) -> EventSet {
+        let f = self.flag.fetch_or(OK, Ordering::SeqCst);
+        if f & ERR != 0 {
+            return EventSet::Err;
+        }
+        EventSet::Ok
+    }
+
+    fn err(&self) {
+        self.flag.store(ERR, Ordering::SeqCst);
+    }
+}
+
+/// Creates a new lock-free event variable. Every time a [`Notifier`] calls [`Notifier::notify`], one [`Waiter`] will be woken up.
+/// If no waiter is waiting when `notify` is called, the notification will not be lost. That means the next waiter will return
+/// immediately when calling `wait`.
+pub fn new() -> (Notifier, Waiter) {
+    let inner = Arc::new(EventInner {
+        event: EventLib::new(),
+        flag: AtomicU8::new(UNSET),
+        notifiers: AtomicU16::new(1),
+        waiters: AtomicU16::new(1),
+    });
+    (Notifier(inner.clone()), Waiter(inner))
+}
+
+/// A [`Notifier`] is used to notify and wake up one and only one [`Waiter`].
+#[repr(transparent)]
+pub struct Notifier(Arc<EventInner>);
+
+impl Notifier {
+    /// Notifies one pending listener
+    #[inline]
+    pub fn notify(&self) -> Result<(), NotifyError> {
+        // Set the flag.
+        match self.0.set() {
+            EventSet::Ok => {
+                self.0.event.notify_additional_relaxed(1);
+                Ok(())
+            }
+            EventSet::Err => Err(NotifyError),
+        }
+    }
+}
+
+impl Clone for Notifier {
+    fn clone(&self) -> Self {
+        let n = self.0.notifiers.fetch_add(1, Ordering::SeqCst);
+        // Panic on overflow
+        assert!(n != 0);
+        Self(self.0.clone())
+    }
+}
+
+impl Drop for Notifier {
+    fn drop(&mut self) {
+        let n = self.0.notifiers.fetch_sub(1, Ordering::SeqCst);
+        if n == 1 {
+            // The last Notifier has been dropped, close the event and notify everyone
+            self.0.err();
+            self.0.event.notify(usize::MAX);
+        }
+    }
+}
+
+#[repr(transparent)]
+pub struct Waiter(Arc<EventInner>);
+
+impl Waiter {
+    /// Waits for the condition to be notified
+    #[inline]
+    pub async fn wait_async(&self) -> Result<(), WaitError> {
+        // Wait until the flag is set.
+        loop {
+            // Check the flag.
+            match self.0.check() {
+                EventCheck::Ok => break,
+                EventCheck::Unset => {}
+                EventCheck::Err => return Err(WaitError),
+            }
+
+            // Start listening for events.
+            let listener = self.0.event.listen();
+
+            // Check the flag again after creating the listener.
+            match self.0.check() {
+                EventCheck::Ok => break,
+                EventCheck::Unset => {}
+                EventCheck::Err => return Err(WaitError),
+            }
+
+            // Wait for a notification and continue the loop.
+            listener.await;
+        }
+
+        Ok(())
+    }
+
+    /// Waits for the condition to be notified
+    #[inline]
+    pub fn wait(&self) -> Result<(), WaitError> {
+        // Wait until the flag is set.
+        loop {
+            // Check the flag.
+            match self.0.check() {
+                EventCheck::Ok => break,
+                EventCheck::Unset => {}
+                EventCheck::Err => return Err(WaitError),
+            }
+
+            // Start listening for events.
+            let listener = self.0.event.listen();
+
+            // Check the flag again after creating the listener.
+            match self.0.check() {
+                EventCheck::Ok => break,
+                EventCheck::Unset => {}
+                EventCheck::Err => return Err(WaitError),
+            }
+
+            // Wait for a notification and continue the loop.
+            listener.wait();
+        }
+
+        Ok(())
+    }
+
+    /// Waits for the condition to be notified or returns an error when the deadline is reached
+    #[inline]
+    pub fn wait_deadline(&self, deadline: Instant) -> Result<(), WaitDeadlineError> {
+        // Wait until the flag is set.
+        loop {
+            // Check the flag.
+            match self.0.check() {
+                EventCheck::Ok => break,
+                EventCheck::Unset => {}
+                EventCheck::Err => return Err(WaitDeadlineError::WaitError),
+            }
+
+            // Start listening for events.
+            let listener = self.0.event.listen();
+
+            // Check the flag again after creating the listener.
+            match self.0.check() {
+                EventCheck::Ok => break,
+                EventCheck::Unset => {}
+                EventCheck::Err => return Err(WaitDeadlineError::WaitError),
+            }
+
+            // Wait for a notification and continue the loop.
+            if listener.wait_deadline(deadline).is_none() {
+                return Err(WaitDeadlineError::Deadline);
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Waits for the condition to be notified or returns an error when the timeout is expired
+    #[inline]
+    pub fn wait_timeout(&self, timeout: Duration) -> Result<(), WaitTimeoutError> {
+        // Wait until the flag is set.
+        loop {
+            // Check the flag.
+            match self.0.check() {
+                EventCheck::Ok => break,
+                EventCheck::Unset => {}
+                EventCheck::Err => return Err(WaitTimeoutError::WaitError),
+            }
+
+            // Start listening for events.
+            let listener = self.0.event.listen();
+
+            // Check the flag again after creating the listener.
+            match self.0.check() {
+                EventCheck::Ok => break,
+                EventCheck::Unset => {}
+                EventCheck::Err => return Err(WaitTimeoutError::WaitError),
+            }
+
+            // Wait for a notification and continue the loop.
+            if listener.wait_timeout(timeout).is_none() {
+                return Err(WaitTimeoutError::Timeout);
+            }
+        }
+
+        Ok(())
+    }
+}
+
+impl Clone for Waiter {
+    fn clone(&self) -> Self {
+        let n = self.0.waiters.fetch_add(1, Ordering::Relaxed);
+        // Panic on overflow
+        assert!(n != 0);
+        Self(self.0.clone())
+    }
+}
+
+impl Drop for Waiter {
+    fn drop(&mut self) {
+        let n = self.0.waiters.fetch_sub(1, Ordering::SeqCst);
+        if n == 1 {
+            // The last Waiter has been dropped, close the event
+            self.0.err();
+        }
+    }
+}
+
+mod tests {
+    #[test]
+    fn event_timeout() {
+        use std::{
+            sync::{Arc, Barrier},
+            time::Duration,
+        };
+
+        use crate::WaitTimeoutError;
+
+        let barrier = Arc::new(Barrier::new(2));
+        let (notifier, waiter) = super::new();
+        let tslot = Duration::from_secs(1);
+
+        let bs = barrier.clone();
+        let s = std::thread::spawn(move || {
+            // 1 - Wait one notification
+            match waiter.wait_timeout(tslot) {
+                Ok(()) => {}
+                Err(WaitTimeoutError::Timeout) => panic!("Timeout {:#?}", tslot),
+                Err(WaitTimeoutError::WaitError) => panic!("Event closed"),
+            }
+
+            bs.wait();
+
+            // 2 - Being notified twice but waiting only once
+            bs.wait();
+
+            match waiter.wait_timeout(tslot) {
+                Ok(()) => {}
+                Err(WaitTimeoutError::Timeout) => panic!("Timeout {:#?}", tslot),
+                Err(WaitTimeoutError::WaitError) => panic!("Event closed"),
+            }
+
+            match waiter.wait_timeout(tslot) {
+                Ok(()) => panic!("Event Ok but it should be Timeout"),
+                Err(WaitTimeoutError::Timeout) => {}
+                Err(WaitTimeoutError::WaitError) => panic!("Event closed"),
+            }
+
+            bs.wait();
+
+            // 3 - Notifier has been dropped
+            bs.wait();
+
+            waiter.wait().unwrap_err();
+
+            bs.wait();
+        });
+
+        let bp = barrier.clone();
+        let p = std::thread::spawn(move || {
+            // 1 - Notify once
+            notifier.notify().unwrap();
+
+            bp.wait();
+
+            // 2 - Notify twice
+            notifier.notify().unwrap();
+            notifier.notify().unwrap();
+
+            bp.wait();
+            bp.wait();
+
+            // 3 - Drop notifier yielding an error in the waiter
+            drop(notifier);
+
+            bp.wait();
+            bp.wait();
+        });
+
+        s.join().unwrap();
+        p.join().unwrap();
+    }
+
+    #[test]
+    fn event_deadline() {
+        use std::{
+            sync::{Arc, Barrier},
+            time::{Duration, Instant},
+        };
+
+        use crate::WaitDeadlineError;
+
+        let barrier = Arc::new(Barrier::new(2));
+        let (notifier, waiter) = super::new();
+        let tslot = Duration::from_secs(1);
+
+        let bs = barrier.clone();
+        let s = std::thread::spawn(move || {
+            // 1 - Wait one notification
+            match waiter.wait_deadline(Instant::now() + tslot) {
+                Ok(()) => {}
+                Err(WaitDeadlineError::Deadline) => panic!("Timeout {:#?}", tslot),
+                Err(WaitDeadlineError::WaitError) => panic!("Event closed"),
+            }
+
+            bs.wait();
+
+            // 2 - Being notified twice but waiting only once
+            bs.wait();
+
+            match waiter.wait_deadline(Instant::now() + tslot) {
+                Ok(()) => {}
+                Err(WaitDeadlineError::Deadline) => panic!("Timeout {:#?}", tslot),
+                Err(WaitDeadlineError::WaitError) => panic!("Event closed"),
+            }
+
+            match waiter.wait_deadline(Instant::now() + tslot) {
+                Ok(()) => panic!("Event Ok but it should be Timeout"),
+                Err(WaitDeadlineError::Deadline) => {}
+                Err(WaitDeadlineError::WaitError) => panic!("Event closed"),
+            }
+
+            bs.wait();
+
+            // 3 - Notifier has been dropped
+            bs.wait();
+
+            waiter.wait().unwrap_err();
+
+            bs.wait();
+        });
+
+        let bp = barrier.clone();
+        let p = std::thread::spawn(move || {
+            // 1 - Notify once
+            notifier.notify().unwrap();
+
+            bp.wait();
+
+            // 2 - Notify twice
+            notifier.notify().unwrap();
+            notifier.notify().unwrap();
+
+            bp.wait();
+            bp.wait();
+
+            // 3 - Drop notifier yielding an error in the waiter
+            drop(notifier);
+
+            bp.wait();
+            bp.wait();
+        });
+
+        s.join().unwrap();
+        p.join().unwrap();
+    }
+
+    #[test]
+    fn event_loop() {
+        use std::{
+            sync::{
+                atomic::{AtomicUsize, Ordering},
+                Arc, Barrier,
+            },
+            time::{Duration, Instant},
+        };
+
+        const N: usize = 1_000;
+        static COUNTER: AtomicUsize = AtomicUsize::new(0);
+
+        let (notifier, waiter) = super::new();
+        let barrier = Arc::new(Barrier::new(2));
+
+        let bs = barrier.clone();
+        let s = std::thread::spawn(move || {
+            for _ in 0..N {
+                waiter.wait().unwrap();
+                COUNTER.fetch_add(1, Ordering::Relaxed);
+                bs.wait();
+            }
+        });
+        let p = std::thread::spawn(move || {
+            for _ in 0..N {
+                notifier.notify().unwrap();
+                barrier.wait();
+            }
+        });
+
+        let start = Instant::now();
+        let tout = Duration::from_secs(60);
+        loop {
+            let n = COUNTER.load(Ordering::Relaxed);
+            if n == N {
+                break;
+            }
+            if start.elapsed() > tout {
+                panic!("Timeout {:#?}. Counter: {n}/{N}", tout);
+            }
+
+            std::thread::sleep(Duration::from_millis(100));
+        }
+
+        s.join().unwrap();
+        p.join().unwrap();
+    }
+
+    #[test]
+    fn event_multiple() {
+        use std::{
+            sync::atomic::{AtomicUsize, Ordering},
+            time::{Duration, Instant},
+        };
+
+        const N: usize = 1_000;
+        static COUNTER: AtomicUsize = AtomicUsize::new(0);
+
+        let (notifier, waiter) = super::new();
+
+        let w1 = waiter.clone();
+        let s1 = std::thread::spawn(move || {
+            let mut n = 0;
+            while COUNTER.fetch_add(1, Ordering::Relaxed) < N - 2 {
+                w1.wait().unwrap();
+                n += 1;
+            }
+            println!("S1: {}", n);
+        });
+        let s2 = std::thread::spawn(move || {
+            let mut n = 0;
+            while COUNTER.fetch_add(1, Ordering::Relaxed) < N - 2 {
+                waiter.wait().unwrap();
+                n += 1;
+            }
+            println!("S2: {}", n);
+        });
+
+        let n1 = notifier.clone();
+        let p1 = std::thread::spawn(move || {
+            let mut n = 0;
+            while COUNTER.load(Ordering::Relaxed) < N {
+                n1.notify().unwrap();
+                n += 1;
+                std::thread::sleep(Duration::from_millis(1));
+            }
+            println!("P1: {}", n);
+        });
+        let p2 = std::thread::spawn(move || {
+            let mut n = 0;
+            while COUNTER.load(Ordering::Relaxed) < N {
+                notifier.notify().unwrap();
+                n += 1;
+                std::thread::sleep(Duration::from_millis(1));
+            }
+            println!("P2: {}", n);
+        });
+
+        std::thread::spawn(move || {
+            let start = Instant::now();
+            let tout = Duration::from_secs(60);
+            loop {
+                let n = COUNTER.load(Ordering::Relaxed);
+                if n == N {
+                    break;
+                }
+                if start.elapsed() > tout {
+                    panic!("Timeout {:#?}. Counter: {n}/{N}", tout);
+                }
+
+                std::thread::sleep(Duration::from_millis(100));
+            }
+        });
+
+        p1.join().unwrap();
+        p2.join().unwrap();
+
+        s1.join().unwrap();
+        s2.join().unwrap();
+    }
+}
diff --git a/commons/zenoh-sync/src/lib.rs b/commons/zenoh-sync/src/lib.rs
index 20e95d2bb8..8289b29fbb 100644
--- a/commons/zenoh-sync/src/lib.rs
+++ b/commons/zenoh-sync/src/lib.rs
@@ -25,6 +25,9 @@ use std::{
 
 use futures::FutureExt;
 
+pub mod event;
+pub use event::*;
+
 pub mod fifo_queue;
 pub use fifo_queue::*;
 
diff --git a/io/zenoh-transport/Cargo.toml b/io/zenoh-transport/Cargo.toml
index c1a2c9b8ae..a3dabbae0e 100644
--- a/io/zenoh-transport/Cargo.toml
+++ b/io/zenoh-transport/Cargo.toml
@@ -52,6 +52,7 @@ default = ["test", "transport_multilink"]
 
 [dependencies]
 async-trait = { workspace = true }
+crossbeam-utils = { workspace = true }
 tokio = { workspace = true, features = [
     "sync",
     "fs",
@@ -61,6 +62,7 @@ tokio = { workspace = true, features = [
     "io-util",
     "net",
 ] }
+lazy_static = { workspace = true }
 tokio-util = { workspace = true, features = ["rt"]}
 flume = { workspace = true }
 tracing = {workspace = true}
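Before the pipeline changes below: a minimal sketch of how the new event primitive is meant to be used (single notifier, single waiter, sticky notification), assuming a dependency on the `zenoh-sync` crate added above:

```rust
use zenoh_sync::event;

fn main() {
    let (notifier, waiter) = event::new();

    let t = std::thread::spawn(move || {
        // Blocks until notified; errors only once every Notifier is dropped.
        waiter.wait().unwrap();
        println!("woken up");
    });

    // Not lost if the waiter is not parked yet: the flag stays set and the
    // next wait() returns immediately.
    notifier.notify().unwrap();
    t.join().unwrap();
}
```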
diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs
index 68a4b87d24..60ea3b215d 100644
--- a/io/zenoh-transport/src/common/pipeline.rs
+++ b/io/zenoh-transport/src/common/pipeline.rs
@@ -1,12 +1,25 @@
+//
+// Copyright (c) 2023 ZettaScale Technology
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License 2.0 which is available at
+// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+//
+// Contributors:
+//   ZettaScale Zenoh Team,
+//
 use std::{
     sync::{
-        atomic::{AtomicBool, Ordering},
+        atomic::{AtomicBool, AtomicU32, Ordering},
         Arc, Mutex, MutexGuard,
     },
     time::{Duration, Instant},
 };
 
-use flume::{bounded, Receiver, Sender};
+use crossbeam_utils::CachePadded;
 use ringbuffer_spsc::{RingBuffer, RingBufferReader, RingBufferWriter};
 use zenoh_buffers::{
     reader::{HasReader, Reader},
@@ -25,35 +38,19 @@ use zenoh_protocol::{
         AtomicBatchSize, BatchSize, TransportMessage,
     },
 };
+use zenoh_sync::{event, Notifier, Waiter};
 
-//
-// Copyright (c) 2023 ZettaScale Technology
-//
-// This program and the accompanying materials are made available under the
-// terms of the Eclipse Public License 2.0 which is available at
-// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
-// which is available at https://www.apache.org/licenses/LICENSE-2.0.
-//
-// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
-//
-// Contributors:
-//   ZettaScale Zenoh Team,
-//
 use super::{
     batch::{Encode, WBatch},
     priority::{TransportChannelTx, TransportPriorityTx},
 };
 use crate::common::batch::BatchConfig;
 
-// It's faster to work directly with nanoseconds.
-// Backoff will never last more the u32::MAX nanoseconds.
-type NanoSeconds = u32;
-
 const RBLEN: usize = QueueSizeConf::MAX;
 
 // Inner structure to reuse serialization batches
 struct StageInRefill {
-    n_ref_r: Receiver<()>,
+    n_ref_r: Waiter,
     s_ref_r: RingBufferReader<WBatch, RBLEN>,
 }
 
@@ -63,36 +60,48 @@ impl StageInRefill {
     }
 
     fn wait(&self) -> bool {
-        self.n_ref_r.recv().is_ok()
+        self.n_ref_r.wait().is_ok()
     }
 
     fn wait_deadline(&self, instant: Instant) -> bool {
-        self.n_ref_r.recv_deadline(instant).is_ok()
+        self.n_ref_r.wait_deadline(instant).is_ok()
     }
 }
 
+lazy_static::lazy_static! {
+    static ref LOCAL_EPOCH: Instant = Instant::now();
+}
+
+type AtomicMicroSeconds = AtomicU32;
+type MicroSeconds = u32;
+
+struct AtomicBackoff {
+    active: CachePadded<AtomicBool>,
+    bytes: CachePadded<AtomicBatchSize>,
+    first_write: CachePadded<AtomicMicroSeconds>,
+}
+
 // Inner structure to link the initial stage with the final stage of the pipeline
 struct StageInOut {
-    n_out_w: Sender<()>,
+    n_out_w: Notifier,
     s_out_w: RingBufferWriter<WBatch, RBLEN>,
-    bytes: Arc<AtomicBatchSize>,
-    backoff: Arc<AtomicBool>,
+    atomic_backoff: Arc<AtomicBackoff>,
 }
 
 impl StageInOut {
     #[inline]
     fn notify(&self, bytes: BatchSize) {
-        self.bytes.store(bytes, Ordering::Relaxed);
-        if !self.backoff.load(Ordering::Relaxed) {
-            let _ = self.n_out_w.try_send(());
+        self.atomic_backoff.bytes.store(bytes, Ordering::Relaxed);
+        if !self.atomic_backoff.active.load(Ordering::Relaxed) {
+            let _ = self.n_out_w.notify();
         }
     }
 
     #[inline]
     fn move_batch(&mut self, batch: WBatch) {
         let _ = self.s_out_w.push(batch);
-        self.bytes.store(0, Ordering::Relaxed);
-        let _ = self.n_out_w.try_send(());
+        self.atomic_backoff.bytes.store(0, Ordering::Relaxed);
+        let _ = self.n_out_w.notify();
     }
 }
 
@@ -145,6 +154,7 @@ impl StageIn {
                 None => match self.s_ref.pull() {
                     Some(mut batch) => {
                         batch.clear();
+                        self.s_out.atomic_backoff.first_write.store(LOCAL_EPOCH.elapsed().as_micros() as MicroSeconds, Ordering::Relaxed);
                         break batch;
                     }
                     None => {
@@ -299,6 +309,10 @@ impl StageIn {
                 None => match self.s_ref.pull() {
                     Some(mut batch) => {
                         batch.clear();
+                        self.s_out.atomic_backoff.first_write.store(
+                            LOCAL_EPOCH.elapsed().as_micros() as MicroSeconds,
+                            Ordering::Relaxed,
+                        );
                         break batch;
                     }
                     None => {
@@ -333,7 +347,6 @@ impl StageIn {
         // Get the current serialization batch.
         let mut batch = zgetbatch_rets!();
         // Attempt the serialization on the current batch
-        // Attempt the serialization on the current batch
         match batch.encode(&msg) {
             Ok(_) => zretok!(batch),
             Err(_) => {
@@ -354,54 +367,27 @@ impl StageIn {
 enum Pull {
     Some(WBatch),
     None,
-    Backoff(NanoSeconds),
+    Backoff(MicroSeconds),
 }
 
 // Inner structure to keep track and signal backoff operations
 #[derive(Clone)]
 struct Backoff {
-    tslot: NanoSeconds,
-    retry_time: NanoSeconds,
+    threshold: Duration,
     last_bytes: BatchSize,
-    bytes: Arc<AtomicBatchSize>,
-    backoff: Arc<AtomicBool>,
+    atomic: Arc<AtomicBackoff>,
+    // active: bool,
 }
 
 impl Backoff {
-    fn new(tslot: NanoSeconds, bytes: Arc<AtomicBatchSize>, backoff: Arc<AtomicBool>) -> Self {
+    fn new(threshold: Duration, atomic: Arc<AtomicBackoff>) -> Self {
         Self {
-            tslot,
-            retry_time: 0,
+            threshold,
             last_bytes: 0,
-            bytes,
-            backoff,
-        }
-    }
-
-    fn next(&mut self) {
-        if self.retry_time == 0 {
-            self.retry_time = self.tslot;
-            self.backoff.store(true, Ordering::Relaxed);
-        } else {
-            match self.retry_time.checked_mul(2) {
-                Some(rt) => {
-                    self.retry_time = rt;
-                }
-                None => {
-                    self.retry_time = NanoSeconds::MAX;
-                    tracing::warn!(
-                        "Pipeline pull backoff overflow detected! Retrying in {}ns.",
-                        self.retry_time
-                    );
-                }
-            }
+            atomic,
+            // active: false,
         }
     }
-
-    fn reset(&mut self) {
-        self.retry_time = 0;
-        self.backoff.store(false, Ordering::Relaxed);
-    }
 }
 
 // Inner structure to link the final stage with the initial stage of the pipeline
@@ -422,13 +408,38 @@ impl StageOutIn {
     }
 
     fn try_pull_deep(&mut self) -> Pull {
-        let new_bytes = self.backoff.bytes.load(Ordering::Relaxed);
-        let old_bytes = self.backoff.last_bytes;
-        self.backoff.last_bytes = new_bytes;
+        // Verify first backoff is not active
+        let mut pull = !self.backoff.atomic.active.load(Ordering::Relaxed);
 
-        if new_bytes == old_bytes {
+        // If backoff is active, verify the current number of bytes is equal to the old number
+        // of bytes seen in the previous backoff iteration
+        if !pull {
+            let new_bytes = self.backoff.atomic.bytes.load(Ordering::Relaxed);
+            let old_bytes = self.backoff.last_bytes;
+            self.backoff.last_bytes = new_bytes;
+
+            pull = new_bytes == old_bytes;
+        }
+
+        // Verify that we have not been doing backoff for too long
+        let mut backoff = 0;
+        if !pull {
+            let diff = LOCAL_EPOCH.elapsed().as_micros() as MicroSeconds
+                - self.backoff.atomic.first_write.load(Ordering::Relaxed);
+            let threshold = self.backoff.threshold.as_micros() as MicroSeconds;
+
+            if diff >= threshold {
+                pull = true;
+            } else {
+                backoff = threshold - diff;
+            }
+        }
+
+        if pull {
             // It seems no new bytes have been written on the batch, try to pull
             if let Ok(mut g) = self.current.try_lock() {
+                self.backoff.atomic.active.store(false, Ordering::Relaxed);
+
                 // First try to pull from stage OUT to make sure we are not in the case
                 // where new_bytes == old_bytes are because of two identical serializations
                 if let Some(batch) = self.s_out_r.pull() {
@@ -445,24 +456,25 @@ impl StageOutIn {
                     }
                 }
             }
-            // Go to backoff
         }
 
+        // Activate backoff
+        self.backoff.atomic.active.store(true, Ordering::Relaxed);
+
         // Do backoff
-        self.backoff.next();
-        Pull::Backoff(self.backoff.retry_time)
+        Pull::Backoff(backoff)
     }
 }
 
 struct StageOutRefill {
-    n_ref_w: Sender<()>,
+    n_ref_w: Notifier,
     s_ref_w: RingBufferWriter<WBatch, RBLEN>,
 }
 
 impl StageOutRefill {
     fn refill(&mut self, batch: WBatch) {
         assert!(self.s_ref_w.push(batch).is_none());
-        let _ = self.n_ref_w.try_send(());
+        let _ = self.n_ref_w.notify();
     }
 }
 
@@ -501,8 +513,8 @@ pub(crate) struct TransmissionPipelineConf {
     pub(crate) batch: BatchConfig,
     pub(crate) queue_size: [usize; Priority::NUM],
     pub(crate) wait_before_drop: Duration,
-    pub(crate) batching: bool,
-    pub(crate) backoff: Duration,
+    pub(crate) batching_enabled: bool,
+    pub(crate) batching_time_limit: Duration,
 }
 
 // A 2-stage transmission pipeline
@@ -525,7 +537,7 @@ impl TransmissionPipeline {
         // Create the channel for notifying that new batches are in the out ring buffer
         // This is a MPSC channel
-        let (n_out_w, n_out_r) = bounded(1);
+        let (n_out_w, n_out_r) = event::new();
 
         for (prio, num) in size_iter.enumerate() {
             assert!(*num != 0 && *num <= RBLEN);
@@ -540,29 +552,33 @@ impl TransmissionPipeline {
             }
             // Create the channel for notifying that new batches are in the refill ring buffer
             // This is a SPSC channel
-            let (n_ref_w, n_ref_r) = bounded(1);
+            let (n_ref_w, n_ref_r) = event::new();
             // Create the refill ring buffer
             // This is a SPSC ring buffer
             let (s_out_w, s_out_r) = RingBuffer::<WBatch, RBLEN>::init();
             let current = Arc::new(Mutex::new(None));
-            let bytes = Arc::new(AtomicBatchSize::new(0));
-            let backoff = Arc::new(AtomicBool::new(false));
+            let bytes = Arc::new(AtomicBackoff {
+                active: CachePadded::new(AtomicBool::new(false)),
+                bytes: CachePadded::new(AtomicBatchSize::new(0)),
+                first_write: CachePadded::new(AtomicMicroSeconds::new(
+                    LOCAL_EPOCH.elapsed().as_micros() as MicroSeconds,
+                )),
+            });
 
             stage_in.push(Mutex::new(StageIn {
                 s_ref: StageInRefill { n_ref_r, s_ref_r },
                 s_out: StageInOut {
                     n_out_w: n_out_w.clone(),
                     s_out_w,
-                    bytes: bytes.clone(),
-                    backoff: backoff.clone(),
+                    atomic_backoff: bytes.clone(),
                 },
                 mutex: StageInMutex {
                     current: current.clone(),
                     priority: priority[prio].clone(),
                 },
                 fragbuf: ZBuf::empty(),
-                batching: config.batching,
+                batching: config.batching_enabled,
             }));
 
             // The stage out for this priority
@@ -570,7 +586,7 @@ impl TransmissionPipeline {
                 s_in: StageOutIn {
                     s_out_r,
                     current,
-                    backoff: Backoff::new(config.backoff.as_nanos() as NanoSeconds, bytes, backoff),
+                    backoff: Backoff::new(config.batching_time_limit, bytes),
                 },
                 s_ref: StageOutRefill { n_ref_w, s_ref_w },
             });
@@ -652,28 +668,23 @@ impl TransmissionPipelineProducer {
 
 pub(crate) struct TransmissionPipelineConsumer {
     // A single Mutex for all the priority queues
     stage_out: Box<[StageOut]>,
-    n_out_r: Receiver<()>,
+    n_out_r: Waiter,
     active: Arc<AtomicBool>,
 }
 
 impl TransmissionPipelineConsumer {
     pub(crate) async fn pull(&mut self) -> Option<(WBatch, usize)> {
-        // Reset backoff before pulling
-        for queue in self.stage_out.iter_mut() {
-            queue.s_in.backoff.reset();
-        }
-
         while self.active.load(Ordering::Relaxed) {
+            let mut backoff = MicroSeconds::MAX;
             // Calculate the backoff maximum
-            let mut bo = NanoSeconds::MAX;
             for (prio, queue) in self.stage_out.iter_mut().enumerate() {
                 match queue.try_pull() {
                     Pull::Some(batch) => {
                         return Some((batch, prio));
                     }
-                    Pull::Backoff(b) => {
-                        if b < bo {
-                            bo = b;
+                    Pull::Backoff(deadline) => {
+                        if deadline < backoff {
+                            backoff = deadline;
                         }
                     }
                     Pull::None => {}
@@ -687,9 +698,11 @@ impl TransmissionPipelineConsumer {
             tokio::task::yield_now().await;
 
             // Wait for the backoff to expire or for a new message
-            let res =
-                tokio::time::timeout(Duration::from_nanos(bo as u64), self.n_out_r.recv_async())
-                    .await;
+            let res = tokio::time::timeout(
+                Duration::from_micros(backoff as u64),
+                self.n_out_r.wait_async(),
+            )
+            .await;
             match res {
                 Ok(Ok(())) => {
                     // We have received a notification from the channel that some bytes are available, retry to pull.
@@ -774,9 +787,9 @@ mod tests {
             is_compression: true,
         },
         queue_size: [1; Priority::NUM],
-        batching: true,
+        batching_enabled: true,
         wait_before_drop: Duration::from_millis(1),
-        backoff: Duration::from_micros(1),
+        batching_time_limit: Duration::from_micros(1),
     };
 
     const CONFIG_NOT_STREAMED: TransmissionPipelineConf = TransmissionPipelineConf {
@@ -787,9 +800,9 @@ mod tests {
             is_compression: false,
         },
         queue_size: [1; Priority::NUM],
-        batching: true,
+        batching_enabled: true,
         wait_before_drop: Duration::from_millis(1),
-        backoff: Duration::from_micros(1),
+        batching_time_limit: Duration::from_micros(1),
     };
 
     #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs
index 669744838f..305ccab574 100644
--- a/io/zenoh-transport/src/manager.rs
+++ b/io/zenoh-transport/src/manager.rs
@@ -130,10 +130,10 @@ pub struct TransportManagerBuilder {
     whatami: WhatAmI,
     resolution: Resolution,
     batch_size: BatchSize,
-    batching: bool,
+    batching_enabled: bool,
+    batching_time_limit: Duration,
     wait_before_drop: Duration,
     queue_size: QueueSizeConf,
-    queue_backoff: Duration,
     defrag_buff_size: usize,
     link_rx_buffer_size: usize,
     unicast: TransportManagerBuilderUnicast,
@@ -172,8 +172,13 @@ impl TransportManagerBuilder {
         self
     }
 
-    pub fn batching(mut self, batching: bool) -> Self {
-        self.batching = batching;
+    pub fn batching_enabled(mut self, batching_enabled: bool) -> Self {
+        self.batching_enabled = batching_enabled;
+        self
+    }
+
+    pub fn batching_time_limit(mut self, batching_time_limit: Duration) -> Self {
+        self.batching_time_limit = batching_time_limit;
         self
     }
 
@@ -187,11 +192,6 @@ impl TransportManagerBuilder {
         self
     }
 
-    pub fn queue_backoff(mut self, queue_backoff: Duration) -> Self {
-        self.queue_backoff = queue_backoff;
-        self
-    }
-
     pub fn defrag_buff_size(mut self, defrag_buff_size: usize) -> Self {
         self.defrag_buff_size = defrag_buff_size;
         self
@@ -238,14 +238,16 @@ impl TransportManagerBuilder {
         resolution.set(Field::FrameSN, *link.tx().sequence_number_resolution());
         self = self.resolution(resolution);
         self = self.batch_size(*link.tx().batch_size());
-        self = self.batching(*link.tx().batching());
+        self = self.batching_enabled(*link.tx().queue().batching().enabled());
+        self = self.batching_time_limit(Duration::from_millis(
+            *link.tx().queue().batching().time_limit(),
+        ));
         self = self.defrag_buff_size(*link.rx().max_message_size());
         self = self.link_rx_buffer_size(*link.rx().buffer_size());
         self = self.wait_before_drop(Duration::from_micros(
             *link.tx().queue().congestion_control().wait_before_drop(),
         ));
         self = self.queue_size(link.tx().queue().size().clone());
-        self = self.queue_backoff(Duration::from_nanos(*link.tx().queue().backoff()));
         self = self.tx_threads(*link.tx().threads());
         self = self.protocols(link.protocols().clone());
 
@@ -301,10 +303,10 @@ impl TransportManagerBuilder {
             whatami: self.whatami,
             resolution: self.resolution,
             batch_size: self.batch_size,
-            batching: self.batching,
+            batching: self.batching_enabled,
             wait_before_drop: self.wait_before_drop,
             queue_size,
-            queue_backoff: self.queue_backoff,
+            queue_backoff: self.batching_time_limit,
             defrag_buff_size: self.defrag_buff_size,
             link_rx_buffer_size: self.link_rx_buffer_size,
             unicast: unicast.config,
@@ -340,7 +342,7 @@ impl Default for TransportManagerBuilder {
     fn default() -> Self {
         let link_rx = LinkRxConf::default();
         let queue = QueueConf::default();
-        let backoff = *queue.backoff();
+        let backoff = *queue.batching().time_limit();
        let wait_before_drop = *queue.congestion_control().wait_before_drop();
         Self {
             version: VERSION,
@@ -348,10 +350,10 @@
             whatami: zenoh_config::defaults::mode,
             resolution: Resolution::default(),
             batch_size: BatchSize::MAX,
-            batching: true,
+            batching_enabled: true,
             wait_before_drop: Duration::from_micros(wait_before_drop),
             queue_size: queue.size,
-            queue_backoff: Duration::from_nanos(backoff),
+            batching_time_limit: Duration::from_millis(backoff),
             defrag_buff_size: *link_rx.max_message_size(),
             link_rx_buffer_size: *link_rx.buffer_size(),
             endpoints: HashMap::new(),
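API migration note for downstream users of the builder, as exercised by the manager.rs hunks above. A sketch, assuming `TransportManagerBuilder` remains re-exported at the crate root; the two chained calls are the only new surface:

```rust
use std::time::Duration;

use zenoh_transport::TransportManagerBuilder;

// was: builder.batching(true).queue_backoff(Duration::from_nanos(100))
fn tune(builder: TransportManagerBuilder) -> TransportManagerBuilder {
    builder
        .batching_enabled(true)
        .batching_time_limit(Duration::from_millis(1))
}
```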
diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs
index 90999d32ce..d0d5ef4fb0 100644
--- a/io/zenoh-transport/src/multicast/link.rs
+++ b/io/zenoh-transport/src/multicast/link.rs
@@ -321,8 +321,8 @@ impl TransportLinkMulticastUniversal {
             batch: self.link.config.batch,
             queue_size: self.transport.manager.config.queue_size,
             wait_before_drop: self.transport.manager.config.wait_before_drop,
-            batching: self.transport.manager.config.batching,
-            backoff: self.transport.manager.config.queue_backoff,
+            batching_enabled: self.transport.manager.config.batching,
+            batching_time_limit: self.transport.manager.config.queue_backoff,
         };
         // The pipeline
         let (producer, consumer) = TransmissionPipeline::make(tpc, &priority_tx);
diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs
index cc3afc06e5..fff842c255 100644
--- a/io/zenoh-transport/src/unicast/universal/link.rs
+++ b/io/zenoh-transport/src/unicast/universal/link.rs
@@ -62,8 +62,8 @@ impl TransportLinkUnicastUniversal {
             },
             queue_size: transport.manager.config.queue_size,
             wait_before_drop: transport.manager.config.wait_before_drop,
-            batching: transport.manager.config.batching,
-            backoff: transport.manager.config.queue_backoff,
+            batching_enabled: transport.manager.config.batching,
+            batching_time_limit: transport.manager.config.queue_backoff,
        };
         // The pipeline
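For reviewers, the consumer-side decision implemented by `try_pull_deep()` above distills to the following self-contained sketch (names simplified; microsecond `u32` arithmetic as in the diff):

```rust
// Returns (pull_now, remaining_backoff_us), mirroring try_pull_deep() above.
fn should_pull(
    backoff_active: bool,
    bytes_unchanged: bool,
    first_write_us: u32,
    now_us: u32,
    time_limit_us: u32,
) -> (bool, u32) {
    // No backoff in progress: pull immediately.
    if !backoff_active {
        return (true, 0);
    }
    // The writer made no progress since the last check: the batch is as full
    // as it will get, so pull it now.
    if bytes_unchanged {
        return (true, 0);
    }
    // Otherwise honor the batching time limit: pull once the first write of
    // the current batch is older than the limit, else back off for the rest.
    let elapsed_us = now_us.saturating_sub(first_write_us);
    if elapsed_us >= time_limit_us {
        (true, 0)
    } else {
        (false, time_limit_us - elapsed_us)
    }
}

fn main() {
    // First write 300 µs ago with a 1000 µs limit: keep batching for another 700 µs.
    assert_eq!(should_pull(true, false, 300, 600, 1000), (false, 700));
}
```

This is the core behavioral change of the PR: instead of an unbounded exponential backoff (`Backoff::next()` doubling `retry_time`), a batch is now retained at most `batching.time_limit` milliseconds past its first write, bounding the latency that batching can add.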