Skip to content

Commit

Permalink
Add connection retry
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Mar 18, 2024
1 parent 0bdd136 commit 61e06eb
Show file tree
Hide file tree
Showing 10 changed files with 994 additions and 250 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,31 @@
/// For TCP/UDP on Linux, it is possible additionally specify the interface to be connected to:
/// E.g. tcp/192.168.0.1:7447#iface=eth0, for connect only if the IP address is reachable via the interface eth0
connect: {
/// timeout waiting for all endpoints connected (0: no retry, -1: infinite timeout)
/// Accepts a single value or different values for router, peer and client.
timeout_ms: { router: -1, peer: -1, client: 0 },

endpoints: [
// "<proto>/<address>"
],

/// Global connect configuration,
/// Accepts a single value or different values for router, peer and client.
/// The configuration can also be specified for the separate endpoint
/// it will override the global one
/// E.g. tcp/192.168.0.1:7447#retry_period_init_ms=20000;retry_period_max_ms=10000"

/// exit from application, if timeout exceed
exit_on_failure: { router: false, peer: false, client: true },
/// connect establishing retry configuration
retry: {
/// intial wait timeout until next connect try
period_init_ms: 1000,
/// maximum wait timeout until next connect try
period_max_ms: 4000,
/// increase factor for the next timeout until nexti connect try
period_increase_factor: 2,
},
},

/// Which endpoints to listen on. E.g. tcp/localhost:7447.
Expand All @@ -33,9 +55,31 @@
/// For TCP/UDP on Linux, it is possible additionally specify the interface to be listened to:
/// E.g. tcp/0.0.0.0:7447#iface=eth0, for listen connection only on eth0
listen: {
/// timeout waiting for all listen endpoints (0: no retry, -1: infinite timeout)
/// Accepts a single value or different values for router, peer and client.
timeout_ms: 0,

endpoints: [
// "<proto>/<address>"
],

/// Global listen configuration,
/// Accepts a single value or different values for router, peer and client.
/// The configuration can also be specified for the separate endpoint
/// it will override the global one
/// E.g. tcp/192.168.0.1:7447#exit_on_failure=false;retry_period_max_ms=1000"

/// exit from application, if timeout exceed
exit_on_failure: true,
/// listen retry configuration
retry: {
/// intial wait timeout until next try
period_init_ms: 1000,
/// maximum wait timeout until next try
period_max_ms: 4000,
/// increase factor for the next timeout until next try
period_increase_factor: 2,
},
},
/// Configure the scouting mechanisms and their behaviours
scouting: {
Expand Down
1 change: 1 addition & 0 deletions commons/zenoh-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ categories = { workspace = true }
description = "Internal crate for zenoh."

[dependencies]
log = { workspace = true }
flume = { workspace = true }
json5 = { workspace = true }
num_cpus = { workspace = true }
Expand Down
200 changes: 200 additions & 0 deletions commons/zenoh-config/src/connection_retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use crate::{
defaults::{
self, DEFAULT_CONNECT_EXIT_ON_FAIL, DEFAULT_CONNECT_TIMEOUT_MS,
DEFAULT_LISTEN_EXIT_ON_FAIL, DEFAULT_LISTEN_TIMEOUT_MS,
},
Config,
};
use serde::{Deserialize, Serialize};
use zenoh_core::zparse_default;
use zenoh_protocol::core::WhatAmI;

use crate::mode_dependend::*;

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ConnectionRetryModeDependentConf {
// intial wait timeout until next try
pub period_init_ms: Option<ModeDependentValue<i64>>,
// maximum wait timeout until next try
pub period_max_ms: Option<ModeDependentValue<i64>>,
// increase factor for the next timeout until next try
pub period_increase_factor: Option<ModeDependentValue<f64>>,
}

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
pub struct ConnectionRetryConf {
pub exit_on_failure: bool,
pub period_init_ms: i64,
pub period_max_ms: i64,
pub period_increase_factor: f64,
}

impl ConnectionRetryConf {
pub fn new(
whatami: WhatAmI,
exit_on_failure: bool,
retry: ConnectionRetryModeDependentConf,
default_retry: ConnectionRetryModeDependentConf,
) -> ConnectionRetryConf {
ConnectionRetryConf {
exit_on_failure,
period_init_ms: *retry
.period_init_ms
.get(whatami)
.unwrap_or(default_retry.period_init_ms.get(whatami).unwrap()),
period_max_ms: *retry
.period_max_ms
.get(whatami)
.unwrap_or(default_retry.period_max_ms.get(whatami).unwrap()),
period_increase_factor: *retry
.period_increase_factor
.get(whatami)
.unwrap_or(default_retry.period_increase_factor.get(whatami).unwrap()),
}
}

pub fn timeout(&self) -> std::time::Duration {
ms_to_duration(self.period_init_ms)
}

pub fn period(&self) -> ConnectionRetryPeriod {
ConnectionRetryPeriod::new(self)
}
}

pub struct ConnectionRetryPeriod {
conf: ConnectionRetryConf,
delay: i64,
}

impl ConnectionRetryPeriod {
pub fn new(conf: &ConnectionRetryConf) -> ConnectionRetryPeriod {
ConnectionRetryPeriod {
conf: conf.clone(),
delay: conf.period_init_ms,
}
}

pub fn duration(&self) -> std::time::Duration {
if self.conf.period_init_ms < 0 {
return std::time::Duration::MAX;
}

if self.conf.period_init_ms == 0 {
return std::time::Duration::from_millis(0);
}

std::time::Duration::from_millis(self.delay as u64)
}

pub fn next_duration(&mut self) -> std::time::Duration {
let res = self.duration();

self.delay = (self.delay as f64 * self.conf.period_increase_factor) as i64;
if self.conf.period_max_ms > 0 && self.delay > self.conf.period_max_ms {
self.delay = self.conf.period_max_ms;
}

res
}
}

fn ms_to_duration(ms: i64) -> std::time::Duration {
if ms >= 0 {
std::time::Duration::from_millis(ms as u64)
} else {
std::time::Duration::MAX
}
}

pub fn get_global_listener_timeout(config: &Config) -> std::time::Duration {
let whatami = config.mode().unwrap_or(defaults::mode);
ms_to_duration(
*config
.listen()
.timeout_ms()
.get(whatami)
.unwrap_or(DEFAULT_LISTEN_TIMEOUT_MS.get(whatami).unwrap()),
)
}

pub fn get_global_connect_timeout(config: &Config) -> std::time::Duration {
let whatami = config.mode().unwrap_or(defaults::mode);
ms_to_duration(
*config
.connect()
.timeout_ms()
.get(whatami)
.unwrap_or(DEFAULT_CONNECT_TIMEOUT_MS.get(whatami).unwrap()),
)
}

pub fn get_retry_config(
config: &Config,
endpoint: Option<&EndPoint>,
listen: bool,
) -> ConnectionRetryConf {
let whatami = config.mode().unwrap_or(defaults::mode);

let default_retry = ConnectionRetryModeDependentConf::default();
let retry: ConnectionRetryModeDependentConf;
let exit_on_failure: bool;
if listen {
retry = config
.listen()
.retry()
.clone()
.unwrap_or_else(|| default_retry.clone());

exit_on_failure = *config
.listen()
.exit_on_failure()
.get(whatami)
.unwrap_or(DEFAULT_LISTEN_EXIT_ON_FAIL.get(whatami).unwrap());
} else {
retry = config
.connect()
.retry()
.clone()
.unwrap_or_else(|| default_retry.clone());

exit_on_failure = *config
.connect()
.exit_on_failure()
.get(whatami)
.unwrap_or(DEFAULT_CONNECT_EXIT_ON_FAIL.get(whatami).unwrap());
}

let mut res = ConnectionRetryConf::new(whatami, exit_on_failure, retry, default_retry);

if let Some(endpoint) = endpoint {
let config = endpoint.config();
if let Some(val) = config.get("exit_on_failure") {
res.exit_on_failure = zparse_default!(val, res.exit_on_failure);
}
if let Some(val) = config.get("retry_period_init_ms") {
res.period_init_ms = zparse_default!(val, res.period_init_ms);
}
if let Some(val) = config.get("retry_period_max_ms") {
res.period_max_ms = zparse_default!(val, res.period_max_ms);
}
if let Some(val) = config.get("retry_period_increase_factor") {
res.period_increase_factor = zparse_default!(val, res.period_increase_factor);
}
}
res
}
27 changes: 27 additions & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,30 @@ impl Default for SharedMemoryConf {
Self { enabled: false }
}
}

pub const DEFAULT_CONNECT_TIMEOUT_MS: ModeDependentValue<i64> =
ModeDependentValue::Dependent(ModeValues {
client: Some(0),
peer: Some(-1),
router: Some(-1),
});

pub const DEFAULT_CONNECT_EXIT_ON_FAIL: ModeDependentValue<bool> =
ModeDependentValue::Dependent(ModeValues {
client: Some(true),
peer: Some(false),
router: Some(false),
});

pub const DEFAULT_LISTEN_TIMEOUT_MS: ModeDependentValue<i64> = ModeDependentValue::Unique(0);
pub const DEFAULT_LISTEN_EXIT_ON_FAIL: ModeDependentValue<bool> = ModeDependentValue::Unique(true);

impl Default for ConnectionRetryModeDependentConf {
fn default() -> Self {
Self {
period_init_ms: Some(ModeDependentValue::Unique(1000)),
period_max_ms: Some(ModeDependentValue::Unique(4000)),
period_increase_factor: Some(ModeDependentValue::Unique(2.)),
}
}
}
Loading

0 comments on commit 61e06eb

Please sign in to comment.