From dfde2cc7457e65e40ee3305f74e1a0ec170df536 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Thu, 21 Nov 2024 17:05:29 +0300 Subject: [PATCH 1/6] implement non-linear wait before drop --- io/zenoh-transport/src/common/pipeline.rs | 62 ++++++++++++++++++----- 1 file changed, 48 insertions(+), 14 deletions(-) diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 054a5e0a16..b69e5d87bd 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -128,6 +128,32 @@ impl StageInMutex { } } +struct WaitTime { + wait_time: Duration, + ttl: usize, +} + +impl WaitTime { + fn new(wait_time: Duration, ttl: usize) -> Self { + Self { wait_time, ttl } + } + + fn wait_time(&mut self) -> Duration { + let result = self.wait_time; + match self.ttl.checked_sub(1) { + Some(new_ttl) => { + self.ttl = new_ttl; + self.wait_time = result * 2; + } + None => { + self.wait_time = Duration::ZERO; + } + } + result + } +} + +#[derive(Clone)] enum DeadlineSetting { Immediate, Infinite, @@ -136,11 +162,11 @@ enum DeadlineSetting { struct LazyDeadline { deadline: Option, - wait_time: Option, + wait_time: Option, } impl LazyDeadline { - fn new(wait_time: Option) -> Self { + fn new(wait_time: Option) -> Self { Self { deadline: None, wait_time, @@ -148,25 +174,31 @@ impl LazyDeadline { } fn advance(&mut self) { - let wait_time = self.wait_time; - match &mut self.deadline() { + match self.deadline().to_owned() { DeadlineSetting::Immediate => {} DeadlineSetting::Infinite => {} - DeadlineSetting::Finite(instant) => { - *instant = instant.add(unsafe { wait_time.unwrap_unchecked() }); + DeadlineSetting::Finite(mut instant) => { + // SAFETY: this is safe because DeadlineSetting::Finite is returned by + // deadline() only if wait_time is Some(_) + instant = + instant.add(unsafe { self.wait_time.as_mut().unwrap_unchecked().wait_time() }); + self.deadline = Some(DeadlineSetting::Finite(instant)); } } } #[inline] fn deadline(&mut self) -> &mut DeadlineSetting { - self.deadline.get_or_insert_with(|| match self.wait_time { - Some(wait_time) => match wait_time.is_zero() { - true => DeadlineSetting::Immediate, - false => DeadlineSetting::Finite(Instant::now().add(wait_time)), - }, - None => DeadlineSetting::Infinite, - }) + self.deadline + .get_or_insert_with(|| match self.wait_time.as_mut() { + Some(wait_time) => match wait_time.wait_time() { + Duration::ZERO => DeadlineSetting::Immediate, + nonzero_wait_time => { + DeadlineSetting::Finite(Instant::now().add(nonzero_wait_time)) + } + }, + None => DeadlineSetting::Infinite, + }) } } @@ -177,7 +209,9 @@ struct Deadline { impl Deadline { fn new(wait_time: Option) -> Self { Self { - lazy_deadline: LazyDeadline::new(wait_time), + lazy_deadline: LazyDeadline::new( + wait_time.map(|wait_time| WaitTime::new(wait_time, 10)), + ), } } From 1ca7057478a76e5466407e07bf2616cd2bfb34da Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Fri, 22 Nov 2024 13:55:45 +0300 Subject: [PATCH 2/6] Add Wait TTL to the config --- DEFAULT_CONFIG.json5 | 14 +++++++ commons/zenoh-config/src/defaults.rs | 2 + commons/zenoh-config/src/lib.rs | 14 +++++++ io/zenoh-transport/src/common/pipeline.rs | 46 +++++++++++----------- io/zenoh-transport/src/manager.rs | 48 +++++++++++------------ 5 files changed, 77 insertions(+), 47 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 7a3e614635..c7bc9a7ed6 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -412,12 +412,26 @@ drop: { /// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available. wait_before_drop: 1000, + /// The maximum deadline limit for multi-fragment messages. + /// When sending multi-fragment message, for each consecutive fragment the deadline from + /// the start point of message transmission is calculated as + /// wait_before_drop * 2 ^ min(N, ttl) where N is a fragment number. + /// Thus, this parameter allows setting a limitation for maximum time for multi-fragment + /// message transmission as wait_before_drop * 2 ^ ttl + ttl: 8, }, /// Behavior pushing CongestionControl::Block messages to the queue. block: { /// The maximum time in microseconds to wait for an available batch before closing the transport session when sending a blocking message /// if still no batch is available. wait_before_close: 5000000, + /// The maximum deadline limit for multi-fragment messages. + /// When sending multi-fragment message, for each consecutive fragment the deadline from + /// the start point of message transmission is calculated as + /// wait_before_close * 2 ^ min(N, ttl) where N is a fragment number. + /// Thus, this parameter allows setting a limitation for maximum time for multi-fragment + /// message transmission as wait_before_close * 2 ^ ttl + ttl: 2, }, }, /// Perform batching of messages if they are smaller of the batch_size diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index 78fe112916..d32b56d8d6 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -253,6 +253,7 @@ impl Default for CongestionControlDropConf { fn default() -> Self { Self { wait_before_drop: 1000, + ttl: 8, } } } @@ -261,6 +262,7 @@ impl Default for CongestionControlBlockConf { fn default() -> Self { Self { wait_before_close: 5000000, + ttl: 2, } } } diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index d25ccc63c3..52aea9d1af 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -445,12 +445,26 @@ validated_struct::validator! { /// The maximum time in microseconds to wait for an available batch before dropping a droppable message /// if still no batch is available. wait_before_drop: i64, + /// The maximum deadline limit for multi-fragment messages. + /// When sending multi-fragment message, for each consecutive fragment the deadline from + /// the start point of message transmission is calculated as + /// wait_before_drop * 2 ^ min(N, ttl) where N is a fragment number. + /// Thus, this parameter allows setting a limitation for maximum time for multi-fragment + /// message transmission as wait_before_drop * 2 ^ ttl + ttl: usize, }, /// Behavior pushing CongestionControl::Block messages to the queue. pub block: CongestionControlBlockConf { /// The maximum time in microseconds to wait for an available batch before closing the transport session /// when sending a blocking message if still no batch is available. wait_before_close: i64, + /// The maximum deadline limit for multi-fragment messages. + /// When sending multi-fragment message, for each consecutive fragment the deadline from + /// the start point of message transmission is calculated as + /// wait_before_close * 2 ^ min(N, ttl) where N is a fragment number. + /// Thus, this parameter allows setting a limitation for maximum time for multi-fragment + /// message transmission as wait_before_close * 2 ^ ttl + ttl: usize, }, }, pub batching: BatchingConf { diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index b69e5d87bd..fefe298a87 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -138,18 +138,15 @@ impl WaitTime { Self { wait_time, ttl } } - fn wait_time(&mut self) -> Duration { - let result = self.wait_time; - match self.ttl.checked_sub(1) { - Some(new_ttl) => { - self.ttl = new_ttl; - self.wait_time = result * 2; - } - None => { - self.wait_time = Duration::ZERO; - } + fn advance(&mut self) { + if let Some(new_ttl) = self.ttl.checked_sub(1) { + self.ttl = new_ttl; + self.wait_time *= 2; } - result + } + + fn wait_time(&self) -> Duration { + self.wait_time } } @@ -180,8 +177,11 @@ impl LazyDeadline { DeadlineSetting::Finite(mut instant) => { // SAFETY: this is safe because DeadlineSetting::Finite is returned by // deadline() only if wait_time is Some(_) - instant = - instant.add(unsafe { self.wait_time.as_mut().unwrap_unchecked().wait_time() }); + let wait_time = unsafe { self.wait_time.as_mut().unwrap_unchecked() }; + + instant = instant.add(wait_time.wait_time()); + wait_time.advance(); + self.deadline = Some(DeadlineSetting::Finite(instant)); } } @@ -207,10 +207,10 @@ struct Deadline { } impl Deadline { - fn new(wait_time: Option) -> Self { + fn new(wait_time: Option<(Duration, usize)>) -> Self { Self { lazy_deadline: LazyDeadline::new( - wait_time.map(|wait_time| WaitTime::new(wait_time, 10)), + wait_time.map(|(wait_time, ttl)| WaitTime::new(wait_time, ttl)), ), } } @@ -607,8 +607,8 @@ impl StageOut { pub(crate) struct TransmissionPipelineConf { pub(crate) batch: BatchConfig, pub(crate) queue_size: [usize; Priority::NUM], - pub(crate) wait_before_drop: Duration, - pub(crate) wait_before_close: Duration, + pub(crate) wait_before_drop: (Duration, usize), + pub(crate) wait_before_close: (Duration, usize), pub(crate) batching_enabled: bool, pub(crate) batching_time_limit: Duration, } @@ -710,8 +710,8 @@ pub(crate) struct TransmissionPipelineProducer { // Each priority queue has its own Mutex stage_in: Arc<[Mutex]>, active: Arc, - wait_before_drop: Duration, - wait_before_close: Duration, + wait_before_drop: (Duration, usize), + wait_before_close: (Duration, usize), } impl TransmissionPipelineProducer { @@ -886,8 +886,8 @@ mod tests { }, queue_size: [1; Priority::NUM], batching_enabled: true, - wait_before_drop: Duration::from_millis(1), - wait_before_close: Duration::from_secs(5), + wait_before_drop: (Duration::from_millis(1), 10), + wait_before_close: (Duration::from_secs(5), 10), batching_time_limit: Duration::from_micros(1), }; @@ -900,8 +900,8 @@ mod tests { }, queue_size: [1; Priority::NUM], batching_enabled: true, - wait_before_drop: Duration::from_millis(1), - wait_before_close: Duration::from_secs(5), + wait_before_drop: (Duration::from_millis(1), 10), + wait_before_close: (Duration::from_secs(5), 10), batching_time_limit: Duration::from_micros(1), }; diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index 3045e87453..7aa4326a1e 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -109,8 +109,8 @@ pub struct TransportManagerConfig { pub resolution: Resolution, pub batch_size: BatchSize, pub batching: bool, - pub wait_before_drop: Duration, - pub wait_before_close: Duration, + pub wait_before_drop: (Duration, usize), + pub wait_before_close: (Duration, usize), pub queue_size: [usize; Priority::NUM], pub queue_backoff: Duration, pub defrag_buff_size: usize, @@ -141,8 +141,8 @@ pub struct TransportManagerBuilder { batch_size: BatchSize, batching_enabled: bool, batching_time_limit: Duration, - wait_before_drop: Duration, - wait_before_close: Duration, + wait_before_drop: (Duration, usize), + wait_before_close: (Duration, usize), queue_size: QueueSizeConf, defrag_buff_size: usize, link_rx_buffer_size: usize, @@ -192,12 +192,12 @@ impl TransportManagerBuilder { self } - pub fn wait_before_drop(mut self, wait_before_drop: Duration) -> Self { + pub fn wait_before_drop(mut self, wait_before_drop: (Duration, usize)) -> Self { self.wait_before_drop = wait_before_drop; self } - pub fn wait_before_close(mut self, wait_before_close: Duration) -> Self { + pub fn wait_before_close(mut self, wait_before_close: (Duration, usize)) -> Self { self.wait_before_close = wait_before_close; self } @@ -249,6 +249,8 @@ impl TransportManagerBuilder { } let link = config.transport().link(); + let cc_drop = link.tx().queue().congestion_control().drop(); + let cc_block = link.tx().queue().congestion_control().block(); let mut resolution = Resolution::default(); resolution.set(Field::FrameSN, *link.tx().sequence_number_resolution()); self = self.resolution(resolution); @@ -259,21 +261,13 @@ impl TransportManagerBuilder { )); self = self.defrag_buff_size(*link.rx().max_message_size()); self = self.link_rx_buffer_size(*link.rx().buffer_size()); - self = self.wait_before_drop(duration_from_i64us( - *link - .tx() - .queue() - .congestion_control() - .drop() - .wait_before_drop(), + self = self.wait_before_drop(( + duration_from_i64us(*cc_drop.wait_before_drop()), + *cc_drop.ttl(), )); - self = self.wait_before_close(duration_from_i64us( - *link - .tx() - .queue() - .congestion_control() - .block() - .wait_before_close(), + self = self.wait_before_close(( + duration_from_i64us(*cc_block.wait_before_close()), + *cc_block.ttl(), )); self = self.queue_size(link.tx().queue().size().clone()); self = self.tx_threads(*link.tx().threads()); @@ -372,8 +366,8 @@ impl Default for TransportManagerBuilder { let link_rx = LinkRxConf::default(); let queue = QueueConf::default(); let backoff = *queue.batching().time_limit(); - let wait_before_drop = *queue.congestion_control().drop().wait_before_drop(); - let wait_before_close = *queue.congestion_control().block().wait_before_close(); + let cc_drop = queue.congestion_control().drop(); + let cc_block = queue.congestion_control().block(); Self { version: VERSION, zid: ZenohIdProto::rand(), @@ -381,8 +375,14 @@ impl Default for TransportManagerBuilder { resolution: Resolution::default(), batch_size: BatchSize::MAX, batching_enabled: true, - wait_before_drop: duration_from_i64us(wait_before_drop), - wait_before_close: duration_from_i64us(wait_before_close), + wait_before_drop: ( + duration_from_i64us(*cc_drop.wait_before_drop()), + *cc_drop.ttl(), + ), + wait_before_close: ( + duration_from_i64us(*cc_block.wait_before_close()), + *cc_block.ttl(), + ), queue_size: queue.size, batching_time_limit: Duration::from_millis(backoff), defrag_buff_size: *link_rx.max_message_size(), From d9068139d614c412c38590a85545d167bdc997ad Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Fri, 22 Nov 2024 14:06:36 +0300 Subject: [PATCH 3/6] fix deadline increase behavior --- io/zenoh-transport/src/common/pipeline.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index fefe298a87..2c22ec41f1 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -138,8 +138,9 @@ impl WaitTime { Self { wait_time, ttl } } - fn advance(&mut self) { + fn advance(&mut self, instant: &mut Instant) { if let Some(new_ttl) = self.ttl.checked_sub(1) { + *instant += self.wait_time; self.ttl = new_ttl; self.wait_time *= 2; } @@ -178,9 +179,7 @@ impl LazyDeadline { // SAFETY: this is safe because DeadlineSetting::Finite is returned by // deadline() only if wait_time is Some(_) let wait_time = unsafe { self.wait_time.as_mut().unwrap_unchecked() }; - - instant = instant.add(wait_time.wait_time()); - wait_time.advance(); + wait_time.advance(&mut instant); self.deadline = Some(DeadlineSetting::Finite(instant)); } From 07276e63e6dbf1b4bd086d7c0c2b5ec762938af2 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Wed, 27 Nov 2024 17:12:06 +0300 Subject: [PATCH 4/6] ttl -> max_wait_before_*_fragments --- DEFAULT_CONFIG.json5 | 14 ++-------- commons/zenoh-config/src/defaults.rs | 4 +-- commons/zenoh-config/src/lib.rs | 14 ++-------- io/zenoh-transport/src/common/pipeline.rs | 34 +++++++++++++---------- io/zenoh-transport/src/manager.rs | 20 ++++++------- 5 files changed, 35 insertions(+), 51 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index c7bc9a7ed6..fd94c30eab 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -413,12 +413,7 @@ /// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available. wait_before_drop: 1000, /// The maximum deadline limit for multi-fragment messages. - /// When sending multi-fragment message, for each consecutive fragment the deadline from - /// the start point of message transmission is calculated as - /// wait_before_drop * 2 ^ min(N, ttl) where N is a fragment number. - /// Thus, this parameter allows setting a limitation for maximum time for multi-fragment - /// message transmission as wait_before_drop * 2 ^ ttl - ttl: 8, + max_wait_before_drop_fragments: 255000, }, /// Behavior pushing CongestionControl::Block messages to the queue. block: { @@ -426,12 +421,7 @@ /// if still no batch is available. wait_before_close: 5000000, /// The maximum deadline limit for multi-fragment messages. - /// When sending multi-fragment message, for each consecutive fragment the deadline from - /// the start point of message transmission is calculated as - /// wait_before_close * 2 ^ min(N, ttl) where N is a fragment number. - /// Thus, this parameter allows setting a limitation for maximum time for multi-fragment - /// message transmission as wait_before_close * 2 ^ ttl - ttl: 2, + max_wait_before_close_fragments: 15000000, }, }, /// Perform batching of messages if they are smaller of the batch_size diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index d32b56d8d6..9e7cc2baa1 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -253,7 +253,7 @@ impl Default for CongestionControlDropConf { fn default() -> Self { Self { wait_before_drop: 1000, - ttl: 8, + max_wait_before_drop_fragments: 255000, } } } @@ -262,7 +262,7 @@ impl Default for CongestionControlBlockConf { fn default() -> Self { Self { wait_before_close: 5000000, - ttl: 2, + max_wait_before_close_fragments: 15000000, } } } diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 52aea9d1af..978b3bc2e1 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -446,12 +446,7 @@ validated_struct::validator! { /// if still no batch is available. wait_before_drop: i64, /// The maximum deadline limit for multi-fragment messages. - /// When sending multi-fragment message, for each consecutive fragment the deadline from - /// the start point of message transmission is calculated as - /// wait_before_drop * 2 ^ min(N, ttl) where N is a fragment number. - /// Thus, this parameter allows setting a limitation for maximum time for multi-fragment - /// message transmission as wait_before_drop * 2 ^ ttl - ttl: usize, + max_wait_before_drop_fragments: i64, }, /// Behavior pushing CongestionControl::Block messages to the queue. pub block: CongestionControlBlockConf { @@ -459,12 +454,7 @@ validated_struct::validator! { /// when sending a blocking message if still no batch is available. wait_before_close: i64, /// The maximum deadline limit for multi-fragment messages. - /// When sending multi-fragment message, for each consecutive fragment the deadline from - /// the start point of message transmission is calculated as - /// wait_before_close * 2 ^ min(N, ttl) where N is a fragment number. - /// Thus, this parameter allows setting a limitation for maximum time for multi-fragment - /// message transmission as wait_before_close * 2 ^ ttl - ttl: usize, + max_wait_before_close_fragments: i64, }, }, pub batching: BatchingConf { diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index dc71dfabac..2465cc218e 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -128,20 +128,24 @@ impl StageInMutex { } } +#[derive(Debug)] struct WaitTime { wait_time: Duration, - ttl: usize, + max_wait_time: Duration, } impl WaitTime { - fn new(wait_time: Duration, ttl: usize) -> Self { - Self { wait_time, ttl } + fn new(wait_time: Duration, max_wait_time: Duration) -> Self { + Self { + wait_time, + max_wait_time, + } } fn advance(&mut self, instant: &mut Instant) { - if let Some(new_ttl) = self.ttl.checked_sub(1) { + if let Some(new_max_wait_time) = self.max_wait_time.checked_sub(self.wait_time) { *instant += self.wait_time; - self.ttl = new_ttl; + self.max_wait_time = new_max_wait_time; self.wait_time *= 2; } } @@ -206,10 +210,10 @@ struct Deadline { } impl Deadline { - fn new(wait_time: Option<(Duration, usize)>) -> Self { + fn new(wait_time: Option<(Duration, Duration)>) -> Self { Self { lazy_deadline: LazyDeadline::new( - wait_time.map(|(wait_time, ttl)| WaitTime::new(wait_time, ttl)), + wait_time.map(|(wait_time, max_wait_time)| WaitTime::new(wait_time, max_wait_time)), ), } } @@ -610,8 +614,8 @@ impl StageOut { pub(crate) struct TransmissionPipelineConf { pub(crate) batch: BatchConfig, pub(crate) queue_size: [usize; Priority::NUM], - pub(crate) wait_before_drop: (Duration, usize), - pub(crate) wait_before_close: (Duration, usize), + pub(crate) wait_before_drop: (Duration, Duration), + pub(crate) wait_before_close: (Duration, Duration), pub(crate) batching_enabled: bool, pub(crate) batching_time_limit: Duration, } @@ -713,8 +717,8 @@ pub(crate) struct TransmissionPipelineProducer { // Each priority queue has its own Mutex stage_in: Arc<[Mutex]>, active: Arc, - wait_before_drop: (Duration, usize), - wait_before_close: (Duration, usize), + wait_before_drop: (Duration, Duration), + wait_before_close: (Duration, Duration), } impl TransmissionPipelineProducer { @@ -889,8 +893,8 @@ mod tests { }, queue_size: [1; Priority::NUM], batching_enabled: true, - wait_before_drop: (Duration::from_millis(1), 10), - wait_before_close: (Duration::from_secs(5), 10), + wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)), + wait_before_close: (Duration::from_secs(5), Duration::from_millis(1024)), batching_time_limit: Duration::from_micros(1), }; @@ -903,8 +907,8 @@ mod tests { }, queue_size: [1; Priority::NUM], batching_enabled: true, - wait_before_drop: (Duration::from_millis(1), 10), - wait_before_close: (Duration::from_secs(5), 10), + wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)), + wait_before_close: (Duration::from_secs(5), Duration::from_millis(1024)), batching_time_limit: Duration::from_micros(1), }; diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index 7aa4326a1e..068bea1234 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -109,8 +109,8 @@ pub struct TransportManagerConfig { pub resolution: Resolution, pub batch_size: BatchSize, pub batching: bool, - pub wait_before_drop: (Duration, usize), - pub wait_before_close: (Duration, usize), + pub wait_before_drop: (Duration, Duration), + pub wait_before_close: (Duration, Duration), pub queue_size: [usize; Priority::NUM], pub queue_backoff: Duration, pub defrag_buff_size: usize, @@ -141,8 +141,8 @@ pub struct TransportManagerBuilder { batch_size: BatchSize, batching_enabled: bool, batching_time_limit: Duration, - wait_before_drop: (Duration, usize), - wait_before_close: (Duration, usize), + wait_before_drop: (Duration, Duration), + wait_before_close: (Duration, Duration), queue_size: QueueSizeConf, defrag_buff_size: usize, link_rx_buffer_size: usize, @@ -192,12 +192,12 @@ impl TransportManagerBuilder { self } - pub fn wait_before_drop(mut self, wait_before_drop: (Duration, usize)) -> Self { + pub fn wait_before_drop(mut self, wait_before_drop: (Duration, Duration)) -> Self { self.wait_before_drop = wait_before_drop; self } - pub fn wait_before_close(mut self, wait_before_close: (Duration, usize)) -> Self { + pub fn wait_before_close(mut self, wait_before_close: (Duration, Duration)) -> Self { self.wait_before_close = wait_before_close; self } @@ -263,11 +263,11 @@ impl TransportManagerBuilder { self = self.link_rx_buffer_size(*link.rx().buffer_size()); self = self.wait_before_drop(( duration_from_i64us(*cc_drop.wait_before_drop()), - *cc_drop.ttl(), + duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()), )); self = self.wait_before_close(( duration_from_i64us(*cc_block.wait_before_close()), - *cc_block.ttl(), + duration_from_i64us(*cc_block.max_wait_before_close_fragments()), )); self = self.queue_size(link.tx().queue().size().clone()); self = self.tx_threads(*link.tx().threads()); @@ -377,11 +377,11 @@ impl Default for TransportManagerBuilder { batching_enabled: true, wait_before_drop: ( duration_from_i64us(*cc_drop.wait_before_drop()), - *cc_drop.ttl(), + duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()), ), wait_before_close: ( duration_from_i64us(*cc_block.wait_before_close()), - *cc_block.ttl(), + duration_from_i64us(*cc_block.max_wait_before_close_fragments()), ), queue_size: queue.size, batching_time_limit: Duration::from_millis(backoff), From 8e24a889a5fe06363962b5957b172335e776e65b Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Wed, 27 Nov 2024 17:52:02 +0300 Subject: [PATCH 5/6] Change default max_wait_before_drop_fragments to 50 milliseconds --- DEFAULT_CONFIG.json5 | 2 +- commons/zenoh-config/src/defaults.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index fd94c30eab..68ccb64d7a 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -413,7 +413,7 @@ /// The maximum time in microseconds to wait for an available batch before dropping a droppable message if still no batch is available. wait_before_drop: 1000, /// The maximum deadline limit for multi-fragment messages. - max_wait_before_drop_fragments: 255000, + max_wait_before_drop_fragments: 50000, }, /// Behavior pushing CongestionControl::Block messages to the queue. block: { diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index 9e7cc2baa1..c7a565ccde 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -253,7 +253,7 @@ impl Default for CongestionControlDropConf { fn default() -> Self { Self { wait_before_drop: 1000, - max_wait_before_drop_fragments: 255000, + max_wait_before_drop_fragments: 50000, } } } From 70ff070fad5e1b7266c868b0e6d17933e2388cb3 Mon Sep 17 00:00:00 2001 From: yellowhatter Date: Thu, 28 Nov 2024 11:29:32 +0300 Subject: [PATCH 6/6] get rid of non-linear wait functionality for CC::Block --- DEFAULT_CONFIG.json5 | 2 - commons/zenoh-config/src/defaults.rs | 1 - commons/zenoh-config/src/lib.rs | 2 - io/zenoh-transport/src/common/pipeline.rs | 65 ++++++++++------------- io/zenoh-transport/src/manager.rs | 16 ++---- 5 files changed, 34 insertions(+), 52 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 68ccb64d7a..8d140ee9ba 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -420,8 +420,6 @@ /// The maximum time in microseconds to wait for an available batch before closing the transport session when sending a blocking message /// if still no batch is available. wait_before_close: 5000000, - /// The maximum deadline limit for multi-fragment messages. - max_wait_before_close_fragments: 15000000, }, }, /// Perform batching of messages if they are smaller of the batch_size diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index c7a565ccde..edf2c46c7f 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -262,7 +262,6 @@ impl Default for CongestionControlBlockConf { fn default() -> Self { Self { wait_before_close: 5000000, - max_wait_before_close_fragments: 15000000, } } } diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 978b3bc2e1..51c851b3ab 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -453,8 +453,6 @@ validated_struct::validator! { /// The maximum time in microseconds to wait for an available batch before closing the transport session /// when sending a blocking message if still no batch is available. wait_before_close: i64, - /// The maximum deadline limit for multi-fragment messages. - max_wait_before_close_fragments: i64, }, }, pub batching: BatchingConf { diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 2465cc218e..0dbded9209 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -131,11 +131,11 @@ impl StageInMutex { #[derive(Debug)] struct WaitTime { wait_time: Duration, - max_wait_time: Duration, + max_wait_time: Option, } impl WaitTime { - fn new(wait_time: Duration, max_wait_time: Duration) -> Self { + fn new(wait_time: Duration, max_wait_time: Option) -> Self { Self { wait_time, max_wait_time, @@ -143,10 +143,17 @@ impl WaitTime { } fn advance(&mut self, instant: &mut Instant) { - if let Some(new_max_wait_time) = self.max_wait_time.checked_sub(self.wait_time) { - *instant += self.wait_time; - self.max_wait_time = new_max_wait_time; - self.wait_time *= 2; + match &mut self.max_wait_time { + Some(max_wait_time) => { + if let Some(new_max_wait_time) = max_wait_time.checked_sub(self.wait_time) { + *instant += self.wait_time; + *max_wait_time = new_max_wait_time; + self.wait_time *= 2; + } + } + None => { + *instant += self.wait_time; + } } } @@ -158,17 +165,16 @@ impl WaitTime { #[derive(Clone)] enum DeadlineSetting { Immediate, - Infinite, Finite(Instant), } struct LazyDeadline { deadline: Option, - wait_time: Option, + wait_time: WaitTime, } impl LazyDeadline { - fn new(wait_time: Option) -> Self { + fn new(wait_time: WaitTime) -> Self { Self { deadline: None, wait_time, @@ -178,13 +184,8 @@ impl LazyDeadline { fn advance(&mut self) { match self.deadline().to_owned() { DeadlineSetting::Immediate => {} - DeadlineSetting::Infinite => {} DeadlineSetting::Finite(mut instant) => { - // SAFETY: this is safe because DeadlineSetting::Finite is returned by - // deadline() only if wait_time is Some(_) - let wait_time = unsafe { self.wait_time.as_mut().unwrap_unchecked() }; - wait_time.advance(&mut instant); - + self.wait_time.advance(&mut instant); self.deadline = Some(DeadlineSetting::Finite(instant)); } } @@ -193,14 +194,9 @@ impl LazyDeadline { #[inline] fn deadline(&mut self) -> &mut DeadlineSetting { self.deadline - .get_or_insert_with(|| match self.wait_time.as_mut() { - Some(wait_time) => match wait_time.wait_time() { - Duration::ZERO => DeadlineSetting::Immediate, - nonzero_wait_time => { - DeadlineSetting::Finite(Instant::now().add(nonzero_wait_time)) - } - }, - None => DeadlineSetting::Infinite, + .get_or_insert_with(|| match self.wait_time.wait_time() { + Duration::ZERO => DeadlineSetting::Immediate, + nonzero_wait_time => DeadlineSetting::Finite(Instant::now().add(nonzero_wait_time)), }) } } @@ -210,11 +206,9 @@ struct Deadline { } impl Deadline { - fn new(wait_time: Option<(Duration, Duration)>) -> Self { + fn new(wait_time: Duration, max_wait_time: Option) -> Self { Self { - lazy_deadline: LazyDeadline::new( - wait_time.map(|(wait_time, max_wait_time)| WaitTime::new(wait_time, max_wait_time)), - ), + lazy_deadline: LazyDeadline::new(WaitTime::new(wait_time, max_wait_time)), } } @@ -222,7 +216,6 @@ impl Deadline { fn wait(&mut self, s_ref: &StageInRefill) -> bool { match self.lazy_deadline.deadline() { DeadlineSetting::Immediate => false, - DeadlineSetting::Infinite => s_ref.wait(), DeadlineSetting::Finite(instant) => s_ref.wait_deadline(*instant), } } @@ -615,7 +608,7 @@ pub(crate) struct TransmissionPipelineConf { pub(crate) batch: BatchConfig, pub(crate) queue_size: [usize; Priority::NUM], pub(crate) wait_before_drop: (Duration, Duration), - pub(crate) wait_before_close: (Duration, Duration), + pub(crate) wait_before_close: Duration, pub(crate) batching_enabled: bool, pub(crate) batching_time_limit: Duration, } @@ -718,7 +711,7 @@ pub(crate) struct TransmissionPipelineProducer { stage_in: Arc<[Mutex]>, active: Arc, wait_before_drop: (Duration, Duration), - wait_before_close: (Duration, Duration), + wait_before_close: Duration, } impl TransmissionPipelineProducer { @@ -732,12 +725,12 @@ impl TransmissionPipelineProducer { (0, Priority::DEFAULT) }; // If message is droppable, compute a deadline after which the sample could be dropped - let wait_time = if msg.is_droppable() { - self.wait_before_drop + let (wait_time, max_wait_time) = if msg.is_droppable() { + (self.wait_before_drop.0, Some(self.wait_before_drop.1)) } else { - self.wait_before_close + (self.wait_before_close, None) }; - let mut deadline = Deadline::new(Some(wait_time)); + let mut deadline = Deadline::new(wait_time, max_wait_time); // Lock the channel. We are the only one that will be writing on it. let mut queue = zlock!(self.stage_in[idx]); queue.push_network_message(&mut msg, priority, &mut deadline) @@ -894,7 +887,7 @@ mod tests { queue_size: [1; Priority::NUM], batching_enabled: true, wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)), - wait_before_close: (Duration::from_secs(5), Duration::from_millis(1024)), + wait_before_close: Duration::from_secs(5), batching_time_limit: Duration::from_micros(1), }; @@ -908,7 +901,7 @@ mod tests { queue_size: [1; Priority::NUM], batching_enabled: true, wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)), - wait_before_close: (Duration::from_secs(5), Duration::from_millis(1024)), + wait_before_close: Duration::from_secs(5), batching_time_limit: Duration::from_micros(1), }; diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index 068bea1234..d0cf2a093b 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -110,7 +110,7 @@ pub struct TransportManagerConfig { pub batch_size: BatchSize, pub batching: bool, pub wait_before_drop: (Duration, Duration), - pub wait_before_close: (Duration, Duration), + pub wait_before_close: Duration, pub queue_size: [usize; Priority::NUM], pub queue_backoff: Duration, pub defrag_buff_size: usize, @@ -142,7 +142,7 @@ pub struct TransportManagerBuilder { batching_enabled: bool, batching_time_limit: Duration, wait_before_drop: (Duration, Duration), - wait_before_close: (Duration, Duration), + wait_before_close: Duration, queue_size: QueueSizeConf, defrag_buff_size: usize, link_rx_buffer_size: usize, @@ -197,7 +197,7 @@ impl TransportManagerBuilder { self } - pub fn wait_before_close(mut self, wait_before_close: (Duration, Duration)) -> Self { + pub fn wait_before_close(mut self, wait_before_close: Duration) -> Self { self.wait_before_close = wait_before_close; self } @@ -265,10 +265,7 @@ impl TransportManagerBuilder { duration_from_i64us(*cc_drop.wait_before_drop()), duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()), )); - self = self.wait_before_close(( - duration_from_i64us(*cc_block.wait_before_close()), - duration_from_i64us(*cc_block.max_wait_before_close_fragments()), - )); + self = self.wait_before_close(duration_from_i64us(*cc_block.wait_before_close())); self = self.queue_size(link.tx().queue().size().clone()); self = self.tx_threads(*link.tx().threads()); self = self.protocols(link.protocols().clone()); @@ -379,10 +376,7 @@ impl Default for TransportManagerBuilder { duration_from_i64us(*cc_drop.wait_before_drop()), duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()), ), - wait_before_close: ( - duration_from_i64us(*cc_block.wait_before_close()), - duration_from_i64us(*cc_block.max_wait_before_close_fragments()), - ), + wait_before_close: duration_from_i64us(*cc_block.wait_before_close()), queue_size: queue.size, batching_time_limit: Duration::from_millis(backoff), defrag_buff_size: *link_rx.max_message_size(),