Skip to content

Commit

Permalink
Use retry config for connections
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Mar 8, 2024
1 parent 758e735 commit 400f3fc
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 53 deletions.
59 changes: 37 additions & 22 deletions commons/zenoh-config/src/connection_retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@ use crate::mode_dependend::*;

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ConnectionRetryModeDependentConf {
// TODO(sashacmc): add comments
// global timeout for full connect/listen cycle
pub timeout_ms: Option<ModeDependentValue<i64>>,
// timeout for one connect try
pub try_timeout_ms: Option<ModeDependentValue<i64>>,
// intial wait timeout until next try
pub period_init_ms: Option<ModeDependentValue<i64>>,
// maximum wait timeout until next try
pub period_max_ms: Option<ModeDependentValue<i64>>,
// increase factor for the next timeout until next try
pub period_increase_factor: Option<ModeDependentValue<f64>>,
// if connection timeout exceed, exit from application
pub fail_exit: Option<ModeDependentValue<bool>>,
}

Expand Down Expand Up @@ -96,7 +101,7 @@ impl ConnectionRetryPeriod {
}
}

pub fn next_duration(&mut self) -> std::time::Duration {
pub fn duration(&self) -> std::time::Duration {
if self.conf.period_init_ms < 0 {
return std::time::Duration::MAX;
}
Expand All @@ -105,7 +110,11 @@ impl ConnectionRetryPeriod {
return std::time::Duration::from_millis(0);
}

let res = std::time::Duration::from_millis(self.delay as u64);
std::time::Duration::from_millis(self.delay as u64)
}

pub fn next_duration(&mut self) -> std::time::Duration {
let res = self.duration();

self.delay = (self.delay as f64 * self.conf.period_increase_factor) as i64;
if self.conf.period_max_ms > 0 && self.delay > self.conf.period_max_ms {
Expand All @@ -116,7 +125,11 @@ impl ConnectionRetryPeriod {
}
}

pub fn get_retry_config(config: &Config, endpoint: &EndPoint, listen: bool) -> ConnectionRetryConf {
pub fn get_retry_config(
config: &Config,
endpoint: Option<&EndPoint>,
listen: bool,
) -> ConnectionRetryConf {
let whatami = config.mode().unwrap_or(defaults::mode);

let mut res: ConnectionRetryConf;
Expand All @@ -138,24 +151,26 @@ pub fn get_retry_config(config: &Config, endpoint: &EndPoint, listen: bool) -> C
.get(whatami, default)
}

let config = endpoint.config();
if let Some(val) = config.get("retry_timeout_ms") {
res.timeout_ms = zparse_default!(val, res.timeout_ms);
}
if let Some(val) = config.get("retry_try_timeout_ms") {
res.try_timeout_ms = zparse_default!(val, res.try_timeout_ms);
}
if let Some(val) = config.get("retry_period_init_ms") {
res.period_init_ms = zparse_default!(val, res.period_init_ms);
}
if let Some(val) = config.get("retry_period_max_ms") {
res.period_max_ms = zparse_default!(val, res.period_max_ms);
}
if let Some(val) = config.get("retry_period_increase_factor") {
res.period_increase_factor = zparse_default!(val, res.period_increase_factor);
}
if let Some(val) = config.get("retry_fail_exit") {
res.fail_exit = zparse_default!(val, res.fail_exit);
if let Some(endpoint) = endpoint {
let config = endpoint.config();
if let Some(val) = config.get("retry_timeout_ms") {
res.timeout_ms = zparse_default!(val, res.timeout_ms);
}
if let Some(val) = config.get("retry_try_timeout_ms") {
res.try_timeout_ms = zparse_default!(val, res.try_timeout_ms);
}
if let Some(val) = config.get("retry_period_init_ms") {
res.period_init_ms = zparse_default!(val, res.period_init_ms);
}
if let Some(val) = config.get("retry_period_max_ms") {
res.period_max_ms = zparse_default!(val, res.period_max_ms);
}
if let Some(val) = config.get("retry_period_increase_factor") {
res.period_increase_factor = zparse_default!(val, res.period_increase_factor);
}
if let Some(val) = config.get("retry_fail_exit") {
res.fail_exit = zparse_default!(val, res.fail_exit);
}
}
res
}
56 changes: 27 additions & 29 deletions zenoh/src/net/runtime/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ const RCV_BUF_SIZE: usize = u16::MAX as usize;
const SCOUT_INITIAL_PERIOD: Duration = Duration::from_millis(1_000);
const SCOUT_MAX_PERIOD: Duration = Duration::from_millis(8_000);
const SCOUT_PERIOD_INCREASE_FACTOR: u32 = 2;
const CONNECTION_TIMEOUT: Duration = Duration::from_millis(10_000);
const CONNECTION_RETRY_INITIAL_PERIOD: Duration = Duration::from_millis(1_000);
const CONNECTION_RETRY_MAX_PERIOD: Duration = Duration::from_millis(4_000);
const CONNECTION_RETRY_PERIOD_INCREASE_FACTOR: u32 = 2;
const ROUTER_DEFAULT_LISTENER: &str = "tcp/[::]:7447";
const PEER_DEFAULT_LISTENER: &str = "tcp/[::]:0";

Expand Down Expand Up @@ -89,8 +85,9 @@ impl Runtime {
}
_ => {
for locator in &peers {
let retry_config = self.get_connect_retry_config(locator);
match tokio::time::timeout(
CONNECTION_TIMEOUT,
retry_config.try_timeout(),
self.manager().open_transport_unicast(locator.clone()),
)
.await
Expand Down Expand Up @@ -293,19 +290,25 @@ impl Runtime {
Ok(())
}

fn get_retry_config(
&self,
endpoint: &EndPoint,
listen: bool,
) -> zenoh_config::ConnectionRetryConf {
fn get_listen_retry_config(&self, endpoint: &EndPoint) -> zenoh_config::ConnectionRetryConf {
let guard = &self.state.config.lock();
zenoh_config::get_retry_config(guard, Some(endpoint), true)
}

fn get_connect_retry_config(&self, endpoint: &EndPoint) -> zenoh_config::ConnectionRetryConf {
let guard = &self.state.config.lock();
zenoh_config::get_retry_config(guard, endpoint, listen)
zenoh_config::get_retry_config(guard, Some(endpoint), false)
}

fn get_global_connect_retry_config(&self) -> zenoh_config::ConnectionRetryConf {
let guard = &self.state.config.lock();
zenoh_config::get_retry_config(guard, None, false)
}

async fn bind_listeners(&self, listeners: &[EndPoint]) -> ZResult<()> {
for listener in listeners {
let endpoint = listener.clone();
let retry_config = self.get_retry_config(&endpoint, true);
let retry_config = self.get_listen_retry_config(&endpoint);
log::debug!("Try to add listener: {:?}: {:?}", endpoint, retry_config);
if retry_config.timeout().is_zero() {
// try to add listener and exit immediately without retry
Expand Down Expand Up @@ -542,12 +545,13 @@ impl Runtime {
}

async fn peer_connector(&self, peer: EndPoint) {
let mut delay = CONNECTION_RETRY_INITIAL_PERIOD;
let retry_config = self.get_connect_retry_config(&peer);
let mut period = retry_config.period();
loop {
log::trace!("Trying to connect to configured peer {}", peer);
let endpoint = peer.clone();
match tokio::time::timeout(
CONNECTION_TIMEOUT,
retry_config.try_timeout(),
self.manager().open_transport_unicast(endpoint),
)
.await
Expand All @@ -569,23 +573,19 @@ impl Runtime {
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
delay
period.duration()
);
}
Err(e) => {
log::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
delay
period.duration()
);
}
}
tokio::time::sleep(delay).await;
delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR;
if delay > CONNECTION_RETRY_MAX_PERIOD {
delay = CONNECTION_RETRY_MAX_PERIOD;
}
tokio::time::sleep(period.next_duration()).await;
}
}

Expand Down Expand Up @@ -700,10 +700,11 @@ impl Runtime {
};

let endpoint = locator.to_owned().into();
let retry_config = self.get_connect_retry_config(&endpoint);
let manager = self.manager();
if is_multicast {
match tokio::time::timeout(
CONNECTION_TIMEOUT,
retry_config.try_timeout(),
manager.open_transport_multicast(endpoint),
)
.await
Expand All @@ -720,7 +721,7 @@ impl Runtime {
}
} else {
match tokio::time::timeout(
CONNECTION_TIMEOUT,
retry_config.try_timeout(),
manager.open_transport_unicast(endpoint),
)
.await
Expand Down Expand Up @@ -907,13 +908,10 @@ impl Runtime {
WhatAmI::Client => {
let runtime = session.runtime.clone();
session.runtime.spawn(async move {
let mut delay = CONNECTION_RETRY_INITIAL_PERIOD;
let retry_config = runtime.get_global_connect_retry_config();
let mut period = retry_config.period();
while runtime.start_client().await.is_err() {
tokio::time::sleep(delay).await;
delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR;
if delay > CONNECTION_RETRY_MAX_PERIOD {
delay = CONNECTION_RETRY_MAX_PERIOD;
}
tokio::time::sleep(period.next_duration()).await;
}
});
}
Expand Down
56 changes: 54 additions & 2 deletions zenoh/tests/connection_retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ fn retry_config_overriding() {
];

for (i, endpoint) in config.listen().endpoints().iter().enumerate() {
let retry_config = zenoh_config::get_retry_config(&config, endpoint, true);
let retry_config = zenoh_config::get_retry_config(&config, Some(endpoint), true);
assert_eq!(retry_config, expected[i]);
}
}
Expand All @@ -89,7 +89,7 @@ fn retry_config_parsing() {
.unwrap();

let endpoint: EndPoint = "tcp/[::]:0".parse().unwrap();
let retry_config = zenoh_config::get_retry_config(&config, &endpoint, true);
let retry_config = zenoh_config::get_retry_config(&config, Some(&endpoint), true);

assert_eq!(retry_config.timeout(), std::time::Duration::MAX);
assert_eq!(
Expand All @@ -101,10 +101,62 @@ fn retry_config_parsing() {
let expected = vec![1000, 2000, 4000, 6000, 6000, 6000, 6000];

for v in expected {
assert_eq!(period.duration(), std::time::Duration::from_millis(v));
assert_eq!(period.next_duration(), std::time::Duration::from_millis(v));
}
}

#[test]
fn retry_config_const_period() {
let mut config = Config::default();
config
.insert_json5(
"listen/retry",
r#"
{
period_init_ms: 1000,
period_increase_factor: 1,
}
"#,
)
.unwrap();

let endpoint: EndPoint = "tcp/[::]:0".parse().unwrap();
let retry_config = zenoh_config::get_retry_config(&config, Some(&endpoint), true);

let mut period = retry_config.period();
let expected = vec![1000, 1000, 1000, 1000];

for v in expected {
assert_eq!(period.duration(), std::time::Duration::from_millis(v));
assert_eq!(period.next_duration(), std::time::Duration::from_millis(v));
}
}

#[test]
fn retry_config_infinit_period() {
let mut config = Config::default();
config
.insert_json5(
"listen/retry",
r#"
{
period_init_ms: -1,
period_increase_factor: 1,
}
"#,
)
.unwrap();

let endpoint: EndPoint = "tcp/[::]:0".parse().unwrap();
let retry_config = zenoh_config::get_retry_config(&config, Some(&endpoint), true);

let mut period = retry_config.period();

assert_eq!(period.duration(), std::time::Duration::MAX);
assert_eq!(period.next_duration(), std::time::Duration::MAX);
}

#[test]
#[should_panic(expected = "Can not create a new TCP listener")]
fn listen_no_retry() {
Expand Down

0 comments on commit 400f3fc

Please sign in to comment.