From 01ccbb73c767aef2dcf2322d4e15ae9781a7e6ef Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Tue, 23 Apr 2024 16:06:56 +0200 Subject: [PATCH] Don't wait for full scouting delay when peers connected all configured connect endpoints --- zenoh/src/net/runtime/orchestrator.rs | 92 +++++++++++++++++++++++---- zenoh/src/session.rs | 5 +- 2 files changed, 82 insertions(+), 15 deletions(-) diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index 687fa90649..38a1b28eb0 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -15,8 +15,12 @@ use super::{Runtime, RuntimeSession}; use futures::prelude::*; use socket2::{Domain, Socket, Type}; use std::net::{IpAddr, Ipv6Addr, SocketAddr}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; use std::time::Duration; use tokio::net::UdpSocket; +use tokio::sync::futures::Notified; +use tokio::sync::Notify; use zenoh_buffers::reader::DidntRead; use zenoh_buffers::{reader::HasReader, writer::HasWriter}; use zenoh_codec::{RCodec, WCodec, Zenoh080}; @@ -42,6 +46,28 @@ pub enum Loop { Break, } +#[derive(Default)] +struct StartConditions { + notify: Notify, + peer_connectors: AtomicUsize, +} + +impl StartConditions { + fn notified(&self) -> Notified<'_> { + self.notify.notified() + } + + fn add_peer_connector(&self) { + self.peer_connectors.fetch_add(1, Ordering::Relaxed); + } + + fn terminate_peer_connector(&self) { + if self.peer_connectors.fetch_sub(1, Ordering::Relaxed) <= 1 { + self.notify.notify_one(); + } + } +} + impl Runtime { pub(crate) async fn start(&mut self) -> ZResult<()> { match self.whatami() { @@ -85,7 +111,7 @@ impl Runtime { bail!("No peer specified and multicast scouting desactivated!") } } - _ => self.connect_peers(&peers, true).await, + _ => self.connect_peers(&peers, true, None).await, } } @@ -124,12 +150,22 @@ impl Runtime { self.bind_listeners(&listeners).await?; - self.connect_peers(&peers, false).await?; + let start_conditions = Arc::new(StartConditions::default()); + + self.connect_peers(&peers, false, Some(start_conditions.clone())) + .await?; if scouting { self.start_scout(listen, autoconnect, addr, ifaces).await?; } - tokio::time::sleep(delay).await; + + if tokio::time::timeout(delay, start_conditions.notified()) + .await + .is_err() + && !peers.is_empty() + { + tracing::warn!("Scouting delay elapsed before start conditions are met."); + } Ok(()) } @@ -167,7 +203,7 @@ impl Runtime { self.bind_listeners(&listeners).await?; - self.connect_peers(&peers, false).await?; + self.connect_peers(&peers, false, None).await?; if scouting { self.start_scout(listen, autoconnect, addr, ifaces).await?; @@ -218,13 +254,20 @@ impl Runtime { Ok(()) } - async fn connect_peers(&self, peers: &[EndPoint], single_link: bool) -> ZResult<()> { + async fn connect_peers( + &self, + peers: &[EndPoint], + single_link: bool, + notify: Option>, + ) -> ZResult<()> { let timeout = self.get_global_connect_timeout(); if timeout.is_zero() { - self.connect_peers_impl(peers, single_link).await + self.connect_peers_impl(peers, single_link, notify).await } else { let res = tokio::time::timeout(timeout, async { - self.connect_peers_impl(peers, single_link).await.ok() + self.connect_peers_impl(peers, single_link, notify) + .await + .ok() }) .await; match res { @@ -242,11 +285,16 @@ impl Runtime { } } - async fn connect_peers_impl(&self, peers: &[EndPoint], single_link: bool) -> ZResult<()> { + async fn connect_peers_impl( + &self, + peers: &[EndPoint], + single_link: bool, + notify: Option>, + ) -> ZResult<()> { if single_link { self.connect_peers_single_link(peers).await } else { - self.connect_peers_multiply_links(peers).await + self.connect_peers_multiply_links(peers, notify).await } } @@ -284,7 +332,11 @@ impl Runtime { Err(e.into()) } - async fn connect_peers_multiply_links(&self, peers: &[EndPoint]) -> ZResult<()> { + async fn connect_peers_multiply_links( + &self, + peers: &[EndPoint], + notify: Option>, + ) -> ZResult<()> { for peer in peers { let endpoint = peer.clone(); let retry_config = self.get_connect_retry_config(&endpoint); @@ -306,7 +358,7 @@ impl Runtime { self.peer_connector_retry(endpoint).await; } else { // try to connect in background - self.spawn_peer_connector(endpoint).await? + self.spawn_peer_connector(endpoint, notify.clone()).await? } } Ok(()) @@ -369,7 +421,7 @@ impl Runtime { } false }) { - self.spawn_peer_connector(peer).await?; + self.spawn_peer_connector(peer, None).await?; } } } @@ -644,13 +696,25 @@ impl Runtime { Ok(udp_socket) } - async fn spawn_peer_connector(&self, peer: EndPoint) -> ZResult<()> { + async fn spawn_peer_connector( + &self, + peer: EndPoint, + notify: Option>, + ) -> ZResult<()> { if !LocatorInspector::default() .is_multicast(&peer.to_locator()) .await? { let this = self.clone(); - self.spawn(async move { this.peer_connector_retry(peer).await }); + if let Some(notify) = notify.as_ref() { + notify.add_peer_connector(); + } + self.spawn(async move { + this.peer_connector_retry(peer).await; + if let Some(notify) = notify { + notify.terminate_peer_connector(); + } + }); Ok(()) } else { bail!("Forbidden multicast endpoint in connect list!") diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index cd3216a5bb..075e9d0df4 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -850,6 +850,7 @@ impl Session { tracing::debug!("Config: {:?}", &config); let aggregated_subscribers = config.aggregation().subscribers().clone(); let aggregated_publishers = config.aggregation().publishers().clone(); + let peer_linkstate = unwrap_or_default!(config.routing().peer().mode()) == *"linkstate"; let mut runtime = Runtime::init( config, #[cfg(all(feature = "unstable", feature = "shared-memory"))] @@ -866,7 +867,9 @@ impl Session { .await; session.owns_runtime = true; runtime.start().await?; - if runtime.whatami() != WhatAmI::Client { + if runtime.whatami() == WhatAmI::Router + || (runtime.whatami() == WhatAmI::Peer && peer_linkstate) + { // Workaround for the declare_and_shoot problem tokio::time::sleep(Duration::from_millis(*API_OPEN_SESSION_DELAY)).await; }