From ad43d4f7ccc69f32462532897351033f21686950 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Tue, 20 Feb 2024 19:24:09 +0100 Subject: [PATCH 1/6] Add draft UnicastListener --- Cargo.lock | 3 + io/zenoh-link-commons/Cargo.toml | 3 + io/zenoh-link-commons/src/lib.rs | 3 +- io/zenoh-link-commons/src/listener.rs | 140 +++++++++++++++++++ io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 3 +- 5 files changed, 150 insertions(+), 2 deletions(-) create mode 100644 io/zenoh-link-commons/src/listener.rs diff --git a/Cargo.lock b/Cargo.lock index 2dc8ae14b6..849ebaa27e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4784,8 +4784,11 @@ dependencies = [ "typenum", "zenoh-buffers", "zenoh-codec", + "zenoh-core", "zenoh-protocol", "zenoh-result", + "zenoh-sync", + "zenoh-util", ] [[package]] diff --git a/io/zenoh-link-commons/Cargo.toml b/io/zenoh-link-commons/Cargo.toml index 36e39eceed..6a6cd6dc87 100644 --- a/io/zenoh-link-commons/Cargo.toml +++ b/io/zenoh-link-commons/Cargo.toml @@ -29,6 +29,7 @@ compression = [] [dependencies] async-std = { workspace = true } +zenoh-sync = { workspace = true } async-trait = { workspace = true } flume = { workspace = true } lz4_flex = { workspace = true } @@ -36,5 +37,7 @@ serde = { workspace = true, features = ["default"] } typenum = { workspace = true } zenoh-buffers = { workspace = true } zenoh-codec = { workspace = true } +zenoh-core = { workspace = true } zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } +zenoh-util = { workspace = true } diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index b15a0d9ad5..2ee28c3f08 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -17,15 +17,16 @@ //! This crate is intended for Zenoh's internal use. //! //! [Click here for Zenoh's documentation](../zenoh/index.html) -#![no_std] extern crate alloc; +mod listener; mod multicast; mod unicast; use alloc::{borrow::ToOwned, boxed::Box, string::String, vec, vec::Vec}; use async_trait::async_trait; use core::{cmp::PartialEq, fmt, hash::Hash}; +pub use listener::*; pub use multicast::*; use serde::Serialize; pub use unicast::*; diff --git a/io/zenoh-link-commons/src/listener.rs b/io/zenoh-link-commons/src/listener.rs new file mode 100644 index 0000000000..9d56823a88 --- /dev/null +++ b/io/zenoh-link-commons/src/listener.rs @@ -0,0 +1,140 @@ +// +// Copyright (c) 202 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use async_std::net::{SocketAddr, TcpListener, TcpStream}; +use async_std::prelude::*; +use async_std::task; +use async_std::task::JoinHandle; +use async_trait::async_trait; +use std::collections::HashMap; +use std::convert::TryInto; +use std::fmt; +use std::net::{IpAddr, Shutdown}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; +use std::time::Duration; +use zenoh_core::{zread, zwrite}; +use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_result::{bail, zerror, Error as ZError, ZResult}; +use zenoh_sync::Signal; + +pub struct UnicastListener { + endpoint: EndPoint, + active: Arc, + signal: Signal, + handle: JoinHandle>, +} + +impl UnicastListener { + fn new( + endpoint: EndPoint, + active: Arc, + signal: Signal, + handle: JoinHandle>, + ) -> UnicastListener { + UnicastListener { + endpoint, + active, + signal, + handle, + } + } +} + +pub struct UnicastListeners { + listeners: Arc>>, +} + +impl UnicastListeners { + fn new() -> UnicastListeners { + UnicastListeners { + listeners: Arc::new(RwLock::new(HashMap::new())), + } + } + + async fn add_listener( + &self, + endpoint: EndPoint, + addr: SocketAddr, + active: Arc, + signal: Signal, + handle: JoinHandle>, + ) -> ZResult<()> { + let mut listeners = zwrite!(self.listeners); + let c_listeners = self.listeners.clone(); + let c_addr = addr; + let wraphandle = task::spawn(async move { + // Wait for the accept loop to terminate + let res = handle.await; + zwrite!(c_listeners).remove(&c_addr); + res + }); + + let listener = UnicastListener::new(endpoint, active, signal, wraphandle); + // Update the list of active listeners on the manager + listeners.insert(addr, listener); + Ok(()) + } + + async fn del_listener(&self, addr: SocketAddr) -> ZResult<()> { + // Stop the listener + let listener = zwrite!(self.listeners).remove(&addr).ok_or_else(|| { + zerror!( + "Can not delete the listener because it has not been found: {}", + addr + ) + })?; + + // Send the stop signal + listener.active.store(false, Ordering::Release); + listener.signal.trigger(); + listener.handle.await + } + + fn get_endpoints(&self) -> Vec { + zread!(self.listeners) + .values() + .map(|l| l.endpoint.clone()) + .collect() + } + + fn get_locators(&self) -> Vec { + let mut locators = vec![]; + + let guard = zread!(self.listeners); + for (key, value) in guard.iter() { + let (kip, kpt) = (key.ip(), key.port()); + + // Either ipv4/0.0.0.0 or ipv6/[::] + if kip.is_unspecified() { + let mut addrs = match kip { + IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(), + IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(), + }; + let iter = addrs.drain(..).map(|x| { + Locator::new( + value.endpoint.protocol(), + SocketAddr::new(x, kpt).to_string(), + value.endpoint.metadata(), + ) + .unwrap() + }); + locators.extend(iter); + } else { + locators.push(value.endpoint.to_locator()); + } + } + + locators + } +} diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 3876a947ca..5c91c9be90 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -25,7 +25,8 @@ use std::sync::{Arc, RwLock}; use std::time::Duration; use zenoh_core::{zread, zwrite}; use zenoh_link_commons::{ - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, UnicastListener, + UnicastListeners, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; From a256ba3a21287290fc21bec9e7b5589a80ceb5c6 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Wed, 21 Feb 2024 18:39:32 +0100 Subject: [PATCH 2/6] Use UnicastListener for TCP unicast, code clanup and fixes --- io/zenoh-link-commons/src/listener.rs | 21 ++-- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 125 ++++--------------- 2 files changed, 34 insertions(+), 112 deletions(-) diff --git a/io/zenoh-link-commons/src/listener.rs b/io/zenoh-link-commons/src/listener.rs index 9d56823a88..eb5c08106c 100644 --- a/io/zenoh-link-commons/src/listener.rs +++ b/io/zenoh-link-commons/src/listener.rs @@ -11,21 +11,16 @@ // Contributors: // ZettaScale Zenoh Team, // -use async_std::net::{SocketAddr, TcpListener, TcpStream}; -use async_std::prelude::*; +use async_std::net::SocketAddr; use async_std::task; use async_std::task::JoinHandle; -use async_trait::async_trait; use std::collections::HashMap; -use std::convert::TryInto; -use std::fmt; -use std::net::{IpAddr, Shutdown}; +use std::net::IpAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; -use std::time::Duration; use zenoh_core::{zread, zwrite}; use zenoh_protocol::core::{EndPoint, Locator}; -use zenoh_result::{bail, zerror, Error as ZError, ZResult}; +use zenoh_result::{zerror, ZResult}; use zenoh_sync::Signal; pub struct UnicastListener { @@ -56,13 +51,13 @@ pub struct UnicastListeners { } impl UnicastListeners { - fn new() -> UnicastListeners { + pub fn new() -> UnicastListeners { UnicastListeners { listeners: Arc::new(RwLock::new(HashMap::new())), } } - async fn add_listener( + pub async fn add_listener( &self, endpoint: EndPoint, addr: SocketAddr, @@ -86,7 +81,7 @@ impl UnicastListeners { Ok(()) } - async fn del_listener(&self, addr: SocketAddr) -> ZResult<()> { + pub async fn del_listener(&self, addr: SocketAddr) -> ZResult<()> { // Stop the listener let listener = zwrite!(self.listeners).remove(&addr).ok_or_else(|| { zerror!( @@ -101,14 +96,14 @@ impl UnicastListeners { listener.handle.await } - fn get_endpoints(&self) -> Vec { + pub fn get_endpoints(&self) -> Vec { zread!(self.listeners) .values() .map(|l| l.endpoint.clone()) .collect() } - fn get_locators(&self) -> Vec { + pub fn get_locators(&self) -> Vec { let mut locators = vec![]; let guard = zread!(self.listeners); diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 5c91c9be90..b1e9a47405 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -14,19 +14,15 @@ use async_std::net::{SocketAddr, TcpListener, TcpStream}; use async_std::prelude::*; use async_std::task; -use async_std::task::JoinHandle; use async_trait::async_trait; -use std::collections::HashMap; use std::convert::TryInto; use std::fmt; -use std::net::{IpAddr, Shutdown}; +use std::net::Shutdown; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; -use zenoh_core::{zread, zwrite}; use zenoh_link_commons::{ - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, UnicastListener, - UnicastListeners, + LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, UnicastListeners, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; @@ -201,42 +197,16 @@ impl fmt::Debug for LinkUnicastTcp { } } -/*************************************/ -/* LISTENER */ -/*************************************/ -struct ListenerUnicastTcp { - endpoint: EndPoint, - active: Arc, - signal: Signal, - handle: JoinHandle>, -} - -impl ListenerUnicastTcp { - fn new( - endpoint: EndPoint, - active: Arc, - signal: Signal, - handle: JoinHandle>, - ) -> ListenerUnicastTcp { - ListenerUnicastTcp { - endpoint, - active, - signal, - handle, - } - } -} - pub struct LinkManagerUnicastTcp { manager: NewLinkChannelSender, - listeners: Arc>>, + listeners: UnicastListeners, } impl LinkManagerUnicastTcp { pub fn new(manager: NewLinkChannelSender) -> Self { Self { manager, - listeners: Arc::new(RwLock::new(HashMap::new())), + listeners: UnicastListeners::new(), } } } @@ -319,27 +289,22 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { endpoint.config(), )?; - // Spawn the accept loop for the listener let active = Arc::new(AtomicBool::new(true)); let signal = Signal::new(); - let mut listeners = zwrite!(self.listeners); let c_active = active.clone(); let c_signal = signal.clone(); let c_manager = self.manager.clone(); - let c_listeners = self.listeners.clone(); - let c_addr = local_addr; + let handle = task::spawn(async move { - // Wait for the accept loop to terminate - let res = accept_task(socket, c_active, c_signal, c_manager).await; - zwrite!(c_listeners).remove(&c_addr); - res + accept_task(socket, c_active, c_signal, c_manager).await }); let locator = endpoint.to_locator(); - let listener = ListenerUnicastTcp::new(endpoint, active, signal, handle); - // Update the list of active listeners on the manager - listeners.insert(local_addr, listener); + + self.listeners + .add_listener(endpoint, local_addr, active, signal, handle) + .await?; return Ok(locator); } @@ -365,73 +330,35 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { // Stop the listener let mut errs: Vec = vec![]; - let mut listener = None; + let mut failed = true; for a in addrs { - match zwrite!(self.listeners).remove(&a) { - Some(l) => { - // We cannot keep a sync guard across a .await - // Break the loop and assign the listener. - listener = Some(l); + match self.listeners.del_listener(a).await { + Ok(_) => { + failed = false; break; } - None => { - errs.push(zerror!("{}", a).into()); + Err(err) => { + errs.push(zerror!("{}", err).into()); } } } - match listener { - Some(l) => { - // Send the stop signal - l.active.store(false, Ordering::Release); - l.signal.trigger(); - l.handle.await - } - None => { - bail!( - "Can not delete the TCP listener bound to {}: {:?}", - endpoint, - errs - ) - } + if failed { + bail!( + "Can not delete the TCP listener bound to {}: {:?}", + endpoint, + errs + ) } + Ok(()) } fn get_listeners(&self) -> Vec { - zread!(self.listeners) - .values() - .map(|l| l.endpoint.clone()) - .collect() + self.listeners.get_endpoints() } fn get_locators(&self) -> Vec { - let mut locators = vec![]; - - let guard = zread!(self.listeners); - for (key, value) in guard.iter() { - let (kip, kpt) = (key.ip(), key.port()); - - // Either ipv4/0.0.0.0 or ipv6/[::] - if kip.is_unspecified() { - let mut addrs = match kip { - IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(), - IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(), - }; - let iter = addrs.drain(..).map(|x| { - Locator::new( - value.endpoint.protocol(), - SocketAddr::new(x, kpt).to_string(), - value.endpoint.metadata(), - ) - .unwrap() - }); - locators.extend(iter); - } else { - locators.push(value.endpoint.to_locator()); - } - } - - locators + self.listeners.get_locators() } } From 7c76c4c0160d88d483946c7242615dcd3f154b97 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Wed, 21 Feb 2024 19:16:43 +0100 Subject: [PATCH 3/6] Use UnicastListener for UDP unicast, make linter happy --- io/zenoh-link-commons/src/listener.rs | 6 + io/zenoh-links/zenoh-link-udp/src/unicast.rs | 122 ++++--------------- 2 files changed, 31 insertions(+), 97 deletions(-) diff --git a/io/zenoh-link-commons/src/listener.rs b/io/zenoh-link-commons/src/listener.rs index eb5c08106c..ccb7cb6230 100644 --- a/io/zenoh-link-commons/src/listener.rs +++ b/io/zenoh-link-commons/src/listener.rs @@ -133,3 +133,9 @@ impl UnicastListeners { locators } } + +impl Default for UnicastListeners { + fn default() -> Self { + Self::new() + } +} diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 210c6f7b7a..76ca2e221b 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -19,18 +19,16 @@ use async_std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; use async_std::prelude::*; use async_std::sync::Mutex as AsyncMutex; use async_std::task; -use async_std::task::JoinHandle; use async_trait::async_trait; use std::collections::HashMap; use std::fmt; -use std::net::IpAddr; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex, RwLock, Weak}; +use std::sync::{Arc, Mutex, Weak}; use std::time::Duration; -use zenoh_core::{zasynclock, zlock, zread, zwrite}; +use zenoh_core::{zasynclock, zlock}; use zenoh_link_commons::{ ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, - NewLinkChannelSender, + NewLinkChannelSender, UnicastListeners, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; @@ -257,42 +255,16 @@ impl fmt::Debug for LinkUnicastUdp { } } -/*************************************/ -/* LISTENER */ -/*************************************/ -struct ListenerUnicastUdp { - endpoint: EndPoint, - active: Arc, - signal: Signal, - handle: JoinHandle>, -} - -impl ListenerUnicastUdp { - fn new( - endpoint: EndPoint, - active: Arc, - signal: Signal, - handle: JoinHandle>, - ) -> ListenerUnicastUdp { - ListenerUnicastUdp { - endpoint, - active, - signal, - handle, - } - } -} - pub struct LinkManagerUnicastUdp { manager: NewLinkChannelSender, - listeners: Arc>>, + listeners: UnicastListeners, } impl LinkManagerUnicastUdp { pub fn new(manager: NewLinkChannelSender) -> Self { Self { manager, - listeners: Arc::new(RwLock::new(HashMap::new())), + listeners: UnicastListeners::new(), } } } @@ -420,27 +392,21 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp { endpoint.config(), )?; - // Spawn the accept loop for the listener let active = Arc::new(AtomicBool::new(true)); let signal = Signal::new(); - let mut listeners = zwrite!(self.listeners); let c_active = active.clone(); let c_signal = signal.clone(); let c_manager = self.manager.clone(); - let c_listeners = self.listeners.clone(); - let c_addr = local_addr; + let handle = task::spawn(async move { - // Wait for the accept loop to terminate - let res = accept_read_task(socket, c_active, c_signal, c_manager).await; - zwrite!(c_listeners).remove(&c_addr); - res + accept_read_task(socket, c_active, c_signal, c_manager).await }); let locator = endpoint.to_locator(); - let listener = ListenerUnicastUdp::new(endpoint, active, signal, handle); - // Update the list of active listeners on the manager - listeners.insert(local_addr, listener); + self.listeners + .add_listener(endpoint, local_addr, active, signal, handle) + .await?; return Ok(locator); } @@ -468,73 +434,35 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp { // Stop the listener let mut errs: Vec = vec![]; - let mut listener = None; + let mut failed = true; for a in addrs { - match zwrite!(self.listeners).remove(&a) { - Some(l) => { - // We cannot keep a sync guard across a .await - // Break the loop and assign the listener. - listener = Some(l); + match self.listeners.del_listener(a).await { + Ok(_) => { + failed = false; break; } - None => { - errs.push(zerror!("{}", a).into()); + Err(err) => { + errs.push(zerror!("{}", err).into()); } } } - match listener { - Some(l) => { - // Send the stop signal - l.active.store(false, Ordering::Release); - l.signal.trigger(); - l.handle.await - } - None => { - bail!( - "Can not delete the UDP listener bound to {}: {:?}", - endpoint, - errs - ) - } + if failed { + bail!( + "Can not delete the TCP listener bound to {}: {:?}", + endpoint, + errs + ) } + Ok(()) } fn get_listeners(&self) -> Vec { - zread!(self.listeners) - .values() - .map(|l| l.endpoint.clone()) - .collect() + self.listeners.get_endpoints() } fn get_locators(&self) -> Vec { - let mut locators = vec![]; - - let guard = zread!(self.listeners); - for (key, value) in guard.iter() { - let (kip, kpt) = (key.ip(), key.port()); - - // Either ipv4/0.0.0.0 or ipv6/[::] - if kip.is_unspecified() { - let mut addrs = match kip { - IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(), - IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(), - }; - let iter = addrs.drain(..).map(|x| { - Locator::new( - value.endpoint.protocol(), - SocketAddr::new(x, kpt).to_string(), - value.endpoint.metadata(), - ) - .unwrap() - }); - locators.extend(iter); - } else { - locators.push(value.endpoint.to_locator()); - } - } - - locators + self.listeners.get_locators() } } From 0a85d902635f1d9688a298267ab6d4ca1bdacd49 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Wed, 21 Feb 2024 19:37:11 +0100 Subject: [PATCH 4/6] Use UnicastListener for TLS and QUIC unicasts --- io/zenoh-link-commons/src/listener.rs | 2 +- io/zenoh-links/zenoh-link-quic/src/unicast.rs | 110 +++--------------- io/zenoh-links/zenoh-link-tls/src/unicast.rs | 105 +++-------------- 3 files changed, 32 insertions(+), 185 deletions(-) diff --git a/io/zenoh-link-commons/src/listener.rs b/io/zenoh-link-commons/src/listener.rs index ccb7cb6230..a9ed7c65cc 100644 --- a/io/zenoh-link-commons/src/listener.rs +++ b/io/zenoh-link-commons/src/listener.rs @@ -1,5 +1,5 @@ // -// Copyright (c) 202 ZettaScale Technology +// Copyright (c) 2024 ZettaScale Technology // // This program and the accompanying materials are made available under the // terms of the Eclipse Public License 2.0 which is available at diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs index dfc70dca28..a7e45f1ef8 100644 --- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs @@ -21,20 +21,18 @@ use async_std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; use async_std::prelude::FutureExt; use async_std::sync::Mutex as AsyncMutex; use async_std::task; -use async_std::task::JoinHandle; use async_trait::async_trait; use rustls::{Certificate, PrivateKey}; use rustls_pemfile::Item; -use std::collections::HashMap; use std::fmt; use std::io::BufReader; use std::net::IpAddr; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; -use zenoh_core::{zasynclock, zread, zwrite}; +use zenoh_core::zasynclock; use zenoh_link_commons::{ - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, UnicastListeners, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, ZError, ZResult}; @@ -188,42 +186,16 @@ impl fmt::Debug for LinkUnicastQuic { } } -/*************************************/ -/* LISTENER */ -/*************************************/ -struct ListenerUnicastQuic { - endpoint: EndPoint, - active: Arc, - signal: Signal, - handle: JoinHandle>, -} - -impl ListenerUnicastQuic { - fn new( - endpoint: EndPoint, - active: Arc, - signal: Signal, - handle: JoinHandle>, - ) -> ListenerUnicastQuic { - ListenerUnicastQuic { - endpoint, - active, - signal, - handle, - } - } -} - pub struct LinkManagerUnicastQuic { manager: NewLinkChannelSender, - listeners: Arc>>, + listeners: UnicastListeners, } impl LinkManagerUnicastQuic { pub fn new(manager: NewLinkChannelSender) -> Self { Self { manager, - listeners: Arc::new(RwLock::new(HashMap::new())), + listeners: UnicastListeners::new(), } } } @@ -429,88 +401,40 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic { endpoint.config(), )?; - // Spawn the accept loop for the listener let active = Arc::new(AtomicBool::new(true)); let signal = Signal::new(); - let mut listeners = zwrite!(self.listeners); let c_active = active.clone(); let c_signal = signal.clone(); let c_manager = self.manager.clone(); - let c_listeners = self.listeners.clone(); - let c_addr = local_addr; - let handle = task::spawn(async move { - // Wait for the accept loop to terminate - let res = accept_task(quic_endpoint, c_active, c_signal, c_manager).await; - zwrite!(c_listeners).remove(&c_addr); - res - }); + + let handle = + task::spawn( + async move { accept_task(quic_endpoint, c_active, c_signal, c_manager).await }, + ); // Initialize the QuicAcceptor let locator = endpoint.to_locator(); - let listener = ListenerUnicastQuic::new(endpoint, active, signal, handle); - // Update the list of active listeners on the manager - listeners.insert(local_addr, listener); + + self.listeners + .add_listener(endpoint, local_addr, active, signal, handle) + .await?; Ok(locator) } async fn del_listener(&self, endpoint: &EndPoint) -> ZResult<()> { let epaddr = endpoint.address(); - let addr = get_quic_addr(&epaddr).await?; - - // Stop the listener - let listener = zwrite!(self.listeners).remove(&addr).ok_or_else(|| { - let e = zerror!( - "Can not delete the QUIC listener because it has not been found: {}", - addr - ); - log::trace!("{}", e); - e - })?; - - // Send the stop signal - listener.active.store(false, Ordering::Release); - listener.signal.trigger(); - listener.handle.await + self.listeners.del_listener(addr).await } fn get_listeners(&self) -> Vec { - zread!(self.listeners) - .values() - .map(|x| x.endpoint.clone()) - .collect() + self.listeners.get_endpoints() } fn get_locators(&self) -> Vec { - let mut locators = vec![]; - - let guard = zread!(self.listeners); - for (key, value) in guard.iter() { - let (kip, kpt) = (key.ip(), key.port()); - - // Either ipv4/0.0.0.0 or ipv6/[::] - if kip.is_unspecified() { - let mut addrs = match kip { - IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(), - IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(), - }; - let iter = addrs.drain(..).map(|x| { - Locator::new( - value.endpoint.protocol(), - SocketAddr::new(x, kpt).to_string(), - value.endpoint.metadata(), - ) - .unwrap() - }); - locators.extend(iter); - } else { - locators.push(value.endpoint.to_locator()); - } - } - - locators + self.listeners.get_locators() } } diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index 383b03a366..eec28ebca4 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -28,27 +28,25 @@ use async_std::net::{SocketAddr, TcpListener, TcpStream}; use async_std::prelude::FutureExt; use async_std::sync::Mutex as AsyncMutex; use async_std::task; -use async_std::task::JoinHandle; use async_trait::async_trait; use futures::io::AsyncReadExt; use futures::io::AsyncWriteExt; -use std::collections::HashMap; use std::convert::TryInto; use std::fmt; use std::fs::File; use std::io::{BufReader, Cursor}; -use std::net::{IpAddr, Shutdown}; +use std::net::Shutdown; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::time::Duration; use std::{cell::UnsafeCell, io}; use webpki::{ anchor_from_trusted_cert, types::{CertificateDer, TrustAnchor}, }; -use zenoh_core::{zasynclock, zread, zwrite}; +use zenoh_core::zasynclock; use zenoh_link_commons::{ - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, UnicastListeners, }; use zenoh_protocol::core::endpoint::Config; use zenoh_protocol::core::{EndPoint, Locator}; @@ -237,42 +235,16 @@ impl fmt::Debug for LinkUnicastTls { } } -/*************************************/ -/* LISTENER */ -/*************************************/ -struct ListenerUnicastTls { - endpoint: EndPoint, - active: Arc, - signal: Signal, - handle: JoinHandle>, -} - -impl ListenerUnicastTls { - fn new( - endpoint: EndPoint, - active: Arc, - signal: Signal, - handle: JoinHandle>, - ) -> ListenerUnicastTls { - ListenerUnicastTls { - endpoint, - active, - signal, - handle, - } - } -} - pub struct LinkManagerUnicastTls { manager: NewLinkChannelSender, - listeners: Arc>>, + listeners: UnicastListeners, } impl LinkManagerUnicastTls { pub fn new(manager: NewLinkChannelSender) -> Self { Self { manager, - listeners: Arc::new(RwLock::new(HashMap::new())), + listeners: UnicastListeners::new(), } } } @@ -363,17 +335,12 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls { let active = Arc::new(AtomicBool::new(true)); let signal = Signal::new(); - // Spawn the accept loop for the listener let c_active = active.clone(); let c_signal = signal.clone(); let c_manager = self.manager.clone(); - let c_listeners = self.listeners.clone(); - let c_addr = local_addr; + let handle = task::spawn(async move { - // Wait for the accept loop to terminate - let res = accept_task(socket, acceptor, c_active, c_signal, c_manager).await; - zwrite!(c_listeners).remove(&c_addr); - res + accept_task(socket, acceptor, c_active, c_signal, c_manager).await }); // Update the endpoint locator address @@ -383,69 +350,25 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls { endpoint.metadata(), )?; - let listener = ListenerUnicastTls::new(endpoint, active, signal, handle); - // Update the list of active listeners on the manager - zwrite!(self.listeners).insert(local_addr, listener); + self.listeners + .add_listener(endpoint, local_addr, active, signal, handle) + .await?; Ok(locator) } async fn del_listener(&self, endpoint: &EndPoint) -> ZResult<()> { let epaddr = endpoint.address(); - let addr = get_tls_addr(&epaddr).await?; - - // Stop the listener - let listener = zwrite!(self.listeners).remove(&addr).ok_or_else(|| { - let e = zerror!( - "Can not delete the TLS listener because it has not been found: {}", - addr - ); - log::trace!("{}", e); - e - })?; - - // Send the stop signal - listener.active.store(false, Ordering::Release); - listener.signal.trigger(); - listener.handle.await + self.listeners.del_listener(addr).await } fn get_listeners(&self) -> Vec { - zread!(self.listeners) - .values() - .map(|x| x.endpoint.clone()) - .collect() + self.listeners.get_endpoints() } fn get_locators(&self) -> Vec { - let mut locators = vec![]; - - let guard = zread!(self.listeners); - for (key, value) in guard.iter() { - let (kip, kpt) = (key.ip(), key.port()); - - // Either ipv4/0.0.0.0 or ipv6/[::] - if kip.is_unspecified() { - let mut addrs = match kip { - IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(), - IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(), - }; - let iter = addrs.drain(..).map(|x| { - Locator::new( - value.endpoint.protocol(), - SocketAddr::new(x, kpt).to_string(), - value.endpoint.metadata(), - ) - .unwrap() - }); - locators.extend(iter); - } else { - locators.push(value.endpoint.to_locator()); - } - } - - locators + self.listeners.get_locators() } } From a4173561ccd225bf3cadd1a1fd0b75326ef1a193 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Wed, 21 Feb 2024 20:03:48 +0100 Subject: [PATCH 5/6] Rename UnicastListener to ListenerUnicastIP for consistency --- io/zenoh-link-commons/src/listener.rs | 22 +++++++++---------- io/zenoh-links/zenoh-link-quic/src/unicast.rs | 7 +++--- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 7 +++--- io/zenoh-links/zenoh-link-tls/src/unicast.rs | 7 +++--- io/zenoh-links/zenoh-link-udp/src/unicast.rs | 6 ++--- 5 files changed, 26 insertions(+), 23 deletions(-) diff --git a/io/zenoh-link-commons/src/listener.rs b/io/zenoh-link-commons/src/listener.rs index a9ed7c65cc..1d5d7bb172 100644 --- a/io/zenoh-link-commons/src/listener.rs +++ b/io/zenoh-link-commons/src/listener.rs @@ -23,21 +23,21 @@ use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{zerror, ZResult}; use zenoh_sync::Signal; -pub struct UnicastListener { +pub struct ListenerUnicastIP { endpoint: EndPoint, active: Arc, signal: Signal, handle: JoinHandle>, } -impl UnicastListener { +impl ListenerUnicastIP { fn new( endpoint: EndPoint, active: Arc, signal: Signal, handle: JoinHandle>, - ) -> UnicastListener { - UnicastListener { + ) -> ListenerUnicastIP { + ListenerUnicastIP { endpoint, active, signal, @@ -46,13 +46,13 @@ impl UnicastListener { } } -pub struct UnicastListeners { - listeners: Arc>>, +pub struct ListenersUnicastIP { + listeners: Arc>>, } -impl UnicastListeners { - pub fn new() -> UnicastListeners { - UnicastListeners { +impl ListenersUnicastIP { + pub fn new() -> ListenersUnicastIP { + ListenersUnicastIP { listeners: Arc::new(RwLock::new(HashMap::new())), } } @@ -75,7 +75,7 @@ impl UnicastListeners { res }); - let listener = UnicastListener::new(endpoint, active, signal, wraphandle); + let listener = ListenerUnicastIP::new(endpoint, active, signal, wraphandle); // Update the list of active listeners on the manager listeners.insert(addr, listener); Ok(()) @@ -134,7 +134,7 @@ impl UnicastListeners { } } -impl Default for UnicastListeners { +impl Default for ListenersUnicastIP { fn default() -> Self { Self::new() } diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs index a7e45f1ef8..fe18e24465 100644 --- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs @@ -32,7 +32,8 @@ use std::sync::Arc; use std::time::Duration; use zenoh_core::zasynclock; use zenoh_link_commons::{ - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, UnicastListeners, + LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, + NewLinkChannelSender, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, ZError, ZResult}; @@ -188,14 +189,14 @@ impl fmt::Debug for LinkUnicastQuic { pub struct LinkManagerUnicastQuic { manager: NewLinkChannelSender, - listeners: UnicastListeners, + listeners: ListenersUnicastIP, } impl LinkManagerUnicastQuic { pub fn new(manager: NewLinkChannelSender) -> Self { Self { manager, - listeners: UnicastListeners::new(), + listeners: ListenersUnicastIP::new(), } } } diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index b1e9a47405..ae88d54db8 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -22,7 +22,8 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use zenoh_link_commons::{ - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, UnicastListeners, + LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, + NewLinkChannelSender, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; @@ -199,14 +200,14 @@ impl fmt::Debug for LinkUnicastTcp { pub struct LinkManagerUnicastTcp { manager: NewLinkChannelSender, - listeners: UnicastListeners, + listeners: ListenersUnicastIP, } impl LinkManagerUnicastTcp { pub fn new(manager: NewLinkChannelSender) -> Self { Self { manager, - listeners: UnicastListeners::new(), + listeners: ListenersUnicastIP::new(), } } } diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index eec28ebca4..abf5b13bbe 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -46,7 +46,8 @@ use webpki::{ }; use zenoh_core::zasynclock; use zenoh_link_commons::{ - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, UnicastListeners, + LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, + NewLinkChannelSender, }; use zenoh_protocol::core::endpoint::Config; use zenoh_protocol::core::{EndPoint, Locator}; @@ -237,14 +238,14 @@ impl fmt::Debug for LinkUnicastTls { pub struct LinkManagerUnicastTls { manager: NewLinkChannelSender, - listeners: UnicastListeners, + listeners: ListenersUnicastIP, } impl LinkManagerUnicastTls { pub fn new(manager: NewLinkChannelSender) -> Self { Self { manager, - listeners: UnicastListeners::new(), + listeners: ListenersUnicastIP::new(), } } } diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 76ca2e221b..71fd56926c 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -28,7 +28,7 @@ use std::time::Duration; use zenoh_core::{zasynclock, zlock}; use zenoh_link_commons::{ ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, - NewLinkChannelSender, UnicastListeners, + ListenersUnicastIP, NewLinkChannelSender, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; @@ -257,14 +257,14 @@ impl fmt::Debug for LinkUnicastUdp { pub struct LinkManagerUnicastUdp { manager: NewLinkChannelSender, - listeners: UnicastListeners, + listeners: ListenersUnicastIP, } impl LinkManagerUnicastUdp { pub fn new(manager: NewLinkChannelSender) -> Self { Self { manager, - listeners: UnicastListeners::new(), + listeners: ListenersUnicastIP::new(), } } } From 7118f0f0a06b023ec071d62a9f8f6c633de29d9f Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 22 Feb 2024 15:57:27 +0100 Subject: [PATCH 6/6] Move get_ip_interface_names to commons --- Cargo.lock | 1 + io/zenoh-link-commons/Cargo.toml | 1 + io/zenoh-link-commons/src/unicast.rs | 14 +++++++++++ io/zenoh-links/zenoh-link-quic/src/unicast.rs | 8 +++---- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 23 +++---------------- io/zenoh-links/zenoh-link-tls/src/unicast.rs | 8 +++---- io/zenoh-links/zenoh-link-udp/src/unicast.rs | 23 +++---------------- 7 files changed, 28 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 849ebaa27e..fa9de7e800 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4779,6 +4779,7 @@ dependencies = [ "async-std", "async-trait", "flume", + "log", "lz4_flex", "serde", "typenum", diff --git a/io/zenoh-link-commons/Cargo.toml b/io/zenoh-link-commons/Cargo.toml index 6a6cd6dc87..019067de56 100644 --- a/io/zenoh-link-commons/Cargo.toml +++ b/io/zenoh-link-commons/Cargo.toml @@ -33,6 +33,7 @@ zenoh-sync = { workspace = true } async-trait = { workspace = true } flume = { workspace = true } lz4_flex = { workspace = true } +log = { workspace = true } serde = { workspace = true, features = ["default"] } typenum = { workspace = true } zenoh-buffers = { workspace = true } diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index 19e90c3b2c..1237024ca9 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec}; +use async_std::net::SocketAddr; use async_trait::async_trait; use core::{ fmt, @@ -100,3 +101,16 @@ impl From> for LinkUnicast { LinkUnicast(link) } } + +pub fn get_ip_interface_names(addr: &SocketAddr) -> Vec { + match zenoh_util::net::get_interface_names_by_addr(addr.ip()) { + Ok(interfaces) => { + log::trace!("get_interface_names for {:?}: {:?}", addr.ip(), interfaces); + interfaces + } + Err(e) => { + log::debug!("get_interface_names for {:?} failed: {:?}", addr.ip(), e); + vec![] + } + } +} diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs index fe18e24465..366860801e 100644 --- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs @@ -32,8 +32,8 @@ use std::sync::Arc; use std::time::Duration; use zenoh_core::zasynclock; use zenoh_link_commons::{ - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, - NewLinkChannelSender, + get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, + ListenersUnicastIP, NewLinkChannelSender, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, ZError, ZResult}; @@ -144,9 +144,7 @@ impl LinkUnicastTrait for LinkUnicastQuic { #[inline(always)] fn get_interface_names(&self) -> Vec { - // @TODO: Not supported for now - log::debug!("The get_interface_names for LinkUnicastQuic is not supported"); - vec![] + get_ip_interface_names(&self.src_addr) } #[inline(always)] diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index ae88d54db8..551f4c8c97 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -22,8 +22,8 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use zenoh_link_commons::{ - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, - NewLinkChannelSender, + get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, + ListenersUnicastIP, NewLinkChannelSender, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; @@ -144,24 +144,7 @@ impl LinkUnicastTrait for LinkUnicastTcp { #[inline(always)] fn get_interface_names(&self) -> Vec { - match zenoh_util::net::get_interface_names_by_addr(self.src_addr.ip()) { - Ok(interfaces) => { - log::trace!( - "get_interface_names for {:?}: {:?}", - self.src_addr.ip(), - interfaces - ); - interfaces - } - Err(e) => { - log::debug!( - "get_interface_names for {:?} failed: {:?}", - self.src_addr.ip(), - e - ); - vec![] - } - } + get_ip_interface_names(&self.src_addr) } #[inline(always)] diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index abf5b13bbe..e3adea2dff 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -46,8 +46,8 @@ use webpki::{ }; use zenoh_core::zasynclock; use zenoh_link_commons::{ - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, - NewLinkChannelSender, + get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, + ListenersUnicastIP, NewLinkChannelSender, }; use zenoh_protocol::core::endpoint::Config; use zenoh_protocol::core::{EndPoint, Locator}; @@ -196,9 +196,7 @@ impl LinkUnicastTrait for LinkUnicastTls { #[inline(always)] fn get_interface_names(&self) -> Vec { - // @TODO: Not supported for now - log::debug!("The get_interface_names for LinkUnicastTls is not supported"); - vec![] + get_ip_interface_names(&self.src_addr) } #[inline(always)] diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 71fd56926c..a5bd3c7726 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -27,8 +27,8 @@ use std::sync::{Arc, Mutex, Weak}; use std::time::Duration; use zenoh_core::{zasynclock, zlock}; use zenoh_link_commons::{ - ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, - ListenersUnicastIP, NewLinkChannelSender, + get_ip_interface_names, ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, + LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, Error as ZError, ZResult}; @@ -208,24 +208,7 @@ impl LinkUnicastTrait for LinkUnicastUdp { #[inline(always)] fn get_interface_names(&self) -> Vec { - match zenoh_util::net::get_interface_names_by_addr(self.src_addr.ip()) { - Ok(interfaces) => { - log::trace!( - "get_interface_names for {:?}: {:?}", - self.src_addr.ip(), - interfaces - ); - interfaces - } - Err(e) => { - log::debug!( - "get_interface_names for {:?} failed: {:?}", - self.src_addr.ip(), - e - ); - vec![] - } - } + get_ip_interface_names(&self.src_addr) } #[inline(always)]