From 400f3fcf6c63ada820701e58083ec8f2b239f799 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 8 Mar 2024 14:10:11 +0100 Subject: [PATCH] Use retry config for connections --- commons/zenoh-config/src/connection_retry.rs | 59 ++++++++++++-------- zenoh/src/net/runtime/orchestrator.rs | 56 +++++++++---------- zenoh/tests/connection_retry.rs | 56 ++++++++++++++++++- 3 files changed, 118 insertions(+), 53 deletions(-) diff --git a/commons/zenoh-config/src/connection_retry.rs b/commons/zenoh-config/src/connection_retry.rs index b350549169..7f21e1aef0 100644 --- a/commons/zenoh-config/src/connection_retry.rs +++ b/commons/zenoh-config/src/connection_retry.rs @@ -7,12 +7,17 @@ use crate::mode_dependend::*; #[derive(Debug, Deserialize, Serialize, Clone)] pub struct ConnectionRetryModeDependentConf { - // TODO(sashacmc): add comments + // global timeout for full connect/listen cycle pub timeout_ms: Option>, + // timeout for one connect try pub try_timeout_ms: Option>, + // intial wait timeout until next try pub period_init_ms: Option>, + // maximum wait timeout until next try pub period_max_ms: Option>, + // increase factor for the next timeout until next try pub period_increase_factor: Option>, + // if connection timeout exceed, exit from application pub fail_exit: Option>, } @@ -96,7 +101,7 @@ impl ConnectionRetryPeriod { } } - pub fn next_duration(&mut self) -> std::time::Duration { + pub fn duration(&self) -> std::time::Duration { if self.conf.period_init_ms < 0 { return std::time::Duration::MAX; } @@ -105,7 +110,11 @@ impl ConnectionRetryPeriod { return std::time::Duration::from_millis(0); } - let res = std::time::Duration::from_millis(self.delay as u64); + std::time::Duration::from_millis(self.delay as u64) + } + + pub fn next_duration(&mut self) -> std::time::Duration { + let res = self.duration(); self.delay = (self.delay as f64 * self.conf.period_increase_factor) as i64; if self.conf.period_max_ms > 0 && self.delay > self.conf.period_max_ms { @@ -116,7 +125,11 @@ impl ConnectionRetryPeriod { } } -pub fn get_retry_config(config: &Config, endpoint: &EndPoint, listen: bool) -> ConnectionRetryConf { +pub fn get_retry_config( + config: &Config, + endpoint: Option<&EndPoint>, + listen: bool, +) -> ConnectionRetryConf { let whatami = config.mode().unwrap_or(defaults::mode); let mut res: ConnectionRetryConf; @@ -138,24 +151,26 @@ pub fn get_retry_config(config: &Config, endpoint: &EndPoint, listen: bool) -> C .get(whatami, default) } - let config = endpoint.config(); - if let Some(val) = config.get("retry_timeout_ms") { - res.timeout_ms = zparse_default!(val, res.timeout_ms); - } - if let Some(val) = config.get("retry_try_timeout_ms") { - res.try_timeout_ms = zparse_default!(val, res.try_timeout_ms); - } - if let Some(val) = config.get("retry_period_init_ms") { - res.period_init_ms = zparse_default!(val, res.period_init_ms); - } - if let Some(val) = config.get("retry_period_max_ms") { - res.period_max_ms = zparse_default!(val, res.period_max_ms); - } - if let Some(val) = config.get("retry_period_increase_factor") { - res.period_increase_factor = zparse_default!(val, res.period_increase_factor); - } - if let Some(val) = config.get("retry_fail_exit") { - res.fail_exit = zparse_default!(val, res.fail_exit); + if let Some(endpoint) = endpoint { + let config = endpoint.config(); + if let Some(val) = config.get("retry_timeout_ms") { + res.timeout_ms = zparse_default!(val, res.timeout_ms); + } + if let Some(val) = config.get("retry_try_timeout_ms") { + res.try_timeout_ms = zparse_default!(val, res.try_timeout_ms); + } + if let Some(val) = config.get("retry_period_init_ms") { + res.period_init_ms = zparse_default!(val, res.period_init_ms); + } + if let Some(val) = config.get("retry_period_max_ms") { + res.period_max_ms = zparse_default!(val, res.period_max_ms); + } + if let Some(val) = config.get("retry_period_increase_factor") { + res.period_increase_factor = zparse_default!(val, res.period_increase_factor); + } + if let Some(val) = config.get("retry_fail_exit") { + res.fail_exit = zparse_default!(val, res.fail_exit); + } } res } diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index ce1ca400ad..36da3a11be 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -32,10 +32,6 @@ const RCV_BUF_SIZE: usize = u16::MAX as usize; const SCOUT_INITIAL_PERIOD: Duration = Duration::from_millis(1_000); const SCOUT_MAX_PERIOD: Duration = Duration::from_millis(8_000); const SCOUT_PERIOD_INCREASE_FACTOR: u32 = 2; -const CONNECTION_TIMEOUT: Duration = Duration::from_millis(10_000); -const CONNECTION_RETRY_INITIAL_PERIOD: Duration = Duration::from_millis(1_000); -const CONNECTION_RETRY_MAX_PERIOD: Duration = Duration::from_millis(4_000); -const CONNECTION_RETRY_PERIOD_INCREASE_FACTOR: u32 = 2; const ROUTER_DEFAULT_LISTENER: &str = "tcp/[::]:7447"; const PEER_DEFAULT_LISTENER: &str = "tcp/[::]:0"; @@ -89,8 +85,9 @@ impl Runtime { } _ => { for locator in &peers { + let retry_config = self.get_connect_retry_config(locator); match tokio::time::timeout( - CONNECTION_TIMEOUT, + retry_config.try_timeout(), self.manager().open_transport_unicast(locator.clone()), ) .await @@ -293,19 +290,25 @@ impl Runtime { Ok(()) } - fn get_retry_config( - &self, - endpoint: &EndPoint, - listen: bool, - ) -> zenoh_config::ConnectionRetryConf { + fn get_listen_retry_config(&self, endpoint: &EndPoint) -> zenoh_config::ConnectionRetryConf { + let guard = &self.state.config.lock(); + zenoh_config::get_retry_config(guard, Some(endpoint), true) + } + + fn get_connect_retry_config(&self, endpoint: &EndPoint) -> zenoh_config::ConnectionRetryConf { let guard = &self.state.config.lock(); - zenoh_config::get_retry_config(guard, endpoint, listen) + zenoh_config::get_retry_config(guard, Some(endpoint), false) + } + + fn get_global_connect_retry_config(&self) -> zenoh_config::ConnectionRetryConf { + let guard = &self.state.config.lock(); + zenoh_config::get_retry_config(guard, None, false) } async fn bind_listeners(&self, listeners: &[EndPoint]) -> ZResult<()> { for listener in listeners { let endpoint = listener.clone(); - let retry_config = self.get_retry_config(&endpoint, true); + let retry_config = self.get_listen_retry_config(&endpoint); log::debug!("Try to add listener: {:?}: {:?}", endpoint, retry_config); if retry_config.timeout().is_zero() { // try to add listener and exit immediately without retry @@ -542,12 +545,13 @@ impl Runtime { } async fn peer_connector(&self, peer: EndPoint) { - let mut delay = CONNECTION_RETRY_INITIAL_PERIOD; + let retry_config = self.get_connect_retry_config(&peer); + let mut period = retry_config.period(); loop { log::trace!("Trying to connect to configured peer {}", peer); let endpoint = peer.clone(); match tokio::time::timeout( - CONNECTION_TIMEOUT, + retry_config.try_timeout(), self.manager().open_transport_unicast(endpoint), ) .await @@ -569,7 +573,7 @@ impl Runtime { "Unable to connect to configured peer {}! {}. Retry in {:?}.", peer, e, - delay + period.duration() ); } Err(e) => { @@ -577,15 +581,11 @@ impl Runtime { "Unable to connect to configured peer {}! {}. Retry in {:?}.", peer, e, - delay + period.duration() ); } } - tokio::time::sleep(delay).await; - delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR; - if delay > CONNECTION_RETRY_MAX_PERIOD { - delay = CONNECTION_RETRY_MAX_PERIOD; - } + tokio::time::sleep(period.next_duration()).await; } } @@ -700,10 +700,11 @@ impl Runtime { }; let endpoint = locator.to_owned().into(); + let retry_config = self.get_connect_retry_config(&endpoint); let manager = self.manager(); if is_multicast { match tokio::time::timeout( - CONNECTION_TIMEOUT, + retry_config.try_timeout(), manager.open_transport_multicast(endpoint), ) .await @@ -720,7 +721,7 @@ impl Runtime { } } else { match tokio::time::timeout( - CONNECTION_TIMEOUT, + retry_config.try_timeout(), manager.open_transport_unicast(endpoint), ) .await @@ -907,13 +908,10 @@ impl Runtime { WhatAmI::Client => { let runtime = session.runtime.clone(); session.runtime.spawn(async move { - let mut delay = CONNECTION_RETRY_INITIAL_PERIOD; + let retry_config = runtime.get_global_connect_retry_config(); + let mut period = retry_config.period(); while runtime.start_client().await.is_err() { - tokio::time::sleep(delay).await; - delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR; - if delay > CONNECTION_RETRY_MAX_PERIOD { - delay = CONNECTION_RETRY_MAX_PERIOD; - } + tokio::time::sleep(period.next_duration()).await; } }); } diff --git a/zenoh/tests/connection_retry.rs b/zenoh/tests/connection_retry.rs index 7325b37001..b231800982 100644 --- a/zenoh/tests/connection_retry.rs +++ b/zenoh/tests/connection_retry.rs @@ -65,7 +65,7 @@ fn retry_config_overriding() { ]; for (i, endpoint) in config.listen().endpoints().iter().enumerate() { - let retry_config = zenoh_config::get_retry_config(&config, endpoint, true); + let retry_config = zenoh_config::get_retry_config(&config, Some(endpoint), true); assert_eq!(retry_config, expected[i]); } } @@ -89,7 +89,7 @@ fn retry_config_parsing() { .unwrap(); let endpoint: EndPoint = "tcp/[::]:0".parse().unwrap(); - let retry_config = zenoh_config::get_retry_config(&config, &endpoint, true); + let retry_config = zenoh_config::get_retry_config(&config, Some(&endpoint), true); assert_eq!(retry_config.timeout(), std::time::Duration::MAX); assert_eq!( @@ -101,10 +101,62 @@ fn retry_config_parsing() { let expected = vec![1000, 2000, 4000, 6000, 6000, 6000, 6000]; for v in expected { + assert_eq!(period.duration(), std::time::Duration::from_millis(v)); assert_eq!(period.next_duration(), std::time::Duration::from_millis(v)); } } +#[test] +fn retry_config_const_period() { + let mut config = Config::default(); + config + .insert_json5( + "listen/retry", + r#" + { + period_init_ms: 1000, + period_increase_factor: 1, + } + "#, + ) + .unwrap(); + + let endpoint: EndPoint = "tcp/[::]:0".parse().unwrap(); + let retry_config = zenoh_config::get_retry_config(&config, Some(&endpoint), true); + + let mut period = retry_config.period(); + let expected = vec![1000, 1000, 1000, 1000]; + + for v in expected { + assert_eq!(period.duration(), std::time::Duration::from_millis(v)); + assert_eq!(period.next_duration(), std::time::Duration::from_millis(v)); + } +} + +#[test] +fn retry_config_infinit_period() { + let mut config = Config::default(); + config + .insert_json5( + "listen/retry", + r#" + { + period_init_ms: -1, + period_increase_factor: 1, + } + "#, + ) + .unwrap(); + + let endpoint: EndPoint = "tcp/[::]:0".parse().unwrap(); + let retry_config = zenoh_config::get_retry_config(&config, Some(&endpoint), true); + + let mut period = retry_config.period(); + + assert_eq!(period.duration(), std::time::Duration::MAX); + assert_eq!(period.next_duration(), std::time::Duration::MAX); +} + #[test] #[should_panic(expected = "Can not create a new TCP listener")] fn listen_no_retry() {