Skip to content

Commit

Permalink
Wait for gossip and related connections attempts from connect endpoints before returning to open
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Apr 23, 2024
1 parent 89cf6b1 commit 3413888
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 46 deletions.
1 change: 1 addition & 0 deletions zenoh/src/net/routing/hat/p2p_peer/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ impl Network {
}
}
}
strong_runtime.start_conditions().terminate_peer_connector();
}
}

Expand Down
8 changes: 8 additions & 0 deletions zenoh/src/net/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
mod adminspace;
pub mod orchestrator;

use self::orchestrator::StartConditions;

use super::primitives::DeMux;
use super::routing;
use super::routing::router::Router;
Expand Down Expand Up @@ -59,6 +61,7 @@ struct RuntimeState {
locators: std::sync::RwLock<Vec<Locator>>,
hlc: Option<Arc<HLC>>,
task_controller: TaskController,
start_conditions: Arc<StartConditions>,
}

pub struct WeakRuntime {
Expand Down Expand Up @@ -136,6 +139,7 @@ impl Runtime {
locators: std::sync::RwLock::new(vec![]),
hlc,
task_controller: TaskController::default(),
start_conditions: Arc::new(StartConditions::default()),
}),
};
*handler.runtime.write().unwrap() = Runtime::downgrade(&runtime);
Expand Down Expand Up @@ -269,6 +273,10 @@ impl Runtime {
pub fn get_cancellation_token(&self) -> CancellationToken {
self.state.task_controller.get_cancellation_token()
}

pub(crate) fn start_conditions(&self) -> &Arc<StartConditions> {
&self.state.start_conditions
}
}

struct RuntimeTransportEventHandler {
Expand Down
67 changes: 21 additions & 46 deletions zenoh/src/net/runtime/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use futures::prelude::*;
use socket2::{Domain, Socket, Type};
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::sync::futures::Notified;
Expand Down Expand Up @@ -47,21 +46,21 @@ pub enum Loop {
}

#[derive(Default)]
struct StartConditions {
pub(crate) struct StartConditions {
notify: Notify,
peer_connectors: AtomicUsize,
}

impl StartConditions {
fn notified(&self) -> Notified<'_> {
pub(crate) fn notified(&self) -> Notified<'_> {
self.notify.notified()
}

fn add_peer_connector(&self) {
pub(crate) fn add_peer_connector(&self) {
self.peer_connectors.fetch_add(1, Ordering::Relaxed);
}

fn terminate_peer_connector(&self) {
pub(crate) fn terminate_peer_connector(&self) {
if self.peer_connectors.fetch_sub(1, Ordering::Relaxed) <= 1 {
self.notify.notify_one();
}
Expand Down Expand Up @@ -111,7 +110,7 @@ impl Runtime {
bail!("No peer specified and multicast scouting desactivated!")
}
}
_ => self.connect_peers(&peers, true, None).await,
_ => self.connect_peers(&peers, true).await,
}
}

Expand Down Expand Up @@ -150,16 +149,13 @@ impl Runtime {

self.bind_listeners(&listeners).await?;

let start_conditions = Arc::new(StartConditions::default());

self.connect_peers(&peers, false, Some(start_conditions.clone()))
.await?;
self.connect_peers(&peers, false).await?;

if scouting {
self.start_scout(listen, autoconnect, addr, ifaces).await?;
}

if tokio::time::timeout(delay, start_conditions.notified())
if tokio::time::timeout(delay, self.state.start_conditions.notified())
.await
.is_err()
&& !peers.is_empty()
Expand Down Expand Up @@ -203,7 +199,7 @@ impl Runtime {

self.bind_listeners(&listeners).await?;

self.connect_peers(&peers, false, None).await?;
self.connect_peers(&peers, false).await?;

if scouting {
self.start_scout(listen, autoconnect, addr, ifaces).await?;
Expand Down Expand Up @@ -254,20 +250,13 @@ impl Runtime {
Ok(())
}

async fn connect_peers(
&self,
peers: &[EndPoint],
single_link: bool,
notify: Option<Arc<StartConditions>>,
) -> ZResult<()> {
async fn connect_peers(&self, peers: &[EndPoint], single_link: bool) -> ZResult<()> {
let timeout = self.get_global_connect_timeout();
if timeout.is_zero() {
self.connect_peers_impl(peers, single_link, notify).await
self.connect_peers_impl(peers, single_link).await
} else {
let res = tokio::time::timeout(timeout, async {
self.connect_peers_impl(peers, single_link, notify)
.await
.ok()
self.connect_peers_impl(peers, single_link).await.ok()
})
.await;
match res {
Expand All @@ -285,16 +274,11 @@ impl Runtime {
}
}

async fn connect_peers_impl(
&self,
peers: &[EndPoint],
single_link: bool,
notify: Option<Arc<StartConditions>>,
) -> ZResult<()> {
async fn connect_peers_impl(&self, peers: &[EndPoint], single_link: bool) -> ZResult<()> {
if single_link {
self.connect_peers_single_link(peers).await
} else {
self.connect_peers_multiply_links(peers, notify).await
self.connect_peers_multiply_links(peers).await
}
}

Expand Down Expand Up @@ -332,11 +316,7 @@ impl Runtime {
Err(e.into())
}

async fn connect_peers_multiply_links(
&self,
peers: &[EndPoint],
notify: Option<Arc<StartConditions>>,
) -> ZResult<()> {
async fn connect_peers_multiply_links(&self, peers: &[EndPoint]) -> ZResult<()> {
for peer in peers {
let endpoint = peer.clone();
let retry_config = self.get_connect_retry_config(&endpoint);
Expand All @@ -358,7 +338,7 @@ impl Runtime {
self.peer_connector_retry(endpoint).await;
} else {
// try to connect in background
self.spawn_peer_connector(endpoint, notify.clone()).await?
self.spawn_peer_connector(endpoint).await?
}
}
Ok(())
Expand Down Expand Up @@ -421,7 +401,7 @@ impl Runtime {
}
false
}) {
self.spawn_peer_connector(peer, None).await?;
self.spawn_peer_connector(peer).await?;
}
}
}
Expand Down Expand Up @@ -696,23 +676,18 @@ impl Runtime {
Ok(udp_socket)
}

async fn spawn_peer_connector(
&self,
peer: EndPoint,
notify: Option<Arc<StartConditions>>,
) -> ZResult<()> {
async fn spawn_peer_connector(&self, peer: EndPoint) -> ZResult<()> {
if !LocatorInspector::default()
.is_multicast(&peer.to_locator())
.await?
{
let this = self.clone();
if let Some(notify) = notify.as_ref() {
notify.add_peer_connector();
}
self.state.start_conditions.add_peer_connector();
self.spawn(async move {
this.peer_connector_retry(peer).await;
if let Some(notify) = notify {
notify.terminate_peer_connector();
let config = this.config().lock();
if unwrap_or_default!(config.scouting().gossip().enabled()) {
this.state.start_conditions.terminate_peer_connector();
}
});
Ok(())
Expand Down

0 comments on commit 3413888

Please sign in to comment.