Skip to content

Commit

Permalink
Improve pipeline jitter (#1335)
Browse files Browse the repository at this point in the history
* Rework of pipeline backoff

* Cargo fmt

* Fix backoff calculation

* Fix backoff calculation

* Fix lint

* Add event tests

* Improve event tests

* Update event API

* Improve event tests

* Precommit

* Fix event wait_timeout and wait_deadline impls

* Add event_deadline tests

* Pre-commit

* Update batching config

* Fix typos

* Address review comments
  • Loading branch information
Mallets authored Aug 28, 2024
1 parent 6dea3bf commit 7f7d648
Show file tree
Hide file tree
Showing 16 changed files with 803 additions and 152 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ console-subscriber = "0.3.0"
const_format = "0.2.30"
crc = "3.0.1"
criterion = "0.5"
crossbeam-utils = "0.8.2"
derive_more = "0.99.17"
derive-new = "0.6.0"
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
Expand Down
15 changes: 10 additions & 5 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,6 @@
/// Therefore, the maximum batch size is 2^16-1 (i.e. 65535).
/// The default batch size value is the maximum batch size: 65535.
batch_size: 65535,
/// Perform batching of messages if they are smaller than the batch_size
batching: true,
/// Each zenoh link has a transmission queue that can be configured
queue: {
/// The size of each priority queue indicates the number of batches a given queue can contain.
Expand All @@ -380,9 +378,16 @@
/// The maximum time in microseconds to wait for an available batch before dropping the message if still no batch is available.
wait_before_drop: 1000,
},
/// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress.
/// Higher values lead to a more aggressive batching but it will introduce additional latency.
backoff: 100,
/// Perform batching of messages if they are smaller than the batch_size
batching: {
/// Perform adaptive batching of messages if they are smaller than the batch_size.
/// When the network is detected to not be fast enough to transmit every message individually, many small messages may be
/// batched together and sent all at once on the wire, reducing the overall network overhead. This is typical of a high-throughput
/// scenario mainly composed of small messages. In other words, batching is activated by the network back-pressure.
enabled: true,
/// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
time_limit: 1,
}
},
},
/// Configure the zenoh RX parameters of a link
Expand Down
20 changes: 9 additions & 11 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,17 +191,6 @@ impl Default for LinkTxConf {
batch_size: BatchSize::MAX,
queue: QueueConf::default(),
threads: num,
batching: true,
}
}
}

impl Default for QueueConf {
fn default() -> Self {
Self {
size: QueueSizeConf::default(),
congestion_control: CongestionControlConf::default(),
backoff: 100,
}
}
}
Expand Down Expand Up @@ -234,6 +223,15 @@ impl Default for CongestionControlConf {
}
}

impl Default for BatchingConf {
fn default() -> Self {
BatchingConf {
enabled: true,
time_limit: 1,
}
}
}

impl Default for LinkRxConf {
fn default() -> Self {
Self {
Expand Down
17 changes: 11 additions & 6 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,9 +407,8 @@ validated_struct::validator! {
keep_alive: usize,
/// Zenoh's MTU equivalent (default: 2^16-1) (max: 2^16-1)
batch_size: BatchSize,
/// Perform batching of messages if they are smaller than the batch_size
batching: bool,
pub queue: QueueConf {
pub queue: #[derive(Default)]
QueueConf {
/// The size of each priority queue indicates the number of batches a given queue can contain.
/// The amount of memory being allocated for each queue is then SIZE_XXX * BATCH_SIZE.
/// In the case of the transport link MTU being smaller than the ZN_BATCH_SIZE,
Expand All @@ -432,9 +431,15 @@ validated_struct::validator! {
/// The maximum time in microseconds to wait for an available batch before dropping the message if still no batch is available.
pub wait_before_drop: u64,
},
/// The initial exponential backoff time in nanoseconds to allow the batching to eventually progress.
/// Higher values lead to a more aggressive batching but it will introduce additional latency.
backoff: u64,
pub batching: BatchingConf {
/// Perform adaptive batching of messages if they are smaller than the batch_size.
/// When the network is detected to not be fast enough to transmit every message individually, many small messages may be
/// batched together and sent all at once on the wire, reducing the overall network overhead. This is typical of a high-throughput
/// scenario mainly composed of small messages. In other words, batching is activated by the network back-pressure.
enabled: bool,
/// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
time_limit: u64,
},
},
// Number of threads used for TX
threads: usize,
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-shm/src/header/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use super::{
segment::HeaderSegment,
};

#[dynamic(lazy,drop)]
#[dynamic(lazy, drop)]
pub static mut GLOBAL_HEADER_STORAGE: HeaderStorage = HeaderStorage::new(32768usize).unwrap();

pub struct HeaderStorage {
Expand Down
5 changes: 2 additions & 3 deletions commons/zenoh-shm/src/header/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ use super::{
segment::HeaderSegment,
};

#[dynamic(lazy,drop)]
pub static mut GLOBAL_HEADER_SUBSCRIPTION: Subscription = Subscription::new();

#[dynamic(lazy, drop)]
pub static mut GLOBAL_HEADER_SUBSCRIPTION: Subscription = Subscription::new();

pub struct Subscription {
linked_table: Mutex<BTreeMap<HeaderSegmentID, Arc<HeaderSegment>>>,
Expand Down
5 changes: 2 additions & 3 deletions commons/zenoh-shm/src/watchdog/confirmator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ use super::{
segment::Segment,
};

#[dynamic(lazy,drop)]
#[dynamic(lazy, drop)]
pub static mut GLOBAL_CONFIRMATOR: WatchdogConfirmator =
WatchdogConfirmator::new(Duration::from_millis(50));

WatchdogConfirmator::new(Duration::from_millis(50));

pub struct ConfirmedDescriptor {
pub owned: OwnedDescriptor,
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-shm/src/watchdog/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use zenoh_result::{zerror, ZResult};

use super::{allocated_watchdog::AllocatedWatchdog, descriptor::OwnedDescriptor, segment::Segment};

#[dynamic(lazy,drop)]
#[dynamic(lazy, drop)]
pub static mut GLOBAL_STORAGE: WatchdogStorage = WatchdogStorage::new(32768usize).unwrap();

pub struct WatchdogStorage {
Expand Down
Loading

0 comments on commit 7f7d648

Please sign in to comment.