From b2e21148675843288b2d82f69c5b606657d5c092 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Thu, 25 Apr 2024 15:28:06 +0200 Subject: [PATCH] Fix gossip deadlock --- .../net/routing/hat/linkstate_peer/network.rs | 50 ++++++++++--------- zenoh/src/net/routing/hat/p2p_peer/gossip.rs | 23 +++++---- zenoh/src/net/routing/hat/router/network.rs | 48 ++++++++++-------- 3 files changed, 64 insertions(+), 57 deletions(-) diff --git a/zenoh/src/net/routing/hat/linkstate_peer/network.rs b/zenoh/src/net/routing/hat/linkstate_peer/network.rs index d5f37e3733..a4a6841644 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/network.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/network.rs @@ -486,26 +486,25 @@ impl Network { ); } - if !self.autoconnect.is_empty() { + if !self.autoconnect.is_empty() && self.autoconnect.matches(whatami) { // Connect discovered peers - if zenoh_runtime::ZRuntime::Net - .block_in_place( - strong_runtime.manager().get_transport_unicast(&zid), - ) - .is_none() - && self.autoconnect.matches(whatami) - { - if let Some(locators) = locators { - let runtime = strong_runtime.clone(); - strong_runtime.spawn(async move { + if let Some(locators) = locators { + let runtime = strong_runtime.clone(); + strong_runtime.spawn(async move { + if runtime + .manager() + .get_transport_unicast(&zid) + .await + .is_none() + { // random backoff tokio::time::sleep(std::time::Duration::from_millis( rand::random::<u64>() % 100, )) .await; runtime.connect_peer(&zid, &locators).await; - }); - } + } + }); } } } @@ -610,22 +609,25 @@ impl Network { for (_, idx, _) in &link_states { let node = &self.graph[*idx]; if let Some(whatami) = node.whatami { - if zenoh_runtime::ZRuntime::Net - .block_in_place(strong_runtime.manager().get_transport_unicast(&node.zid)) - .is_none() - && self.autoconnect.matches(whatami) - { + if self.autoconnect.matches(whatami) { if let Some(locators) = &node.locators { let runtime = strong_runtime.clone(); let zid = node.zid; let locators = 
locators.clone(); strong_runtime.spawn(async move { - // random backoff - tokio::time::sleep(std::time::Duration::from_millis( - rand::random::<u64>() % 100, - )) - .await; - runtime.connect_peer(&zid, &locators).await; + if runtime + .manager() + .get_transport_unicast(&zid) + .await + .is_none() + { + // random backoff + tokio::time::sleep(std::time::Duration::from_millis( + rand::random::<u64>() % 100, + )) + .await; + runtime.connect_peer(&zid, &locators).await; + } }); } } diff --git a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs index a5b72a73eb..537f29aeec 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs @@ -406,24 +406,25 @@ impl Network { ); } - if !self.autoconnect.is_empty() { + if !self.autoconnect.is_empty() && self.autoconnect.matches(whatami) { // Connect discovered peers - if zenoh_runtime::ZRuntime::Acceptor - .block_in_place(strong_runtime.manager().get_transport_unicast(&zid)) - .is_none() - && self.autoconnect.matches(whatami) - { - if let Some(locators) = locators { - let runtime = strong_runtime.clone(); - strong_runtime.spawn(async move { + if let Some(locators) = locators { + let runtime = strong_runtime.clone(); + strong_runtime.spawn(async move { + if runtime + .manager() + .get_transport_unicast(&zid) + .await + .is_none() + { // random backoff tokio::time::sleep(std::time::Duration::from_millis( rand::random::<u64>() % 100, )) .await; runtime.connect_peer(&zid, &locators).await; - }); - } + } + }); } } } diff --git a/zenoh/src/net/routing/hat/router/network.rs b/zenoh/src/net/routing/hat/router/network.rs index 727eb6763e..1ee77ae8e2 100644 --- a/zenoh/src/net/routing/hat/router/network.rs +++ b/zenoh/src/net/routing/hat/router/network.rs @@ -489,24 +489,25 @@ impl Network { ); } - if !self.autoconnect.is_empty() { + if !self.autoconnect.is_empty() && self.autoconnect.matches(whatami) { // Connect discovered peers - if zenoh_runtime::ZRuntime::Net 
- .block_in_place(self.runtime.manager().get_transport_unicast(&zid)) - .is_none() - && self.autoconnect.matches(whatami) - { - if let Some(locators) = locators { - let runtime = self.runtime.clone(); - self.runtime.spawn(async move { + if let Some(locators) = locators { + let runtime = self.runtime.clone(); + self.runtime.spawn(async move { + if runtime + .manager() + .get_transport_unicast(&zid) + .await + .is_none() + { // random backoff tokio::time::sleep(std::time::Duration::from_millis( rand::random::<u64>() % 100, )) .await; runtime.connect_peer(&zid, &locators).await; - }); - } + } + }); } } } @@ -611,22 +612,25 @@ impl Network { for (_, idx, _) in &link_states { let node = &self.graph[*idx]; if let Some(whatami) = node.whatami { - if zenoh_runtime::ZRuntime::Net - .block_in_place(self.runtime.manager().get_transport_unicast(&node.zid)) - .is_none() - && self.autoconnect.matches(whatami) - { + if self.autoconnect.matches(whatami) { if let Some(locators) = &node.locators { let runtime = self.runtime.clone(); let zid = node.zid; let locators = locators.clone(); self.runtime.spawn(async move { - // random backoff - tokio::time::sleep(std::time::Duration::from_millis( - rand::random::() % 100, - )) - .await; - runtime.connect_peer(&zid, &locators).await; + if runtime + .manager() + .get_transport_unicast(&zid) + .await + .is_none() + { + // random backoff + tokio::time::sleep(std::time::Duration::from_millis( + rand::random::<u64>() % 100, + )) + .await; + runtime.connect_peer(&zid, &locators).await; + } }); } }