diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index 8d1a5dbc0f..22f201604a 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -211,3 +211,14 @@ impl Default for SharedMemoryConf { Self { enabled: false } } } + +impl Default for ConnectionRetryConf { + fn default() -> Self { + Self { + count: 0, + timeout_ms: 1000, + timeout_increase_factor: 1., + exit_on_fail: ModeDependentValue::Unique(true), + } + } +} diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index c5ab959f4d..ed26993cd0 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -15,7 +15,6 @@ //! Configuration to pass to `zenoh::open()` and `zenoh::scout()` functions and associated constants. pub mod defaults; mod include; -use core::time; use include::recursive_include; use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize}; use serde::{ @@ -100,11 +99,15 @@ pub struct DownsamplingItemConf { #[derive(Debug, Deserialize, Serialize, Clone)] pub struct ConnectionRetryConf { - //TODO(sashacmc): add comments - // Do we need some skip/fail/etc.? - pub count: i32, - pub timeout_ms: time::Duration, + //TODO(sashacmc): add comments, and infinit value for counter + // ModeDependentValue: + // 1) only for exit_on_fail + // 2) for all + // 3) for no one (we have connect and listen config, they already separate) + pub count: u32, + pub timeout_ms: u64, pub timeout_increase_factor: f32, + pub exit_on_fail: ModeDependentValue, } pub trait ConfigValidator: Send + Sync { @@ -189,11 +192,13 @@ validated_struct::validator! { pub connect: #[derive(Default)] ConnectConfig { pub endpoints: Vec, + pub retry: Option, }, /// Which endpoints to listen on. `zenohd` will add `tcp/[::]:7447` to these locators if left empty. pub listen: #[derive(Default)] ListenConfig { pub endpoints: Vec, + pub retry: Option, }, pub scouting: #[derive(Default)] ScoutingConf { diff --git a/commons/zenoh-core/src/macros.rs b/commons/zenoh-core/src/macros.rs index b0cbb24963..d78c3d6dd6 100644 --- a/commons/zenoh-core/src/macros.rs +++ b/commons/zenoh-core/src/macros.rs @@ -206,6 +206,25 @@ macro_rules! zparse { }; } +// This macro allows to parse a string to the target type +// No faili, but log the error and use default +#[macro_export] +macro_rules! zparse_default { + ($str:expr, $default:expr) => { + match $str.parse() { + Ok(value) => value, + Err(_) => { + let e = zenoh_result::zerror!( + "Failed to read configuration: {} is not a valid value", + $str + ); + log::warn!("{}", e); + $default + } + } + }; +} + // This macro allows to do conditional compilation #[macro_export] macro_rules! zcondfeat { diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index c79bd1b413..4a6f8cd9a7 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -14,7 +14,6 @@ use super::{Runtime, RuntimeSession}; use async_std::net::UdpSocket; use async_std::prelude::FutureExt; -use core::time; use futures::prelude::*; use socket2::{Domain, Socket, Type}; use std::net::{IpAddr, Ipv6Addr, SocketAddr}; @@ -22,7 +21,7 @@ use std::time::Duration; use zenoh_buffers::reader::DidntRead; use zenoh_buffers::{reader::HasReader, writer::HasWriter}; use zenoh_codec::{RCodec, WCodec, Zenoh080}; -use zenoh_config::{unwrap_or_default, ModeDependent}; +use zenoh_config::{unwrap_or_default, ModeDependent, ModeDependentValue}; use zenoh_link::{Locator, LocatorInspector}; use zenoh_protocol::{ core::{whatami::WhatAmIMatcher, EndPoint, WhatAmI, ZenohId}, @@ -292,22 +291,60 @@ impl Runtime { Ok(()) } - fn get_retry_config(&self, endpoint: &EndPoint) -> zenoh_config::ConnectionRetryConf { + fn get_retry_config( + &self, + endpoint: &EndPoint, + listen: bool, + ) -> zenoh_config::ConnectionRetryConf { let guard = &self.state.config.lock(); - let config = endpoint.config(); - let retry = config.get("TODO"); + let mut res = zenoh_config::ConnectionRetryConf::default(); + if let Some(cfg) = if listen { + guard.listen.retry() + } else { + guard.connect.retry() + } { + res = cfg.clone(); + } - zenoh_config::ConnectionRetryConf { - count: 0, - timeout_ms: time::Duration::from_micros(0), - timeout_increase_factor: 1., + //TODO(sashacmc): + // "tcp/192.168.0.1:7447#retry_count=1;retry_timeout_ms=1000;retry_timeout_increase_factor=2;retry_exit_on_fail=false" + // seems too long, but I don't thick someone will configure it often like it + // alternatives: + // "tcp/192.168.0.1:7447#retry{count=1;timeout_ms=1000;timeout_increase_factor=2;exit_on_fail=false}" + // but we have no support for it now + // + // "tcp/192.168.0.1:7447#retry=1;tm=1000;incfactor=2;exit_on_fail=false" + // but it not consistent with global config + // + let config = endpoint.config(); + if let Some(val) = config.get("retry_count") { + res.count = zparse_default!(val, res.count); + } + if let Some(val) = config.get("retry_timeout_ms") { + res.timeout_ms = zparse_default!(val, res.timeout_ms); } + if let Some(val) = config.get("retry_timeout_increase_factor") { + res.timeout_increase_factor = zparse_default!(val, res.timeout_increase_factor); + } + if let Some(val) = config.get("retry_exit_on_fail") { + //TODO(sashacmc): rewrite to use separate structure or macro + res.exit_on_fail = ModeDependentValue::Unique(zparse_default!( + val, + *res.exit_on_fail.get(self.whatami()).unwrap_or(&false) + )); + } + res } async fn bind_listeners(&self, listeners: &[EndPoint]) -> ZResult<()> { for listener in listeners { let endpoint = listener.clone(); - let retry_config = self.get_retry_config(&endpoint); + let retry_config = self.get_retry_config(&endpoint, true); + log::info!( + "Retry!!!!!!!!!!!!!! {}: {}", + retry_config.count, + retry_config.timeout_ms + ); if retry_config.count > 0 { self.spawn_add_listener(endpoint, retry_config).await? } else { @@ -327,9 +364,23 @@ impl Runtime { ) -> ZResult<()> { let this = self.clone(); self.spawn(async move { - // TODO(sashacmc): do retry - if this.add_listener(listener).await.is_ok() { + let mut delay = retry_config.timeout_ms; + let mut success = false; + for _ in 0..retry_config.count { + if this.add_listener(listener.clone()).await.is_ok() { + success = true; + break; + } + async_std::task::sleep(Duration::from_millis(delay)).await; + delay = (delay as f32 * retry_config.timeout_increase_factor) as u64; + } + if success { this.print_locators(); + } else { + if *retry_config.exit_on_fail.get(this.whatami()).unwrap() { + //TODO(sashacmc): how to exit, this is don't work? + panic!("Unable to connect"); + } } }); Ok(()) @@ -543,7 +594,9 @@ impl Runtime { ); } } - //TODO(sashacmc): rework + //TODO(sashacmc): Why it was implelneted like it? + //We agree rework it to the iteration count, or we should use this logic to connections + //listener too async_std::task::sleep(delay).await; delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR; if delay > CONNECTION_RETRY_MAX_PERIOD {