Skip to content

Commit

Permalink
[skip ci] Draft implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Feb 28, 2024
1 parent 199219f commit 589055d
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 18 deletions.
11 changes: 11 additions & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,14 @@ impl Default for SharedMemoryConf {
Self { enabled: false }
}
}

impl Default for ConnectionRetryConf {
fn default() -> Self {
Self {
count: 0,
timeout_ms: 1000,
timeout_increase_factor: 1.,
exit_on_fail: ModeDependentValue::Unique(true),
}
}
}
15 changes: 10 additions & 5 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
//! Configuration to pass to `zenoh::open()` and `zenoh::scout()` functions and associated constants.
pub mod defaults;
mod include;
use core::time;
use include::recursive_include;
use secrecy::{CloneableSecret, DebugSecret, Secret, SerializableSecret, Zeroize};
use serde::{
Expand Down Expand Up @@ -100,11 +99,15 @@ pub struct DownsamplingItemConf {

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ConnectionRetryConf {
//TODO(sashacmc): add comments
// Do we need some skip/fail/etc.?
pub count: i32,
pub timeout_ms: time::Duration,
//TODO(sashacmc): add comments, and infinit value for counter
// ModeDependentValue:
// 1) only for exit_on_fail
// 2) for all
// 3) for no one (we have connect and listen config, they already separate)
pub count: u32,
pub timeout_ms: u64,
pub timeout_increase_factor: f32,
pub exit_on_fail: ModeDependentValue<bool>,
}

pub trait ConfigValidator: Send + Sync {
Expand Down Expand Up @@ -189,11 +192,13 @@ validated_struct::validator! {
pub connect: #[derive(Default)]
ConnectConfig {
pub endpoints: Vec<EndPoint>,
pub retry: Option<ConnectionRetryConf>,
},
/// Which endpoints to listen on. `zenohd` will add `tcp/[::]:7447` to these locators if left empty.
pub listen: #[derive(Default)]
ListenConfig {
pub endpoints: Vec<EndPoint>,
pub retry: Option<ConnectionRetryConf>,
},
pub scouting: #[derive(Default)]
ScoutingConf {
Expand Down
19 changes: 19 additions & 0 deletions commons/zenoh-core/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,25 @@ macro_rules! zparse {
};
}

// This macro allows to parse a string to the target type
// No faili, but log the error and use default
#[macro_export]
macro_rules! zparse_default {
($str:expr, $default:expr) => {
match $str.parse() {
Ok(value) => value,
Err(_) => {
let e = zenoh_result::zerror!(
"Failed to read configuration: {} is not a valid value",
$str
);
log::warn!("{}", e);
$default
}
}
};
}

// This macro allows to do conditional compilation
#[macro_export]
macro_rules! zcondfeat {
Expand Down
79 changes: 66 additions & 13 deletions zenoh/src/net/runtime/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
use super::{Runtime, RuntimeSession};
use async_std::net::UdpSocket;
use async_std::prelude::FutureExt;
use core::time;
use futures::prelude::*;
use socket2::{Domain, Socket, Type};
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::time::Duration;
use zenoh_buffers::reader::DidntRead;
use zenoh_buffers::{reader::HasReader, writer::HasWriter};
use zenoh_codec::{RCodec, WCodec, Zenoh080};
use zenoh_config::{unwrap_or_default, ModeDependent};
use zenoh_config::{unwrap_or_default, ModeDependent, ModeDependentValue};
use zenoh_link::{Locator, LocatorInspector};
use zenoh_protocol::{
core::{whatami::WhatAmIMatcher, EndPoint, WhatAmI, ZenohId},
Expand Down Expand Up @@ -292,22 +291,60 @@ impl Runtime {
Ok(())
}

fn get_retry_config(&self, endpoint: &EndPoint) -> zenoh_config::ConnectionRetryConf {
fn get_retry_config(
&self,
endpoint: &EndPoint,
listen: bool,
) -> zenoh_config::ConnectionRetryConf {
let guard = &self.state.config.lock();
let config = endpoint.config();
let retry = config.get("TODO");
let mut res = zenoh_config::ConnectionRetryConf::default();
if let Some(cfg) = if listen {
guard.listen.retry()
} else {
guard.connect.retry()
} {
res = cfg.clone();
}

zenoh_config::ConnectionRetryConf {
count: 0,
timeout_ms: time::Duration::from_micros(0),
timeout_increase_factor: 1.,
//TODO(sashacmc):
// "tcp/192.168.0.1:7447#retry_count=1;retry_timeout_ms=1000;retry_timeout_increase_factor=2;retry_exit_on_fail=false"
// seems too long, but I don't thick someone will configure it often like it
// alternatives:
// "tcp/192.168.0.1:7447#retry{count=1;timeout_ms=1000;timeout_increase_factor=2;exit_on_fail=false}"
// but we have no support for it now
//
// "tcp/192.168.0.1:7447#retry=1;tm=1000;incfactor=2;exit_on_fail=false"
// but it not consistent with global config
//
let config = endpoint.config();
if let Some(val) = config.get("retry_count") {
res.count = zparse_default!(val, res.count);
}
if let Some(val) = config.get("retry_timeout_ms") {
res.timeout_ms = zparse_default!(val, res.timeout_ms);
}
if let Some(val) = config.get("retry_timeout_increase_factor") {
res.timeout_increase_factor = zparse_default!(val, res.timeout_increase_factor);
}
if let Some(val) = config.get("retry_exit_on_fail") {
//TODO(sashacmc): rewrite to use separate structure or macro
res.exit_on_fail = ModeDependentValue::Unique(zparse_default!(
val,
*res.exit_on_fail.get(self.whatami()).unwrap_or(&false)
));
}
res
}

async fn bind_listeners(&self, listeners: &[EndPoint]) -> ZResult<()> {
for listener in listeners {
let endpoint = listener.clone();
let retry_config = self.get_retry_config(&endpoint);
let retry_config = self.get_retry_config(&endpoint, true);
log::info!(
"Retry!!!!!!!!!!!!!! {}: {}",
retry_config.count,
retry_config.timeout_ms
);
if retry_config.count > 0 {
self.spawn_add_listener(endpoint, retry_config).await?
} else {
Expand All @@ -327,9 +364,23 @@ impl Runtime {
) -> ZResult<()> {
let this = self.clone();
self.spawn(async move {
// TODO(sashacmc): do retry
if this.add_listener(listener).await.is_ok() {
let mut delay = retry_config.timeout_ms;
let mut success = false;
for _ in 0..retry_config.count {
if this.add_listener(listener.clone()).await.is_ok() {
success = true;
break;
}
async_std::task::sleep(Duration::from_millis(delay)).await;
delay = (delay as f32 * retry_config.timeout_increase_factor) as u64;
}
if success {
this.print_locators();
} else {
if *retry_config.exit_on_fail.get(this.whatami()).unwrap() {
//TODO(sashacmc): how to exit, this is don't work?
panic!("Unable to connect");
}
}
});
Ok(())
Expand Down Expand Up @@ -543,7 +594,9 @@ impl Runtime {
);
}
}
//TODO(sashacmc): rework
//TODO(sashacmc): Why it was implelneted like it?
//We agree rework it to the iteration count, or we should use this logic to connections
//listener too
async_std::task::sleep(delay).await;
delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR;
if delay > CONNECTION_RETRY_MAX_PERIOD {
Expand Down

0 comments on commit 589055d

Please sign in to comment.