Skip to content

Commit

Permalink
Don't wait for full scouting delay when peers connected all configure…
Browse files Browse the repository at this point in the history
…d connect endpoints
  • Loading branch information
OlivierHecart committed Apr 26, 2024
1 parent d3b01e2 commit 01ccbb7
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 15 deletions.
92 changes: 78 additions & 14 deletions zenoh/src/net/runtime/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@ use super::{Runtime, RuntimeSession};
use futures::prelude::*;
use socket2::{Domain, Socket, Type};
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::sync::futures::Notified;
use tokio::sync::Notify;
use zenoh_buffers::reader::DidntRead;
use zenoh_buffers::{reader::HasReader, writer::HasWriter};
use zenoh_codec::{RCodec, WCodec, Zenoh080};
Expand All @@ -42,6 +46,28 @@ pub enum Loop {
Break,
}

#[derive(Default)]
struct StartConditions {
notify: Notify,
peer_connectors: AtomicUsize,
}

impl StartConditions {
fn notified(&self) -> Notified<'_> {
self.notify.notified()
}

fn add_peer_connector(&self) {
self.peer_connectors.fetch_add(1, Ordering::Relaxed);
}

fn terminate_peer_connector(&self) {
if self.peer_connectors.fetch_sub(1, Ordering::Relaxed) <= 1 {
self.notify.notify_one();
}
}
}

impl Runtime {
pub(crate) async fn start(&mut self) -> ZResult<()> {
match self.whatami() {
Expand Down Expand Up @@ -85,7 +111,7 @@ impl Runtime {
bail!("No peer specified and multicast scouting desactivated!")
}
}
_ => self.connect_peers(&peers, true).await,
_ => self.connect_peers(&peers, true, None).await,
}
}

Expand Down Expand Up @@ -124,12 +150,22 @@ impl Runtime {

self.bind_listeners(&listeners).await?;

self.connect_peers(&peers, false).await?;
let start_conditions = Arc::new(StartConditions::default());

self.connect_peers(&peers, false, Some(start_conditions.clone()))
.await?;

if scouting {
self.start_scout(listen, autoconnect, addr, ifaces).await?;
}
tokio::time::sleep(delay).await;

if tokio::time::timeout(delay, start_conditions.notified())
.await
.is_err()
&& !peers.is_empty()
{
tracing::warn!("Scouting delay elapsed before start conditions are met.");
}
Ok(())
}

Expand Down Expand Up @@ -167,7 +203,7 @@ impl Runtime {

self.bind_listeners(&listeners).await?;

self.connect_peers(&peers, false).await?;
self.connect_peers(&peers, false, None).await?;

if scouting {
self.start_scout(listen, autoconnect, addr, ifaces).await?;
Expand Down Expand Up @@ -218,13 +254,20 @@ impl Runtime {
Ok(())
}

async fn connect_peers(&self, peers: &[EndPoint], single_link: bool) -> ZResult<()> {
async fn connect_peers(
&self,
peers: &[EndPoint],
single_link: bool,
notify: Option<Arc<StartConditions>>,
) -> ZResult<()> {
let timeout = self.get_global_connect_timeout();
if timeout.is_zero() {
self.connect_peers_impl(peers, single_link).await
self.connect_peers_impl(peers, single_link, notify).await
} else {
let res = tokio::time::timeout(timeout, async {
self.connect_peers_impl(peers, single_link).await.ok()
self.connect_peers_impl(peers, single_link, notify)
.await
.ok()
})
.await;
match res {
Expand All @@ -242,11 +285,16 @@ impl Runtime {
}
}

async fn connect_peers_impl(&self, peers: &[EndPoint], single_link: bool) -> ZResult<()> {
async fn connect_peers_impl(
&self,
peers: &[EndPoint],
single_link: bool,
notify: Option<Arc<StartConditions>>,
) -> ZResult<()> {
if single_link {
self.connect_peers_single_link(peers).await
} else {
self.connect_peers_multiply_links(peers).await
self.connect_peers_multiply_links(peers, notify).await
}
}

Expand Down Expand Up @@ -284,7 +332,11 @@ impl Runtime {
Err(e.into())
}

async fn connect_peers_multiply_links(&self, peers: &[EndPoint]) -> ZResult<()> {
async fn connect_peers_multiply_links(
&self,
peers: &[EndPoint],
notify: Option<Arc<StartConditions>>,
) -> ZResult<()> {
for peer in peers {
let endpoint = peer.clone();
let retry_config = self.get_connect_retry_config(&endpoint);
Expand All @@ -306,7 +358,7 @@ impl Runtime {
self.peer_connector_retry(endpoint).await;
} else {
// try to connect in background
self.spawn_peer_connector(endpoint).await?
self.spawn_peer_connector(endpoint, notify.clone()).await?
}
}
Ok(())
Expand Down Expand Up @@ -369,7 +421,7 @@ impl Runtime {
}
false
}) {
self.spawn_peer_connector(peer).await?;
self.spawn_peer_connector(peer, None).await?;
}
}
}
Expand Down Expand Up @@ -644,13 +696,25 @@ impl Runtime {
Ok(udp_socket)
}

async fn spawn_peer_connector(&self, peer: EndPoint) -> ZResult<()> {
async fn spawn_peer_connector(
&self,
peer: EndPoint,
notify: Option<Arc<StartConditions>>,
) -> ZResult<()> {
if !LocatorInspector::default()
.is_multicast(&peer.to_locator())
.await?
{
let this = self.clone();
self.spawn(async move { this.peer_connector_retry(peer).await });
if let Some(notify) = notify.as_ref() {
notify.add_peer_connector();
}
self.spawn(async move {
this.peer_connector_retry(peer).await;
if let Some(notify) = notify {
notify.terminate_peer_connector();
}
});
Ok(())
} else {
bail!("Forbidden multicast endpoint in connect list!")
Expand Down
5 changes: 4 additions & 1 deletion zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,7 @@ impl Session {
tracing::debug!("Config: {:?}", &config);
let aggregated_subscribers = config.aggregation().subscribers().clone();
let aggregated_publishers = config.aggregation().publishers().clone();
let peer_linkstate = unwrap_or_default!(config.routing().peer().mode()) == *"linkstate";
let mut runtime = Runtime::init(
config,
#[cfg(all(feature = "unstable", feature = "shared-memory"))]
Expand All @@ -866,7 +867,9 @@ impl Session {
.await;
session.owns_runtime = true;
runtime.start().await?;
if runtime.whatami() != WhatAmI::Client {
if runtime.whatami() == WhatAmI::Router
|| (runtime.whatami() == WhatAmI::Peer && peer_linkstate)
{
// Workaround for the declare_and_shoot problem
tokio::time::sleep(Duration::from_millis(*API_OPEN_SESSION_DELAY)).await;
}
Expand Down

0 comments on commit 01ccbb7

Please sign in to comment.