diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 68ccb64d7..8d140ee9b 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -420,8 +420,6 @@ /// The maximum time in microseconds to wait for an available batch before closing the transport session when sending a blocking message /// if still no batch is available. wait_before_close: 5000000, - /// The maximum deadline limit for multi-fragment messages. - max_wait_before_close_fragments: 15000000, }, }, /// Perform batching of messages if they are smaller of the batch_size diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index c7a565ccd..edf2c46c7 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -262,7 +262,6 @@ impl Default for CongestionControlBlockConf { fn default() -> Self { Self { wait_before_close: 5000000, - max_wait_before_close_fragments: 15000000, } } } diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 978b3bc2e..51c851b3a 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -453,8 +453,6 @@ validated_struct::validator! { /// The maximum time in microseconds to wait for an available batch before closing the transport session /// when sending a blocking message if still no batch is available. wait_before_close: i64, - /// The maximum deadline limit for multi-fragment messages. - max_wait_before_close_fragments: i64, }, }, pub batching: BatchingConf { diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 2465cc218..0dbded920 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -131,11 +131,11 @@ impl StageInMutex { #[derive(Debug)] struct WaitTime { wait_time: Duration, - max_wait_time: Duration, + max_wait_time: Option, } impl WaitTime { - fn new(wait_time: Duration, max_wait_time: Duration) -> Self { + fn new(wait_time: Duration, max_wait_time: Option) -> Self { Self { wait_time, max_wait_time, @@ -143,10 +143,17 @@ impl WaitTime { } fn advance(&mut self, instant: &mut Instant) { - if let Some(new_max_wait_time) = self.max_wait_time.checked_sub(self.wait_time) { - *instant += self.wait_time; - self.max_wait_time = new_max_wait_time; - self.wait_time *= 2; + match &mut self.max_wait_time { + Some(max_wait_time) => { + if let Some(new_max_wait_time) = max_wait_time.checked_sub(self.wait_time) { + *instant += self.wait_time; + *max_wait_time = new_max_wait_time; + self.wait_time *= 2; + } + } + None => { + *instant += self.wait_time; + } } } @@ -158,17 +165,16 @@ impl WaitTime { #[derive(Clone)] enum DeadlineSetting { Immediate, - Infinite, Finite(Instant), } struct LazyDeadline { deadline: Option, - wait_time: Option, + wait_time: WaitTime, } impl LazyDeadline { - fn new(wait_time: Option) -> Self { + fn new(wait_time: WaitTime) -> Self { Self { deadline: None, wait_time, @@ -178,13 +184,8 @@ impl LazyDeadline { fn advance(&mut self) { match self.deadline().to_owned() { DeadlineSetting::Immediate => {} - DeadlineSetting::Infinite => {} DeadlineSetting::Finite(mut instant) => { - // SAFETY: this is safe because DeadlineSetting::Finite is returned by - // deadline() only if wait_time is Some(_) - let wait_time = unsafe { self.wait_time.as_mut().unwrap_unchecked() }; - wait_time.advance(&mut instant); - + self.wait_time.advance(&mut instant); self.deadline = Some(DeadlineSetting::Finite(instant)); } } @@ -193,14 +194,9 @@ impl LazyDeadline { #[inline] fn deadline(&mut self) -> &mut DeadlineSetting { self.deadline - .get_or_insert_with(|| match self.wait_time.as_mut() { - Some(wait_time) => match wait_time.wait_time() { - Duration::ZERO => DeadlineSetting::Immediate, - nonzero_wait_time => { - DeadlineSetting::Finite(Instant::now().add(nonzero_wait_time)) - } - }, - None => DeadlineSetting::Infinite, + .get_or_insert_with(|| match self.wait_time.wait_time() { + Duration::ZERO => DeadlineSetting::Immediate, + nonzero_wait_time => DeadlineSetting::Finite(Instant::now().add(nonzero_wait_time)), }) } } @@ -210,11 +206,9 @@ struct Deadline { } impl Deadline { - fn new(wait_time: Option<(Duration, Duration)>) -> Self { + fn new(wait_time: Duration, max_wait_time: Option) -> Self { Self { - lazy_deadline: LazyDeadline::new( - wait_time.map(|(wait_time, max_wait_time)| WaitTime::new(wait_time, max_wait_time)), - ), + lazy_deadline: LazyDeadline::new(WaitTime::new(wait_time, max_wait_time)), } } @@ -222,7 +216,6 @@ impl Deadline { fn wait(&mut self, s_ref: &StageInRefill) -> bool { match self.lazy_deadline.deadline() { DeadlineSetting::Immediate => false, - DeadlineSetting::Infinite => s_ref.wait(), DeadlineSetting::Finite(instant) => s_ref.wait_deadline(*instant), } } @@ -615,7 +608,7 @@ pub(crate) struct TransmissionPipelineConf { pub(crate) batch: BatchConfig, pub(crate) queue_size: [usize; Priority::NUM], pub(crate) wait_before_drop: (Duration, Duration), - pub(crate) wait_before_close: (Duration, Duration), + pub(crate) wait_before_close: Duration, pub(crate) batching_enabled: bool, pub(crate) batching_time_limit: Duration, } @@ -718,7 +711,7 @@ pub(crate) struct TransmissionPipelineProducer { stage_in: Arc<[Mutex]>, active: Arc, wait_before_drop: (Duration, Duration), - wait_before_close: (Duration, Duration), + wait_before_close: Duration, } impl TransmissionPipelineProducer { @@ -732,12 +725,12 @@ impl TransmissionPipelineProducer { (0, Priority::DEFAULT) }; // If message is droppable, compute a deadline after which the sample could be dropped - let wait_time = if msg.is_droppable() { - self.wait_before_drop + let (wait_time, max_wait_time) = if msg.is_droppable() { + (self.wait_before_drop.0, Some(self.wait_before_drop.1)) } else { - self.wait_before_close + (self.wait_before_close, None) }; - let mut deadline = Deadline::new(Some(wait_time)); + let mut deadline = Deadline::new(wait_time, max_wait_time); // Lock the channel. We are the only one that will be writing on it. let mut queue = zlock!(self.stage_in[idx]); queue.push_network_message(&mut msg, priority, &mut deadline) @@ -894,7 +887,7 @@ mod tests { queue_size: [1; Priority::NUM], batching_enabled: true, wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)), - wait_before_close: (Duration::from_secs(5), Duration::from_millis(1024)), + wait_before_close: Duration::from_secs(5), batching_time_limit: Duration::from_micros(1), }; @@ -908,7 +901,7 @@ mod tests { queue_size: [1; Priority::NUM], batching_enabled: true, wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)), - wait_before_close: (Duration::from_secs(5), Duration::from_millis(1024)), + wait_before_close: Duration::from_secs(5), batching_time_limit: Duration::from_micros(1), }; diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index 068bea123..d0cf2a093 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -110,7 +110,7 @@ pub struct TransportManagerConfig { pub batch_size: BatchSize, pub batching: bool, pub wait_before_drop: (Duration, Duration), - pub wait_before_close: (Duration, Duration), + pub wait_before_close: Duration, pub queue_size: [usize; Priority::NUM], pub queue_backoff: Duration, pub defrag_buff_size: usize, @@ -142,7 +142,7 @@ pub struct TransportManagerBuilder { batching_enabled: bool, batching_time_limit: Duration, wait_before_drop: (Duration, Duration), - wait_before_close: (Duration, Duration), + wait_before_close: Duration, queue_size: QueueSizeConf, defrag_buff_size: usize, link_rx_buffer_size: usize, @@ -197,7 +197,7 @@ impl TransportManagerBuilder { self } - pub fn wait_before_close(mut self, wait_before_close: (Duration, Duration)) -> Self { + pub fn wait_before_close(mut self, wait_before_close: Duration) -> Self { self.wait_before_close = wait_before_close; self } @@ -265,10 +265,7 @@ impl TransportManagerBuilder { duration_from_i64us(*cc_drop.wait_before_drop()), duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()), )); - self = self.wait_before_close(( - duration_from_i64us(*cc_block.wait_before_close()), - duration_from_i64us(*cc_block.max_wait_before_close_fragments()), - )); + self = self.wait_before_close(duration_from_i64us(*cc_block.wait_before_close())); self = self.queue_size(link.tx().queue().size().clone()); self = self.tx_threads(*link.tx().threads()); self = self.protocols(link.protocols().clone()); @@ -379,10 +376,7 @@ impl Default for TransportManagerBuilder { duration_from_i64us(*cc_drop.wait_before_drop()), duration_from_i64us(*cc_drop.max_wait_before_drop_fragments()), ), - wait_before_close: ( - duration_from_i64us(*cc_block.wait_before_close()), - duration_from_i64us(*cc_block.max_wait_before_close_fragments()), - ), + wait_before_close: duration_from_i64us(*cc_block.wait_before_close()), queue_size: queue.size, batching_time_limit: Duration::from_millis(backoff), defrag_buff_size: *link_rx.max_message_size(),