diff --git a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs index 3f0eccc40f..bfb32e629e 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs @@ -428,6 +428,7 @@ impl Network { } } } + strong_runtime.start_conditions().terminate_peer_connector(); } } diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index 68e121847d..d1a64e2072 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -20,6 +20,8 @@ mod adminspace; pub mod orchestrator; +use self::orchestrator::StartConditions; + use super::primitives::DeMux; use super::routing; use super::routing::router::Router; @@ -59,6 +61,7 @@ struct RuntimeState { locators: std::sync::RwLock>, hlc: Option>, task_controller: TaskController, + start_conditions: Arc, } pub struct WeakRuntime { @@ -136,6 +139,7 @@ impl Runtime { locators: std::sync::RwLock::new(vec![]), hlc, task_controller: TaskController::default(), + start_conditions: Arc::new(StartConditions::default()), }), }; *handler.runtime.write().unwrap() = Runtime::downgrade(&runtime); @@ -269,6 +273,10 @@ impl Runtime { pub fn get_cancellation_token(&self) -> CancellationToken { self.state.task_controller.get_cancellation_token() } + + pub(crate) fn start_conditions(&self) -> &Arc { + &self.state.start_conditions + } } struct RuntimeTransportEventHandler { diff --git a/zenoh/src/net/runtime/orchestrator.rs b/zenoh/src/net/runtime/orchestrator.rs index 38a1b28eb0..a97ae17070 100644 --- a/zenoh/src/net/runtime/orchestrator.rs +++ b/zenoh/src/net/runtime/orchestrator.rs @@ -16,7 +16,6 @@ use futures::prelude::*; use socket2::{Domain, Socket, Type}; use std::net::{IpAddr, Ipv6Addr, SocketAddr}; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; use std::time::Duration; use tokio::net::UdpSocket; use tokio::sync::futures::Notified; @@ -47,21 +46,21 @@ pub enum Loop { } #[derive(Default)] -struct StartConditions { +pub(crate) struct StartConditions { notify: Notify, peer_connectors: AtomicUsize, } impl StartConditions { - fn notified(&self) -> Notified<'_> { + pub(crate) fn notified(&self) -> Notified<'_> { self.notify.notified() } - fn add_peer_connector(&self) { + pub(crate) fn add_peer_connector(&self) { self.peer_connectors.fetch_add(1, Ordering::Relaxed); } - fn terminate_peer_connector(&self) { + pub(crate) fn terminate_peer_connector(&self) { if self.peer_connectors.fetch_sub(1, Ordering::Relaxed) <= 1 { self.notify.notify_one(); } @@ -111,7 +110,7 @@ impl Runtime { bail!("No peer specified and multicast scouting desactivated!") } } - _ => self.connect_peers(&peers, true, None).await, + _ => self.connect_peers(&peers, true).await, } } @@ -150,16 +149,13 @@ impl Runtime { self.bind_listeners(&listeners).await?; - let start_conditions = Arc::new(StartConditions::default()); - - self.connect_peers(&peers, false, Some(start_conditions.clone())) - .await?; + self.connect_peers(&peers, false).await?; if scouting { self.start_scout(listen, autoconnect, addr, ifaces).await?; } - if tokio::time::timeout(delay, start_conditions.notified()) + if tokio::time::timeout(delay, self.state.start_conditions.notified()) .await .is_err() && !peers.is_empty() @@ -203,7 +199,7 @@ impl Runtime { self.bind_listeners(&listeners).await?; - self.connect_peers(&peers, false, None).await?; + self.connect_peers(&peers, false).await?; if scouting { self.start_scout(listen, autoconnect, addr, ifaces).await?; @@ -254,20 +250,13 @@ impl Runtime { Ok(()) } - async fn connect_peers( - &self, - peers: &[EndPoint], - single_link: bool, - notify: Option>, - ) -> ZResult<()> { + async fn connect_peers(&self, peers: &[EndPoint], single_link: bool) -> ZResult<()> { let timeout = self.get_global_connect_timeout(); if timeout.is_zero() { - self.connect_peers_impl(peers, single_link, notify).await + self.connect_peers_impl(peers, single_link).await } else { let res = tokio::time::timeout(timeout, async { - self.connect_peers_impl(peers, single_link, notify) - .await - .ok() + self.connect_peers_impl(peers, single_link).await.ok() }) .await; match res { @@ -285,16 +274,11 @@ impl Runtime { } } - async fn connect_peers_impl( - &self, - peers: &[EndPoint], - single_link: bool, - notify: Option>, - ) -> ZResult<()> { + async fn connect_peers_impl(&self, peers: &[EndPoint], single_link: bool) -> ZResult<()> { if single_link { self.connect_peers_single_link(peers).await } else { - self.connect_peers_multiply_links(peers, notify).await + self.connect_peers_multiply_links(peers).await } } @@ -332,11 +316,7 @@ impl Runtime { Err(e.into()) } - async fn connect_peers_multiply_links( - &self, - peers: &[EndPoint], - notify: Option>, - ) -> ZResult<()> { + async fn connect_peers_multiply_links(&self, peers: &[EndPoint]) -> ZResult<()> { for peer in peers { let endpoint = peer.clone(); let retry_config = self.get_connect_retry_config(&endpoint); @@ -358,7 +338,7 @@ impl Runtime { self.peer_connector_retry(endpoint).await; } else { // try to connect in background - self.spawn_peer_connector(endpoint, notify.clone()).await? + self.spawn_peer_connector(endpoint).await? } } Ok(()) @@ -421,7 +401,7 @@ impl Runtime { } false }) { - self.spawn_peer_connector(peer, None).await?; + self.spawn_peer_connector(peer).await?; } } } @@ -696,23 +676,18 @@ impl Runtime { Ok(udp_socket) } - async fn spawn_peer_connector( - &self, - peer: EndPoint, - notify: Option>, - ) -> ZResult<()> { + async fn spawn_peer_connector(&self, peer: EndPoint) -> ZResult<()> { if !LocatorInspector::default() .is_multicast(&peer.to_locator()) .await? { let this = self.clone(); - if let Some(notify) = notify.as_ref() { - notify.add_peer_connector(); - } + self.state.start_conditions.add_peer_connector(); self.spawn(async move { this.peer_connector_retry(peer).await; - if let Some(notify) = notify { - notify.terminate_peer_connector(); + let config = this.config().lock(); + if unwrap_or_default!(config.scouting().gossip().enabled()) { + this.state.start_conditions.terminate_peer_connector(); } }); Ok(())