diff --git a/io/zenoh-link-commons/src/listener.rs b/io/zenoh-link-commons/src/listener.rs index 9d56823a88..eb5c08106c 100644 --- a/io/zenoh-link-commons/src/listener.rs +++ b/io/zenoh-link-commons/src/listener.rs @@ -11,21 +11,16 @@ // Contributors: // ZettaScale Zenoh Team, // -use async_std::net::{SocketAddr, TcpListener, TcpStream}; -use async_std::prelude::*; +use async_std::net::SocketAddr; use async_std::task; use async_std::task::JoinHandle; -use async_trait::async_trait; use std::collections::HashMap; -use std::convert::TryInto; -use std::fmt; -use std::net::{IpAddr, Shutdown}; +use std::net::IpAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; -use std::time::Duration; use zenoh_core::{zread, zwrite}; use zenoh_protocol::core::{EndPoint, Locator}; -use zenoh_result::{bail, zerror, Error as ZError, ZResult}; +use zenoh_result::{zerror, ZResult}; use zenoh_sync::Signal; pub struct UnicastListener { @@ -56,13 +51,13 @@ pub struct UnicastListeners { } impl UnicastListeners { - fn new() -> UnicastListeners { + pub fn new() -> UnicastListeners { UnicastListeners { listeners: Arc::new(RwLock::new(HashMap::new())), } } - async fn add_listener( + pub async fn add_listener( &self, endpoint: EndPoint, addr: SocketAddr, @@ -86,7 +81,7 @@ impl UnicastListeners { Ok(()) } - async fn del_listener(&self, addr: SocketAddr) -> ZResult<()> { + pub async fn del_listener(&self, addr: SocketAddr) -> ZResult<()> { // Stop the listener let listener = zwrite!(self.listeners).remove(&addr).ok_or_else(|| { zerror!( @@ -101,14 +96,14 @@ impl UnicastListeners { listener.handle.await } - fn get_endpoints(&self) -> Vec { + pub fn get_endpoints(&self) -> Vec { zread!(self.listeners) .values() .map(|l| l.endpoint.clone()) .collect() } - fn get_locators(&self) -> Vec { + pub fn get_locators(&self) -> Vec { let mut locators = vec![]; let guard = zread!(self.listeners); diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 5c91c9be90..b1e9a47405 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -14,19 +14,15 @@ use async_std::net::{SocketAddr, TcpListener, TcpStream}; use async_std::prelude::*; use async_std::task; -use async_std::task::JoinHandle; use async_trait::async_trait; -use std::collections::HashMap; use std::convert::TryInto; use std::fmt; -use std::net::{IpAddr, Shutdown}; +use std::net::Shutdown; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; -use zenoh_core::{zread, zwrite}; use zenoh_link_commons::{ - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, UnicastListener, - UnicastListeners, + LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, UnicastListeners, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; @@ -201,42 +197,16 @@ impl fmt::Debug for LinkUnicastTcp { } } -/*************************************/ -/* LISTENER */ -/*************************************/ -struct ListenerUnicastTcp { - endpoint: EndPoint, - active: Arc, - signal: Signal, - handle: JoinHandle>, -} - -impl ListenerUnicastTcp { - fn new( - endpoint: EndPoint, - active: Arc, - signal: Signal, - handle: JoinHandle>, - ) -> ListenerUnicastTcp { - ListenerUnicastTcp { - endpoint, - active, - signal, - handle, - } - } -} - pub struct LinkManagerUnicastTcp { manager: NewLinkChannelSender, - listeners: Arc>>, + listeners: UnicastListeners, } impl LinkManagerUnicastTcp { pub fn new(manager: NewLinkChannelSender) -> Self { Self { manager, - listeners: Arc::new(RwLock::new(HashMap::new())), + listeners: UnicastListeners::new(), } } } @@ -319,27 +289,22 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { endpoint.config(), )?; - // Spawn the accept loop for the listener let active = Arc::new(AtomicBool::new(true)); let signal = Signal::new(); - let mut listeners = zwrite!(self.listeners); let c_active = active.clone(); let c_signal = signal.clone(); let c_manager = self.manager.clone(); - let c_listeners = self.listeners.clone(); - let c_addr = local_addr; + let handle = task::spawn(async move { - // Wait for the accept loop to terminate - let res = accept_task(socket, c_active, c_signal, c_manager).await; - zwrite!(c_listeners).remove(&c_addr); - res + accept_task(socket, c_active, c_signal, c_manager).await }); let locator = endpoint.to_locator(); - let listener = ListenerUnicastTcp::new(endpoint, active, signal, handle); - // Update the list of active listeners on the manager - listeners.insert(local_addr, listener); + + self.listeners + .add_listener(endpoint, local_addr, active, signal, handle) + .await?; return Ok(locator); } @@ -365,73 +330,35 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { // Stop the listener let mut errs: Vec = vec![]; - let mut listener = None; + let mut failed = true; for a in addrs { - match zwrite!(self.listeners).remove(&a) { - Some(l) => { - // We cannot keep a sync guard across a .await - // Break the loop and assign the listener. - listener = Some(l); + match self.listeners.del_listener(a).await { + Ok(_) => { + failed = false; break; } - None => { - errs.push(zerror!("{}", a).into()); + Err(err) => { + errs.push(zerror!("{}", err).into()); } } } - match listener { - Some(l) => { - // Send the stop signal - l.active.store(false, Ordering::Release); - l.signal.trigger(); - l.handle.await - } - None => { - bail!( - "Can not delete the TCP listener bound to {}: {:?}", - endpoint, - errs - ) - } + if failed { + bail!( + "Can not delete the TCP listener bound to {}: {:?}", + endpoint, + errs + ) } + Ok(()) } fn get_listeners(&self) -> Vec { - zread!(self.listeners) - .values() - .map(|l| l.endpoint.clone()) - .collect() + self.listeners.get_endpoints() } fn get_locators(&self) -> Vec { - let mut locators = vec![]; - - let guard = zread!(self.listeners); - for (key, value) in guard.iter() { - let (kip, kpt) = (key.ip(), key.port()); - - // Either ipv4/0.0.0.0 or ipv6/[::] - if kip.is_unspecified() { - let mut addrs = match kip { - IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(), - IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(), - }; - let iter = addrs.drain(..).map(|x| { - Locator::new( - value.endpoint.protocol(), - SocketAddr::new(x, kpt).to_string(), - value.endpoint.metadata(), - ) - .unwrap() - }); - locators.extend(iter); - } else { - locators.push(value.endpoint.to_locator()); - } - } - - locators + self.listeners.get_locators() } }