diff --git a/Cargo.lock b/Cargo.lock index 2dc8ae14b6..fa9de7e800 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4779,13 +4779,17 @@ dependencies = [ "async-std", "async-trait", "flume", + "log", "lz4_flex", "serde", "typenum", "zenoh-buffers", "zenoh-codec", + "zenoh-core", "zenoh-protocol", "zenoh-result", + "zenoh-sync", + "zenoh-util", ] [[package]] diff --git a/io/zenoh-link-commons/Cargo.toml b/io/zenoh-link-commons/Cargo.toml index 36e39eceed..019067de56 100644 --- a/io/zenoh-link-commons/Cargo.toml +++ b/io/zenoh-link-commons/Cargo.toml @@ -29,12 +29,16 @@ compression = [] [dependencies] async-std = { workspace = true } +zenoh-sync = { workspace = true } async-trait = { workspace = true } flume = { workspace = true } lz4_flex = { workspace = true } +log = { workspace = true } serde = { workspace = true, features = ["default"] } typenum = { workspace = true } zenoh-buffers = { workspace = true } zenoh-codec = { workspace = true } +zenoh-core = { workspace = true } zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } +zenoh-util = { workspace = true } diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index b15a0d9ad5..2ee28c3f08 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -17,15 +17,16 @@ //! This crate is intended for Zenoh's internal use. //! //! [Click here for Zenoh's documentation](../zenoh/index.html) -#![no_std] extern crate alloc; +mod listener; mod multicast; mod unicast; use alloc::{borrow::ToOwned, boxed::Box, string::String, vec, vec::Vec}; use async_trait::async_trait; use core::{cmp::PartialEq, fmt, hash::Hash}; +pub use listener::*; pub use multicast::*; use serde::Serialize; pub use unicast::*; diff --git a/io/zenoh-link-commons/src/listener.rs b/io/zenoh-link-commons/src/listener.rs new file mode 100644 index 0000000000..1d5d7bb172 --- /dev/null +++ b/io/zenoh-link-commons/src/listener.rs @@ -0,0 +1,141 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use async_std::net::SocketAddr; +use async_std::task; +use async_std::task::JoinHandle; +use std::collections::HashMap; +use std::net::IpAddr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; +use zenoh_core::{zread, zwrite}; +use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_result::{zerror, ZResult}; +use zenoh_sync::Signal; + +pub struct ListenerUnicastIP { + endpoint: EndPoint, + active: Arc, + signal: Signal, + handle: JoinHandle>, +} + +impl ListenerUnicastIP { + fn new( + endpoint: EndPoint, + active: Arc, + signal: Signal, + handle: JoinHandle>, + ) -> ListenerUnicastIP { + ListenerUnicastIP { + endpoint, + active, + signal, + handle, + } + } +} + +pub struct ListenersUnicastIP { + listeners: Arc>>, +} + +impl ListenersUnicastIP { + pub fn new() -> ListenersUnicastIP { + ListenersUnicastIP { + listeners: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub async fn add_listener( + &self, + endpoint: EndPoint, + addr: SocketAddr, + active: Arc, + signal: Signal, + handle: JoinHandle>, + ) -> ZResult<()> { + let mut listeners = zwrite!(self.listeners); + let c_listeners = self.listeners.clone(); + let c_addr = addr; + let wraphandle = task::spawn(async move { + // Wait for the accept loop to terminate + let res = handle.await; + zwrite!(c_listeners).remove(&c_addr); + res + }); + + let listener = ListenerUnicastIP::new(endpoint, active, signal, wraphandle); + // Update the list of active listeners on the manager + listeners.insert(addr, listener); + Ok(()) + } + + pub async fn del_listener(&self, addr: SocketAddr) -> ZResult<()> { + // Stop the listener + let listener = zwrite!(self.listeners).remove(&addr).ok_or_else(|| { + zerror!( + "Can not delete the listener because it has not been found: {}", + addr + ) + })?; + + // Send the stop signal + listener.active.store(false, Ordering::Release); + listener.signal.trigger(); + listener.handle.await + } + + pub fn get_endpoints(&self) -> Vec { + zread!(self.listeners) + .values() + .map(|l| l.endpoint.clone()) + .collect() + } + + pub fn get_locators(&self) -> Vec { + let mut locators = vec![]; + + let guard = zread!(self.listeners); + for (key, value) in guard.iter() { + let (kip, kpt) = (key.ip(), key.port()); + + // Either ipv4/0.0.0.0 or ipv6/[::] + if kip.is_unspecified() { + let mut addrs = match kip { + IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(), + IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(), + }; + let iter = addrs.drain(..).map(|x| { + Locator::new( + value.endpoint.protocol(), + SocketAddr::new(x, kpt).to_string(), + value.endpoint.metadata(), + ) + .unwrap() + }); + locators.extend(iter); + } else { + locators.push(value.endpoint.to_locator()); + } + } + + locators + } +} + +impl Default for ListenersUnicastIP { + fn default() -> Self { + Self::new() + } +} diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index 19e90c3b2c..1237024ca9 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec}; +use async_std::net::SocketAddr; use async_trait::async_trait; use core::{ fmt, @@ -100,3 +101,16 @@ impl From> for LinkUnicast { LinkUnicast(link) } } + +pub fn get_ip_interface_names(addr: &SocketAddr) -> Vec { + match zenoh_util::net::get_interface_names_by_addr(addr.ip()) { + Ok(interfaces) => { + log::trace!("get_interface_names for {:?}: {:?}", addr.ip(), interfaces); + interfaces + } + Err(e) => { + log::debug!("get_interface_names for {:?} failed: {:?}", addr.ip(), e); + vec![] + } + } +} diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs index dfc70dca28..366860801e 100644 --- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs @@ -21,20 +21,19 @@ use async_std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use async_std::prelude::FutureExt; use async_std::sync::Mutex as AsyncMutex; use async_std::task; -use async_std::task::JoinHandle; use async_trait::async_trait; use rustls::{Certificate, PrivateKey}; use rustls_pemfile::Item; -use std::collections::HashMap; use std::fmt; use std::io::BufReader; use std::net::IpAddr; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; -use zenoh_core::{zasynclock, zread, zwrite}; +use zenoh_core::zasynclock; use zenoh_link_commons::{ - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, + ListenersUnicastIP, NewLinkChannelSender, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, ZError, ZResult}; @@ -145,9 +144,7 @@ impl LinkUnicastTrait for LinkUnicastQuic { #[inline(always)] fn get_interface_names(&self) -> Vec { - // @TODO: Not supported for now - log::debug!("The get_interface_names for LinkUnicastQuic is not supported"); - vec![] + get_ip_interface_names(&self.src_addr) } #[inline(always)] @@ -188,42 +185,16 @@ impl fmt::Debug for LinkUnicastQuic { } } -/*************************************/ -/* LISTENER */ -/*************************************/ -struct ListenerUnicastQuic { - endpoint: EndPoint, - active: Arc, - signal: Signal, - handle: JoinHandle>, -} - -impl ListenerUnicastQuic { - fn new( - endpoint: EndPoint, - active: Arc, - signal: Signal, - handle: JoinHandle>, - ) -> ListenerUnicastQuic { - ListenerUnicastQuic { - endpoint, - active, - signal, - handle, - } - } -} - pub struct LinkManagerUnicastQuic { manager: NewLinkChannelSender, - listeners: Arc>>, + listeners: ListenersUnicastIP, } impl LinkManagerUnicastQuic { pub fn new(manager: NewLinkChannelSender) -> Self { Self { manager, - listeners: Arc::new(RwLock::new(HashMap::new())), + listeners: ListenersUnicastIP::new(), } } } @@ -429,88 +400,40 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic { endpoint.config(), )?; - // Spawn the accept loop for the listener let active = Arc::new(AtomicBool::new(true)); let signal = Signal::new(); - let mut listeners = zwrite!(self.listeners); let c_active = active.clone(); let c_signal = signal.clone(); let c_manager = self.manager.clone(); - let c_listeners = self.listeners.clone(); - let c_addr = local_addr; - let handle = task::spawn(async move { - // Wait for the accept loop to terminate - let res = accept_task(quic_endpoint, c_active, c_signal, c_manager).await; - zwrite!(c_listeners).remove(&c_addr); - res - }); + + let handle = + task::spawn( + async move { accept_task(quic_endpoint, c_active, c_signal, c_manager).await }, + ); // Initialize the QuicAcceptor let locator = endpoint.to_locator(); - let listener = ListenerUnicastQuic::new(endpoint, active, signal, handle); - // Update the list of active listeners on the manager - listeners.insert(local_addr, listener); + + self.listeners + .add_listener(endpoint, local_addr, active, signal, handle) + .await?; Ok(locator) } async fn del_listener(&self, endpoint: &EndPoint) -> ZResult<()> { let epaddr = endpoint.address(); - let addr = get_quic_addr(&epaddr).await?; - - // Stop the listener - let listener = zwrite!(self.listeners).remove(&addr).ok_or_else(|| { - let e = zerror!( - "Can not delete the QUIC listener because it has not been found: {}", - addr - ); - log::trace!("{}", e); - e - })?; - - // Send the stop signal - listener.active.store(false, Ordering::Release); - listener.signal.trigger(); - listener.handle.await + self.listeners.del_listener(addr).await } fn get_listeners(&self) -> Vec { - zread!(self.listeners) - .values() - .map(|x| x.endpoint.clone()) - .collect() + self.listeners.get_endpoints() } fn get_locators(&self) -> Vec { - let mut locators = vec![]; - - let guard = zread!(self.listeners); - for (key, value) in guard.iter() { - let (kip, kpt) = (key.ip(), key.port()); - - // Either ipv4/0.0.0.0 or ipv6/[::] - if kip.is_unspecified() { - let mut addrs = match kip { - IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(), - IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(), - }; - let iter = addrs.drain(..).map(|x| { - Locator::new( - value.endpoint.protocol(), - SocketAddr::new(x, kpt).to_string(), - value.endpoint.metadata(), - ) - .unwrap() - }); - locators.extend(iter); - } else { - locators.push(value.endpoint.to_locator()); - } - } - - locators + self.listeners.get_locators() } } diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 3876a947ca..551f4c8c97 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -14,18 +14,16 @@ use async_std::net::{SocketAddr, TcpListener, TcpStream}; use async_std::prelude::*; use async_std::task; -use async_std::task::JoinHandle; use async_trait::async_trait; -use std::collections::HashMap; use std::convert::TryInto; use std::fmt; -use std::net::{IpAddr, Shutdown}; +use std::net::Shutdown; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; -use zenoh_core::{zread, zwrite}; use zenoh_link_commons::{ - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, + ListenersUnicastIP, NewLinkChannelSender, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; @@ -146,24 +144,7 @@ impl LinkUnicastTrait for LinkUnicastTcp { #[inline(always)] fn get_interface_names(&self) -> Vec { - match zenoh_util::net::get_interface_names_by_addr(self.src_addr.ip()) { - Ok(interfaces) => { - log::trace!( - "get_interface_names for {:?}: {:?}", - self.src_addr.ip(), - interfaces - ); - interfaces - } - Err(e) => { - log::debug!( - "get_interface_names for {:?} failed: {:?}", - self.src_addr.ip(), - e - ); - vec![] - } - } + get_ip_interface_names(&self.src_addr) } #[inline(always)] @@ -200,42 +181,16 @@ impl fmt::Debug for LinkUnicastTcp { } } -/*************************************/ -/* LISTENER */ -/*************************************/ -struct ListenerUnicastTcp { - endpoint: EndPoint, - active: Arc, - signal: Signal, - handle: JoinHandle>, -} - -impl ListenerUnicastTcp { - fn new( - endpoint: EndPoint, - active: Arc, - signal: Signal, - handle: JoinHandle>, - ) -> ListenerUnicastTcp { - ListenerUnicastTcp { - endpoint, - active, - signal, - handle, - } - } -} - pub struct LinkManagerUnicastTcp { manager: NewLinkChannelSender, - listeners: Arc>>, + listeners: ListenersUnicastIP, } impl LinkManagerUnicastTcp { pub fn new(manager: NewLinkChannelSender) -> Self { Self { manager, - listeners: Arc::new(RwLock::new(HashMap::new())), + listeners: ListenersUnicastIP::new(), } } } @@ -318,27 +273,22 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { endpoint.config(), )?; - // Spawn the accept loop for the listener let active = Arc::new(AtomicBool::new(true)); let signal = Signal::new(); - let mut listeners = zwrite!(self.listeners); let c_active = active.clone(); let c_signal = signal.clone(); let c_manager = self.manager.clone(); - let c_listeners = self.listeners.clone(); - let c_addr = local_addr; + let handle = task::spawn(async move { - // Wait for the accept loop to terminate - let res = accept_task(socket, c_active, c_signal, c_manager).await; - zwrite!(c_listeners).remove(&c_addr); - res + accept_task(socket, c_active, c_signal, c_manager).await }); let locator = endpoint.to_locator(); - let listener = ListenerUnicastTcp::new(endpoint, active, signal, handle); - // Update the list of active listeners on the manager - listeners.insert(local_addr, listener); + + self.listeners + .add_listener(endpoint, local_addr, active, signal, handle) + .await?; return Ok(locator); } @@ -364,73 +314,35 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { // Stop the listener let mut errs: Vec = vec![]; - let mut listener = None; + let mut failed = true; for a in addrs { - match zwrite!(self.listeners).remove(&a) { - Some(l) => { - // We cannot keep a sync guard across a .await - // Break the loop and assign the listener. - listener = Some(l); + match self.listeners.del_listener(a).await { + Ok(_) => { + failed = false; break; } - None => { - errs.push(zerror!("{}", a).into()); + Err(err) => { + errs.push(zerror!("{}", err).into()); } } } - match listener { - Some(l) => { - // Send the stop signal - l.active.store(false, Ordering::Release); - l.signal.trigger(); - l.handle.await - } - None => { - bail!( - "Can not delete the TCP listener bound to {}: {:?}", - endpoint, - errs - ) - } + if failed { + bail!( + "Can not delete the TCP listener bound to {}: {:?}", + endpoint, + errs + ) } + Ok(()) } fn get_listeners(&self) -> Vec { - zread!(self.listeners) - .values() - .map(|l| l.endpoint.clone()) - .collect() + self.listeners.get_endpoints() } fn get_locators(&self) -> Vec { - let mut locators = vec![]; - - let guard = zread!(self.listeners); - for (key, value) in guard.iter() { - let (kip, kpt) = (key.ip(), key.port()); - - // Either ipv4/0.0.0.0 or ipv6/[::] - if kip.is_unspecified() { - let mut addrs = match kip { - IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(), - IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(), - }; - let iter = addrs.drain(..).map(|x| { - Locator::new( - value.endpoint.protocol(), - SocketAddr::new(x, kpt).to_string(), - value.endpoint.metadata(), - ) - .unwrap() - }); - locators.extend(iter); - } else { - locators.push(value.endpoint.to_locator()); - } - } - - locators + self.listeners.get_locators() } } diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index 383b03a366..e3adea2dff 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -28,27 +28,26 @@ use async_std::net::{SocketAddr, TcpListener, TcpStream}; use async_std::prelude::FutureExt; use async_std::sync::Mutex as AsyncMutex; use async_std::task; -use async_std::task::JoinHandle; use async_trait::async_trait; use futures::io::AsyncReadExt; use futures::io::AsyncWriteExt; -use std::collections::HashMap; use std::convert::TryInto; use std::fmt; use std::fs::File; use std::io::{BufReader, Cursor}; -use std::net::{IpAddr, Shutdown}; +use std::net::Shutdown; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; use std::{cell::UnsafeCell, io}; use webpki::{ anchor_from_trusted_cert, types::{CertificateDer, TrustAnchor}, }; -use zenoh_core::{zasynclock, zread, zwrite}; +use zenoh_core::zasynclock; use zenoh_link_commons::{ - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, + ListenersUnicastIP, NewLinkChannelSender, }; use zenoh_protocol::core::endpoint::Config; use zenoh_protocol::core::{EndPoint, Locator}; @@ -197,9 +196,7 @@ impl LinkUnicastTrait for LinkUnicastTls { #[inline(always)] fn get_interface_names(&self) -> Vec { - // @TODO: Not supported for now - log::debug!("The get_interface_names for LinkUnicastTls is not supported"); - vec![] + get_ip_interface_names(&self.src_addr) } #[inline(always)] @@ -237,42 +234,16 @@ impl fmt::Debug for LinkUnicastTls { } } -/*************************************/ -/* LISTENER */ -/*************************************/ -struct ListenerUnicastTls { - endpoint: EndPoint, - active: Arc, - signal: Signal, - handle: JoinHandle>, -} - -impl ListenerUnicastTls { - fn new( - endpoint: EndPoint, - active: Arc, - signal: Signal, - handle: JoinHandle>, - ) -> ListenerUnicastTls { - ListenerUnicastTls { - endpoint, - active, - signal, - handle, - } - } -} - pub struct LinkManagerUnicastTls { manager: NewLinkChannelSender, - listeners: Arc>>, + listeners: ListenersUnicastIP, } impl LinkManagerUnicastTls { pub fn new(manager: NewLinkChannelSender) -> Self { Self { manager, - listeners: Arc::new(RwLock::new(HashMap::new())), + listeners: ListenersUnicastIP::new(), } } } @@ -363,17 +334,12 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls { let active = Arc::new(AtomicBool::new(true)); let signal = Signal::new(); - // Spawn the accept loop for the listener let c_active = active.clone(); let c_signal = signal.clone(); let c_manager = self.manager.clone(); - let c_listeners = self.listeners.clone(); - let c_addr = local_addr; + let handle = task::spawn(async move { - // Wait for the accept loop to terminate - let res = accept_task(socket, acceptor, c_active, c_signal, c_manager).await; - zwrite!(c_listeners).remove(&c_addr); - res + accept_task(socket, acceptor, c_active, c_signal, c_manager).await }); // Update the endpoint locator address @@ -383,69 +349,25 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls { endpoint.metadata(), )?; - let listener = ListenerUnicastTls::new(endpoint, active, signal, handle); - // Update the list of active listeners on the manager - zwrite!(self.listeners).insert(local_addr, listener); + self.listeners + .add_listener(endpoint, local_addr, active, signal, handle) + .await?; Ok(locator) } async fn del_listener(&self, endpoint: &EndPoint) -> ZResult<()> { let epaddr = endpoint.address(); - let addr = get_tls_addr(&epaddr).await?; - - // Stop the listener - let listener = zwrite!(self.listeners).remove(&addr).ok_or_else(|| { - let e = zerror!( - "Can not delete the TLS listener because it has not been found: {}", - addr - ); - log::trace!("{}", e); - e - })?; - - // Send the stop signal - listener.active.store(false, Ordering::Release); - listener.signal.trigger(); - listener.handle.await + self.listeners.del_listener(addr).await } fn get_listeners(&self) -> Vec { - zread!(self.listeners) - .values() - .map(|x| x.endpoint.clone()) - .collect() + self.listeners.get_endpoints() } fn get_locators(&self) -> Vec { - let mut locators = vec![]; - - let guard = zread!(self.listeners); - for (key, value) in guard.iter() { - let (kip, kpt) = (key.ip(), key.port()); - - // Either ipv4/0.0.0.0 or ipv6/[::] - if kip.is_unspecified() { - let mut addrs = match kip { - IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(), - IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(), - }; - let iter = addrs.drain(..).map(|x| { - Locator::new( - value.endpoint.protocol(), - SocketAddr::new(x, kpt).to_string(), - value.endpoint.metadata(), - ) - .unwrap() - }); - locators.extend(iter); - } else { - locators.push(value.endpoint.to_locator()); - } - } - - locators + self.listeners.get_locators() } } diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 210c6f7b7a..a5bd3c7726 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -19,18 +19,16 @@ use async_std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use async_std::prelude::*; use async_std::sync::Mutex as AsyncMutex; use async_std::task; -use async_std::task::JoinHandle; use async_trait::async_trait; use std::collections::HashMap; use std::fmt; -use std::net::IpAddr; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex, RwLock, Weak}; +use std::sync::{Arc, Mutex, Weak}; use std::time::Duration; -use zenoh_core::{zasynclock, zlock, zread, zwrite}; +use zenoh_core::{zasynclock, zlock}; use zenoh_link_commons::{ - ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, - NewLinkChannelSender, + get_ip_interface_names, ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, + LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; @@ -210,24 +208,7 @@ impl LinkUnicastTrait for LinkUnicastUdp { #[inline(always)] fn get_interface_names(&self) -> Vec { - match zenoh_util::net::get_interface_names_by_addr(self.src_addr.ip()) { - Ok(interfaces) => { - log::trace!( - "get_interface_names for {:?}: {:?}", - self.src_addr.ip(), - interfaces - ); - interfaces - } - Err(e) => { - log::debug!( - "get_interface_names for {:?} failed: {:?}", - self.src_addr.ip(), - e - ); - vec![] - } - } + get_ip_interface_names(&self.src_addr) } #[inline(always)] @@ -257,42 +238,16 @@ impl fmt::Debug for LinkUnicastUdp { } } -/*************************************/ -/* LISTENER */ -/*************************************/ -struct ListenerUnicastUdp { - endpoint: EndPoint, - active: Arc, - signal: Signal, - handle: JoinHandle>, -} - -impl ListenerUnicastUdp { - fn new( - endpoint: EndPoint, - active: Arc, - signal: Signal, - handle: JoinHandle>, - ) -> ListenerUnicastUdp { - ListenerUnicastUdp { - endpoint, - active, - signal, - handle, - } - } -} - pub struct LinkManagerUnicastUdp { manager: NewLinkChannelSender, - listeners: Arc>>, + listeners: ListenersUnicastIP, } impl LinkManagerUnicastUdp { pub fn new(manager: NewLinkChannelSender) -> Self { Self { manager, - listeners: Arc::new(RwLock::new(HashMap::new())), + listeners: ListenersUnicastIP::new(), } } } @@ -420,27 +375,21 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp { endpoint.config(), )?; - // Spawn the accept loop for the listener let active = Arc::new(AtomicBool::new(true)); let signal = Signal::new(); - let mut listeners = zwrite!(self.listeners); let c_active = active.clone(); let c_signal = signal.clone(); let c_manager = self.manager.clone(); - let c_listeners = self.listeners.clone(); - let c_addr = local_addr; + let handle = task::spawn(async move { - // Wait for the accept loop to terminate - let res = accept_read_task(socket, c_active, c_signal, c_manager).await; - zwrite!(c_listeners).remove(&c_addr); - res + accept_read_task(socket, c_active, c_signal, c_manager).await }); let locator = endpoint.to_locator(); - let listener = ListenerUnicastUdp::new(endpoint, active, signal, handle); - // Update the list of active listeners on the manager - listeners.insert(local_addr, listener); + self.listeners + .add_listener(endpoint, local_addr, active, signal, handle) + .await?; return Ok(locator); } @@ -468,73 +417,35 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp { // Stop the listener let mut errs: Vec = vec![]; - let mut listener = None; + let mut failed = true; for a in addrs { - match zwrite!(self.listeners).remove(&a) { - Some(l) => { - // We cannot keep a sync guard across a .await - // Break the loop and assign the listener. - listener = Some(l); + match self.listeners.del_listener(a).await { + Ok(_) => { + failed = false; break; } - None => { - errs.push(zerror!("{}", a).into()); + Err(err) => { + errs.push(zerror!("{}", err).into()); } } } - match listener { - Some(l) => { - // Send the stop signal - l.active.store(false, Ordering::Release); - l.signal.trigger(); - l.handle.await - } - None => { - bail!( - "Can not delete the UDP listener bound to {}: {:?}", - endpoint, - errs - ) - } + if failed { + bail!( + "Can not delete the TCP listener bound to {}: {:?}", + endpoint, + errs + ) } + Ok(()) } fn get_listeners(&self) -> Vec { - zread!(self.listeners) - .values() - .map(|l| l.endpoint.clone()) - .collect() + self.listeners.get_endpoints() } fn get_locators(&self) -> Vec { - let mut locators = vec![]; - - let guard = zread!(self.listeners); - for (key, value) in guard.iter() { - let (kip, kpt) = (key.ip(), key.port()); - - // Either ipv4/0.0.0.0 or ipv6/[::] - if kip.is_unspecified() { - let mut addrs = match kip { - IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(), - IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(), - }; - let iter = addrs.drain(..).map(|x| { - Locator::new( - value.endpoint.protocol(), - SocketAddr::new(x, kpt).to_string(), - value.endpoint.metadata(), - ) - .unwrap() - }); - locators.extend(iter); - } else { - locators.push(value.endpoint.to_locator()); - } - } - - locators + self.listeners.get_locators() } }