From e25cd532f432643f8b5e6559697a7a0f03333b73 Mon Sep 17 00:00:00 2001 From: aeon Date: Tue, 26 Mar 2024 17:25:44 +0800 Subject: [PATCH] Add zenoh-link-btp crate --- Cargo.toml | 1 + io/zenoh-links/zenoh-link-btp/Cargo.toml | 39 ++ io/zenoh-links/zenoh-link-btp/src/lib.rs | 103 +++ .../zenoh-link-btp/src/multicast.rs | 337 ++++++++++ io/zenoh-links/zenoh-link-btp/src/unicast.rs | 632 ++++++++++++++++++ 5 files changed, 1112 insertions(+) create mode 100644 io/zenoh-links/zenoh-link-btp/Cargo.toml create mode 100644 io/zenoh-links/zenoh-link-btp/src/lib.rs create mode 100644 io/zenoh-links/zenoh-link-btp/src/multicast.rs create mode 100644 io/zenoh-links/zenoh-link-btp/src/unicast.rs diff --git a/Cargo.toml b/Cargo.toml index 363a63153f..bbc57f07ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ members = [ "io/zenoh-links/zenoh-link-unixsock_stream/", "io/zenoh-links/zenoh-link-ws/", "io/zenoh-links/zenoh-link-unixpipe/", + "io/zenoh-links/zenoh-link-btp/", "io/zenoh-transport", "plugins/example-plugin", "plugins/zenoh-backend-traits", diff --git a/io/zenoh-links/zenoh-link-btp/Cargo.toml b/io/zenoh-links/zenoh-link-btp/Cargo.toml new file mode 100644 index 0000000000..f4c06a3eaf --- /dev/null +++ b/io/zenoh-links/zenoh-link-btp/Cargo.toml @@ -0,0 +1,39 @@ +# +# Copyright (c) 2023 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# +[package] +rust-version = { workspace = true } +name = "zenoh-link-btp" +version = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } +license = { workspace = true } +categories = { workspace = true } +description = "Internal crate for zenoh." +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-std = { workspace = true } +async-trait = { workspace = true } +log = { workspace = true } +socket2 = { workspace = true } +zenoh-buffers = { workspace = true } +zenoh-collections = { workspace = true } +zenoh-core = { workspace = true } +zenoh-link-commons = { workspace = true } +zenoh-protocol = { workspace = true } +zenoh-result = { workspace = true } +zenoh-sync = { workspace = true } +zenoh-util = { workspace = true } diff --git a/io/zenoh-links/zenoh-link-btp/src/lib.rs b/io/zenoh-links/zenoh-link-btp/src/lib.rs new file mode 100644 index 0000000000..20a48e8f4d --- /dev/null +++ b/io/zenoh-links/zenoh-link-btp/src/lib.rs @@ -0,0 +1,103 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! ⚠️ WARNING ⚠️ +//! +//! This crate is intended for Zenoh's internal use. +//! +//! [Click here for Zenoh's documentation](../zenoh/index.html) +mod multicast; +mod unicast; + +use async_std::net::ToSocketAddrs; +use async_trait::async_trait; +pub use multicast::*; +use std::net::SocketAddr; +pub use unicast::*; +use zenoh_core::zconfigurable; +use zenoh_link_commons::LocatorInspector; +use zenoh_protocol::core::{endpoint::Address, Locator}; +use zenoh_result::{zerror, ZResult}; + +// NOTE: In case of using UDP in high-throughput scenarios, it is recommended to set the +// UDP buffer size on the host to a reasonable size. Usually, default values for UDP buffers +// size are undersized. Setting UDP buffers on the host to a size of 4M can be considered +// as a safe choice. +// Usually, on Linux systems this could be achieved by executing: +// $ sysctl -w net.core.rmem_max=4194304 +// $ sysctl -w net.core.rmem_default=4194304 + +// Maximum MTU (UDP PDU) in bytes. +// NOTE: The UDP field size sets a theoretical limit of 65,535 bytes (8 byte header + 65,527 bytes of +// data) for a UDP datagram. However the actual limit for the data length, which is imposed by +// the underlying IPv4 protocol, is 65,507 bytes (65,535 − 8 byte UDP header − 20 byte IP header). +// Although in IPv6 it is possible to have UDP datagrams of size greater than 65,535 bytes via +// IPv6 Jumbograms, its usage in Zenoh is discouraged unless the consequences are very well +// understood. +const UDP_MAX_MTU: u16 = 65_507; + +pub const UDP_LOCATOR_PREFIX: &str = "udp"; + +#[cfg(any(target_os = "linux", target_os = "windows"))] +// Linux default value of a maximum datagram size is set to UDP MAX MTU. +const UDP_MTU_LIMIT: u16 = UDP_MAX_MTU; + +#[cfg(target_os = "macos")] +// Mac OS X default value of a maximum datagram size is set to 9216 bytes. +const UDP_MTU_LIMIT: u16 = 9_216; + +#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))] +const UDP_MTU_LIMIT: u16 = 8_192; + +zconfigurable! { + // Default MTU (UDP PDU) in bytes. + static ref UDP_DEFAULT_MTU: u16 = UDP_MTU_LIMIT; + // Amount of time in microseconds to throttle the accept loop upon an error. + // Default set to 100 ms. + static ref UDP_ACCEPT_THROTTLE_TIME: u64 = 100_000; +} + +#[derive(Default, Clone, Copy)] +pub struct UdpLocatorInspector; +#[async_trait] +impl LocatorInspector for UdpLocatorInspector { + fn protocol(&self) -> &str { + UDP_LOCATOR_PREFIX + } + + async fn is_multicast(&self, locator: &Locator) -> ZResult { + let is_multicast = get_udp_addrs(locator.address()) + .await? + .any(|x| x.ip().is_multicast()); + Ok(is_multicast) + } +} + +pub mod config { + pub const UDP_MULTICAST_IFACE: &str = "iface"; + pub const UDP_MULTICAST_JOIN: &str = "join"; +} + +pub async fn get_udp_addrs(address: Address<'_>) -> ZResult> { + let iter = address + .as_str() + .to_socket_addrs() + .await + .map_err(|e| zerror!("{}", e))?; + Ok(iter) +} + +pub(crate) fn socket_addr_to_udp_locator(addr: &SocketAddr) -> Locator { + Locator::new(UDP_LOCATOR_PREFIX, addr.to_string(), "").unwrap() +} diff --git a/io/zenoh-links/zenoh-link-btp/src/multicast.rs b/io/zenoh-links/zenoh-link-btp/src/multicast.rs new file mode 100644 index 0000000000..838bb8acd5 --- /dev/null +++ b/io/zenoh-links/zenoh-link-btp/src/multicast.rs @@ -0,0 +1,337 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use super::{config::*, UDP_DEFAULT_MTU}; +use crate::{get_udp_addrs, socket_addr_to_udp_locator}; +use async_std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; +use async_trait::async_trait; +use socket2::{Domain, Protocol, Socket, Type}; +use std::sync::Arc; +use std::{borrow::Cow, fmt}; +use zenoh_link_commons::{LinkManagerMulticastTrait, LinkMulticast, LinkMulticastTrait}; +use zenoh_protocol::core::{Config, EndPoint, Locator}; +use zenoh_result::{bail, zerror, Error as ZError, ZResult}; + +pub struct LinkMulticastUdp { + // The unicast socket address of this link + unicast_addr: SocketAddr, + unicast_locator: Locator, + // The unicast UDP socket used for write (and porentiablly read) operations + unicast_socket: UdpSocket, + // The multicast socket address of this link + multicast_addr: SocketAddr, + multicast_locator: Locator, + // The multicast UDP socket used for read operations + mcast_sock: UdpSocket, +} + +impl LinkMulticastUdp { + fn new( + unicast_addr: SocketAddr, + unicast_socket: UdpSocket, + multicast_addr: SocketAddr, + mcast_sock: UdpSocket, + ) -> LinkMulticastUdp { + LinkMulticastUdp { + unicast_locator: socket_addr_to_udp_locator(&unicast_addr), + multicast_locator: socket_addr_to_udp_locator(&multicast_addr), + unicast_addr, + unicast_socket, + multicast_addr, + mcast_sock, + } + } +} + +#[async_trait] +impl LinkMulticastTrait for LinkMulticastUdp { + async fn close(&self) -> ZResult<()> { + log::trace!("Closing UDP link: {}", self); + match self.multicast_addr.ip() { + IpAddr::V4(dst_ip4) => match self.multicast_addr.ip() { + IpAddr::V4(src_ip4) => self.mcast_sock.leave_multicast_v4(dst_ip4, src_ip4), + IpAddr::V6(_) => unreachable!(), + }, + IpAddr::V6(dst_ip6) => self.mcast_sock.leave_multicast_v6(&dst_ip6, 0), + } + .map_err(|e| { + let e = zerror!("Close error on UDP link {}: {}", self, e); + log::trace!("{}", e); + e.into() + }) + } + + async fn write(&self, buffer: &[u8]) -> ZResult { + self.unicast_socket + .send_to(buffer, self.multicast_addr) + .await + .map_err(|e| { + let e = zerror!("Write error on UDP link {}: {}", self, e); + log::trace!("{}", e); + e.into() + }) + } + + async fn write_all(&self, buffer: &[u8]) -> ZResult<()> { + let mut written: usize = 0; + while written < buffer.len() { + written += self.write(&buffer[written..]).await?; + } + Ok(()) + } + + async fn read<'a>(&'a self, buffer: &mut [u8]) -> ZResult<(usize, Cow<'a, Locator>)> { + loop { + let (n, addr) = self.mcast_sock.recv_from(buffer).await.map_err(|e| { + let e = zerror!("Read error on UDP link {}: {}", self, e); + log::trace!("{}", e); + e + })?; + + if self.unicast_addr == addr { + continue; // We are reading our own messages, skip it + } else { + let locator = socket_addr_to_udp_locator(&addr); + break Ok((n, Cow::Owned(locator))); + } + } + } + + #[inline(always)] + fn get_src(&self) -> &Locator { + &self.unicast_locator + } + + #[inline(always)] + fn get_dst(&self) -> &Locator { + &self.multicast_locator + } + + #[inline(always)] + fn get_mtu(&self) -> u16 { + *UDP_DEFAULT_MTU + } + + #[inline(always)] + fn is_reliable(&self) -> bool { + false + } +} + +impl fmt::Display for LinkMulticastUdp { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{} => {}", self.unicast_addr, self.multicast_addr)?; + Ok(()) + } +} + +impl fmt::Debug for LinkMulticastUdp { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Udp") + .field("src", &self.unicast_addr) + .field("dst", &self.multicast_addr) + .finish() + } +} + +/*************************************/ +/* MANAGER */ +/*************************************/ +#[derive(Default)] +pub struct LinkManagerMulticastUdp; + +impl LinkManagerMulticastUdp { + async fn new_link_inner( + &self, + mcast_addr: &SocketAddr, + config: Config<'_>, + ) -> ZResult<(UdpSocket, UdpSocket, SocketAddr)> { + let domain = match mcast_addr.ip() { + IpAddr::V4(_) => Domain::IPV4, + IpAddr::V6(_) => Domain::IPV6, + }; + + // Get default iface address to bind the socket on if provided + let mut iface_addr: Option = None; + if let Some(iface) = config.get(UDP_MULTICAST_IFACE) { + iface_addr = match iface.parse() { + Ok(addr) => Some(addr), + Err(_) => zenoh_util::net::get_unicast_addresses_of_interface(iface)? + .into_iter() + .filter(|x| match mcast_addr.ip() { + IpAddr::V4(_) => x.is_ipv4(), + IpAddr::V6(_) => x.is_ipv6(), + }) + .take(1) + .collect::>() + .get(0) + .copied(), + }; + } + + // Get local unicast address to bind the socket on + let local_addr = match iface_addr { + Some(iface_addr) => iface_addr, + None => { + let iface = zenoh_util::net::get_unicast_addresses_of_multicast_interfaces() + .into_iter() + .filter(|x| { + !x.is_loopback() + && match mcast_addr.ip() { + IpAddr::V4(_) => x.is_ipv4(), + IpAddr::V6(_) => x.is_ipv6(), + } + }) + .take(1) + .collect::>() + .get(0) + .copied(); + + match iface { + Some(iface) => iface, + None => match mcast_addr.ip() { + IpAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED), + IpAddr::V6(_) => IpAddr::V6(Ipv6Addr::UNSPECIFIED), + }, + } + } + }; + + // Establish a unicast UDP socket + let ucast_sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP)) + .map_err(|e| zerror!("{}: {}", mcast_addr, e))?; + match &local_addr { + IpAddr::V4(addr) => { + ucast_sock + .set_multicast_if_v4(addr) + .map_err(|e| zerror!("{}: {}", mcast_addr, e))?; + } + IpAddr::V6(_) => match zenoh_util::net::get_index_of_interface(local_addr) { + Ok(idx) => ucast_sock + .set_multicast_if_v6(idx) + .map_err(|e| zerror!("{}: {}", mcast_addr, e))?, + Err(e) => bail!("{}: {}", mcast_addr, e), + }, + } + + ucast_sock + .bind(&SocketAddr::new(local_addr, 0).into()) + .map_err(|e| zerror!("{}: {}", mcast_addr, e))?; + + let ucast_sock: UdpSocket = std::net::UdpSocket::from(ucast_sock).into(); + + // Establish a multicast UDP socket + let mcast_sock = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP)) + .map_err(|e| zerror!("{}: {}", mcast_addr, e))?; + mcast_sock + .set_reuse_address(true) + .map_err(|e| zerror!("{}: {}", mcast_addr, e))?; + #[cfg(target_family = "unix")] + { + mcast_sock + .set_reuse_port(true) + .map_err(|e| zerror!("{}: {}", mcast_addr, e))?; + } + + // Bind the socket: let's bing to the unspecified address so we can join and read + // from multiple multicast groups. + let bind_mcast_addr = match mcast_addr.ip() { + IpAddr::V4(_) => IpAddr::V4(Ipv4Addr::UNSPECIFIED), + IpAddr::V6(_) => IpAddr::V6(Ipv6Addr::UNSPECIFIED), + }; + mcast_sock + .bind(&SocketAddr::new(bind_mcast_addr, mcast_addr.port()).into()) + .map_err(|e| zerror!("{}: {}", mcast_addr, e))?; + + // Join the multicast group + let join = config.values(UDP_MULTICAST_JOIN); + match mcast_addr.ip() { + IpAddr::V4(dst_ip4) => match local_addr { + IpAddr::V4(src_ip4) => { + // Join default multicast group + mcast_sock + .join_multicast_v4(&dst_ip4, &src_ip4) + .map_err(|e| zerror!("{}: {}", mcast_addr, e))?; + // Join any additional multicast group + for g in join { + let g: Ipv4Addr = + g.parse().map_err(|e| zerror!("{}: {}", mcast_addr, e))?; + mcast_sock + .join_multicast_v4(&g, &src_ip4) + .map_err(|e| zerror!("{}: {}", mcast_addr, e))?; + } + } + IpAddr::V6(src_ip6) => bail!("{}: unexepcted IPv6 source address", src_ip6), + }, + IpAddr::V6(dst_ip6) => { + // Join default multicast group + mcast_sock + .join_multicast_v6(&dst_ip6, 0) + .map_err(|e| zerror!("{}: {}", mcast_addr, e))?; + // Join any additional multicast group + for g in join { + let g: Ipv6Addr = g.parse().map_err(|e| zerror!("{}: {}", mcast_addr, e))?; + mcast_sock + .join_multicast_v6(&g, 0) + .map_err(|e| zerror!("{}: {}", mcast_addr, e))?; + } + } + }; + + // Build the async_std multicast UdpSocket + let mcast_sock: UdpSocket = std::net::UdpSocket::from(mcast_sock).into(); + + let ucast_addr = ucast_sock + .local_addr() + .map_err(|e| zerror!("{}: {}", mcast_addr, e))?; + assert_eq!(ucast_addr.ip(), local_addr); + + Ok((mcast_sock, ucast_sock, ucast_addr)) + } +} + +#[async_trait] +impl LinkManagerMulticastTrait for LinkManagerMulticastUdp { + async fn new_link(&self, endpoint: &EndPoint) -> ZResult { + let mcast_addrs = get_udp_addrs(endpoint.address()) + .await? + .filter(|a| a.ip().is_multicast()) + .collect::>(); + + let mut errs: Vec = vec![]; + for maddr in mcast_addrs { + match self.new_link_inner(&maddr, endpoint.config()).await { + Ok((mcast_sock, ucast_sock, ucast_addr)) => { + let link = Arc::new(LinkMulticastUdp::new( + ucast_addr, ucast_sock, maddr, mcast_sock, + )); + + return Ok(LinkMulticast(link)); + } + Err(e) => { + errs.push(e); + } + } + } + + if errs.is_empty() { + errs.push(zerror!("No UDP multicast addresses available").into()); + } + + bail!( + "Can not create a new UDP link bound to {}: {:?}", + endpoint, + errs + ) + } +} diff --git a/io/zenoh-links/zenoh-link-btp/src/unicast.rs b/io/zenoh-links/zenoh-link-btp/src/unicast.rs new file mode 100644 index 0000000000..585442ed71 --- /dev/null +++ b/io/zenoh-links/zenoh-link-btp/src/unicast.rs @@ -0,0 +1,632 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use super::{ + get_udp_addrs, socket_addr_to_udp_locator, UDP_ACCEPT_THROTTLE_TIME, UDP_DEFAULT_MTU, + UDP_MAX_MTU, +}; +use async_std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; +use async_std::prelude::*; +use async_std::sync::Mutex as AsyncMutex; +use async_std::task; +use async_std::task::JoinHandle; +use async_trait::async_trait; +use std::collections::HashMap; +use std::fmt; +use std::net::IpAddr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex, RwLock, Weak}; +use std::time::Duration; +use zenoh_core::{zasynclock, zlock, zread, zwrite}; +use zenoh_link_commons::{ + ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, + NewLinkChannelSender, +}; +use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_result::{bail, zerror, Error as ZError, ZResult}; +use zenoh_sync::Mvar; +use zenoh_sync::Signal; + +type LinkHashMap = Arc>>>; +type LinkInput = (Vec, usize); +type LinkLeftOver = (Vec, usize, usize); + +struct LinkUnicastUdpConnected { + socket: Arc, +} + +impl LinkUnicastUdpConnected { + async fn read(&self, buffer: &mut [u8]) -> ZResult { + self.socket + .recv(buffer) + .await + .map_err(|e| zerror!(e).into()) + } + + async fn write(&self, buffer: &[u8]) -> ZResult { + self.socket + .send(buffer) + .await + .map_err(|e| zerror!(e).into()) + } + + async fn close(&self) -> ZResult<()> { + Ok(()) + } +} + +struct LinkUnicastUdpUnconnected { + socket: Weak, + links: LinkHashMap, + input: Mvar, + leftover: AsyncMutex>, +} + +impl LinkUnicastUdpUnconnected { + async fn received(&self, buffer: Vec, len: usize) { + self.input.put((buffer, len)).await; + } + + async fn read(&self, buffer: &mut [u8]) -> ZResult { + let mut guard = zasynclock!(self.leftover); + let (slice, start, len) = match guard.take() { + Some(tuple) => tuple, + None => { + let (slice, len) = self.input.take().await; + (slice, 0, len) + } + }; + // Copy the read bytes into the target buffer + let len_min = (len - start).min(buffer.len()); + let end = start + len_min; + buffer[0..len_min].copy_from_slice(&slice[start..end]); + if end < len { + // Store the leftover + *guard = Some((slice, end, len)); + } else { + // Recycle the buffer + drop(slice); + } + // Return the amount read + Ok(len_min) + } + + async fn write(&self, buffer: &[u8], dst_addr: SocketAddr) -> ZResult { + match self.socket.upgrade() { + Some(socket) => socket + .send_to(buffer, &dst_addr) + .await + .map_err(|e| zerror!(e).into()), + None => bail!("UDP listener has been dropped"), + } + } + + async fn close(&self, src_addr: SocketAddr, dst_addr: SocketAddr) -> ZResult<()> { + // Delete the link from the list of links + zlock!(self.links).remove(&(src_addr, dst_addr)); + Ok(()) + } +} + +enum LinkUnicastUdpVariant { + Connected(LinkUnicastUdpConnected), + Unconnected(Arc), +} + +pub struct LinkUnicastUdp { + // The source socket address of this link (address used on the local host) + src_addr: SocketAddr, + src_locator: Locator, + // The destination socket address of this link (address used on the remote host) + dst_addr: SocketAddr, + dst_locator: Locator, + // The UDP socket is connected to the peer + variant: LinkUnicastUdpVariant, +} + +impl LinkUnicastUdp { + fn new( + src_addr: SocketAddr, + dst_addr: SocketAddr, + variant: LinkUnicastUdpVariant, + ) -> LinkUnicastUdp { + LinkUnicastUdp { + src_locator: socket_addr_to_udp_locator(&src_addr), + dst_locator: socket_addr_to_udp_locator(&dst_addr), + src_addr, + dst_addr, + variant, + } + } +} + +#[async_trait] +impl LinkUnicastTrait for LinkUnicastUdp { + async fn close(&self) -> ZResult<()> { + log::trace!("Closing UDP link: {}", self); + match &self.variant { + LinkUnicastUdpVariant::Connected(link) => link.close().await, + LinkUnicastUdpVariant::Unconnected(link) => { + link.close(self.src_addr, self.dst_addr).await + } + } + } + + async fn write(&self, buffer: &[u8]) -> ZResult { + match &self.variant { + LinkUnicastUdpVariant::Connected(link) => link.write(buffer).await, + LinkUnicastUdpVariant::Unconnected(link) => link.write(buffer, self.dst_addr).await, + } + } + + async fn write_all(&self, buffer: &[u8]) -> ZResult<()> { + let mut written: usize = 0; + while written < buffer.len() { + written += self.write(&buffer[written..]).await?; + } + Ok(()) + } + + async fn read(&self, buffer: &mut [u8]) -> ZResult { + match &self.variant { + LinkUnicastUdpVariant::Connected(link) => link.read(buffer).await, + LinkUnicastUdpVariant::Unconnected(link) => link.read(buffer).await, + } + } + + async fn read_exact(&self, buffer: &mut [u8]) -> ZResult<()> { + let mut read: usize = 0; + while read < buffer.len() { + let n = self.read(&mut buffer[read..]).await?; + read += n; + } + Ok(()) + } + + #[inline(always)] + fn get_src(&self) -> &Locator { + &self.src_locator + } + + #[inline(always)] + fn get_dst(&self) -> &Locator { + &self.dst_locator + } + + #[inline(always)] + fn get_mtu(&self) -> u16 { + *UDP_DEFAULT_MTU + } + + #[inline(always)] + fn is_reliable(&self) -> bool { + false + } + + #[inline(always)] + fn is_streamed(&self) -> bool { + false + } +} + +impl fmt::Display for LinkUnicastUdp { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{} => {}", self.src_addr, self.dst_addr)?; + Ok(()) + } +} + +impl fmt::Debug for LinkUnicastUdp { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Udp") + .field("src", &self.src_addr) + .field("dst", &self.dst_addr) + .finish() + } +} + +/*************************************/ +/* LISTENER */ +/*************************************/ +struct ListenerUnicastUdp { + endpoint: EndPoint, + active: Arc, + signal: Signal, + handle: JoinHandle>, +} + +impl ListenerUnicastUdp { + fn new( + endpoint: EndPoint, + active: Arc, + signal: Signal, + handle: JoinHandle>, + ) -> ListenerUnicastUdp { + ListenerUnicastUdp { + endpoint, + active, + signal, + handle, + } + } +} + +pub struct LinkManagerUnicastUdp { + manager: NewLinkChannelSender, + listeners: Arc>>, +} + +impl LinkManagerUnicastUdp { + pub fn new(manager: NewLinkChannelSender) -> Self { + Self { + manager, + listeners: Arc::new(RwLock::new(HashMap::new())), + } + } +} +impl ConstructibleLinkManagerUnicast<()> for LinkManagerUnicastUdp { + fn new(new_link_sender: NewLinkChannelSender, _: ()) -> ZResult { + Ok(Self::new(new_link_sender)) + } +} + +impl LinkManagerUnicastUdp { + async fn new_link_inner( + &self, + dst_addr: &SocketAddr, + ) -> ZResult<(UdpSocket, SocketAddr, SocketAddr)> { + // Establish a UDP socket + let socket = UdpSocket::bind(SocketAddr::new( + if dst_addr.is_ipv4() { + Ipv4Addr::UNSPECIFIED.into() + } else { + Ipv6Addr::UNSPECIFIED.into() + }, // UDP addr + 0, // UDP port + )) + .await + .map_err(|e| { + let e = zerror!("Can not create a new UDP link bound to {}: {}", dst_addr, e); + log::warn!("{}", e); + e + })?; + + // Connect the socket to the remote address + socket.connect(dst_addr).await.map_err(|e| { + let e = zerror!("Can not create a new UDP link bound to {}: {}", dst_addr, e); + log::warn!("{}", e); + e + })?; + + // Get source and destination UDP addresses + let src_addr = socket.local_addr().map_err(|e| { + let e = zerror!("Can not create a new UDP link bound to {}: {}", dst_addr, e); + log::warn!("{}", e); + e + })?; + + let dst_addr = socket.peer_addr().map_err(|e| { + let e = zerror!("Can not create a new UDP link bound to {}: {}", dst_addr, e); + log::warn!("{}", e); + e + })?; + + Ok((socket, src_addr, dst_addr)) + } + + async fn new_listener_inner(&self, addr: &SocketAddr) -> ZResult<(UdpSocket, SocketAddr)> { + // Bind the UDP socket + let socket = UdpSocket::bind(addr).await.map_err(|e| { + let e = zerror!("Can not create a new UDP listener on {}: {}", addr, e); + log::warn!("{}", e); + e + })?; + + let local_addr = socket.local_addr().map_err(|e| { + let e = zerror!("Can not create a new UDP listener on {}: {}", addr, e); + log::warn!("{}", e); + e + })?; + + Ok((socket, local_addr)) + } +} + +#[async_trait] +impl LinkManagerUnicastTrait for LinkManagerUnicastUdp { + async fn new_link(&self, endpoint: EndPoint) -> ZResult { + let dst_addrs = get_udp_addrs(endpoint.address()) + .await? + .filter(|a| !a.ip().is_multicast()); + + let mut errs: Vec = vec![]; + for da in dst_addrs { + match self.new_link_inner(&da).await { + Ok((socket, src_addr, dst_addr)) => { + // Create UDP link + let link = Arc::new(LinkUnicastUdp::new( + src_addr, + dst_addr, + LinkUnicastUdpVariant::Connected(LinkUnicastUdpConnected { + socket: Arc::new(socket), + }), + )); + + return Ok(LinkUnicast(link)); + } + Err(e) => { + errs.push(e); + } + } + } + + if errs.is_empty() { + errs.push(zerror!("No UDP unicast addresses available").into()); + } + + bail!( + "Can not create a new UDP link bound to {}: {:?}", + endpoint, + errs + ) + } + + async fn new_listener(&self, mut endpoint: EndPoint) -> ZResult { + let addrs = get_udp_addrs(endpoint.address()) + .await? + .filter(|a| !a.ip().is_multicast()); + + let mut errs: Vec = vec![]; + for da in addrs { + match self.new_listener_inner(&da).await { + Ok((socket, local_addr)) => { + // Update the endpoint locator address + endpoint = EndPoint::new( + endpoint.protocol(), + local_addr.to_string(), + endpoint.metadata(), + endpoint.config(), + )?; + + // Spawn the accept loop for the listener + let active = Arc::new(AtomicBool::new(true)); + let signal = Signal::new(); + let mut listeners = zwrite!(self.listeners); + + let c_active = active.clone(); + let c_signal = signal.clone(); + let c_manager = self.manager.clone(); + let c_listeners = self.listeners.clone(); + let c_addr = local_addr; + let handle = task::spawn(async move { + // Wait for the accept loop to terminate + let res = accept_read_task(socket, c_active, c_signal, c_manager).await; + zwrite!(c_listeners).remove(&c_addr); + res + }); + + let locator = endpoint.to_locator(); + let listener = ListenerUnicastUdp::new(endpoint, active, signal, handle); + // Update the list of active listeners on the manager + listeners.insert(local_addr, listener); + + return Ok(locator); + } + Err(e) => { + errs.push(e); + } + } + } + + if errs.is_empty() { + errs.push(zerror!("No UDP unicast addresses available").into()); + } + + bail!( + "Can not create a new UDP listener bound to {}: {:?}", + endpoint, + errs + ) + } + + async fn del_listener(&self, endpoint: &EndPoint) -> ZResult<()> { + let addrs = get_udp_addrs(endpoint.address()) + .await? + .filter(|x| !x.ip().is_multicast()); + + // Stop the listener + let mut errs: Vec = vec![]; + let mut listener = None; + for a in addrs { + match zwrite!(self.listeners).remove(&a) { + Some(l) => { + // We cannot keep a sync guard across a .await + // Break the loop and assign the listener. + listener = Some(l); + break; + } + None => { + errs.push(zerror!("{}", a).into()); + } + } + } + + match listener { + Some(l) => { + // Send the stop signal + l.active.store(false, Ordering::Release); + l.signal.trigger(); + l.handle.await + } + None => { + bail!( + "Can not delete the UDP listener bound to {}: {:?}", + endpoint, + errs + ) + } + } + } + + fn get_listeners(&self) -> Vec { + zread!(self.listeners) + .values() + .map(|l| l.endpoint.clone()) + .collect() + } + + fn get_locators(&self) -> Vec { + let mut locators = vec![]; + + let guard = zread!(self.listeners); + for (key, value) in guard.iter() { + let (kip, kpt) = (key.ip(), key.port()); + + // Either ipv4/0.0.0.0 or ipv6/[::] + if kip.is_unspecified() { + let mut addrs = match kip { + IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(), + IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(), + }; + let iter = addrs.drain(..).map(|x| { + Locator::new( + value.endpoint.protocol(), + SocketAddr::new(x, kpt).to_string(), + value.endpoint.metadata(), + ) + .unwrap() + }); + locators.extend(iter); + } else { + locators.push(value.endpoint.to_locator()); + } + } + + locators + } +} + +async fn accept_read_task( + socket: UdpSocket, + active: Arc, + signal: Signal, + manager: NewLinkChannelSender, +) -> ZResult<()> { + let socket = Arc::new(socket); + let links: LinkHashMap = Arc::new(Mutex::new(HashMap::new())); + + macro_rules! zaddlink { + ($src:expr, $dst:expr, $link:expr) => { + zlock!(links).insert(($src, $dst), $link); + }; + } + + macro_rules! zdellink { + ($src:expr, $dst:expr) => { + zlock!(links).remove(&($src, $dst)); + }; + } + + macro_rules! zgetlink { + ($src:expr, $dst:expr) => { + zlock!(links).get(&($src, $dst)).map(|link| link.clone()) + }; + } + + enum Action { + Receive((usize, SocketAddr)), + Stop, + } + + async fn receive(socket: Arc, buffer: &mut [u8]) -> ZResult { + let res = socket.recv_from(buffer).await.map_err(|e| zerror!(e))?; + Ok(Action::Receive(res)) + } + + async fn stop(signal: Signal) -> ZResult { + signal.wait().await; + Ok(Action::Stop) + } + + let src_addr = socket.local_addr().map_err(|e| { + let e = zerror!("Can not accept UDP connections: {}", e); + log::warn!("{}", e); + e + })?; + + log::trace!("Ready to accept UDP connections on: {:?}", src_addr); + // Buffers for deserialization + while active.load(Ordering::Acquire) { + let mut buff = zenoh_buffers::vec::uninit(UDP_MAX_MTU as usize); + // Wait for incoming connections + let (n, dst_addr) = match receive(socket.clone(), &mut buff) + .race(stop(signal.clone())) + .await + { + Ok(action) => match action { + Action::Receive((n, addr)) => (n, addr), + Action::Stop => break, + }, + Err(e) => { + log::warn!("{}. Hint: increase the system open file limit.", e); + // Throttle the accept loop upon an error + // NOTE: This might be due to various factors. However, the most common case is that + // the process has reached the maximum number of open files in the system. On + // Linux systems this limit can be changed by using the "ulimit" command line + // tool. In case of systemd-based systems, this can be changed by using the + // "sysctl" command line tool. + task::sleep(Duration::from_micros(*UDP_ACCEPT_THROTTLE_TIME)).await; + continue; + } + }; + + let link = loop { + let res = zgetlink!(src_addr, dst_addr); + match res { + Some(link) => break link.upgrade(), + None => { + // A new peers has sent data to this socket + log::debug!("Accepted UDP connection on {}: {}", src_addr, dst_addr); + let unconnected = Arc::new(LinkUnicastUdpUnconnected { + socket: Arc::downgrade(&socket), + links: links.clone(), + input: Mvar::new(), + leftover: AsyncMutex::new(None), + }); + zaddlink!(src_addr, dst_addr, Arc::downgrade(&unconnected)); + // Create the new link object + let link = Arc::new(LinkUnicastUdp::new( + src_addr, + dst_addr, + LinkUnicastUdpVariant::Unconnected(unconnected), + )); + // Add the new link to the set of connected peers + if let Err(e) = manager.send_async(LinkUnicast(link)).await { + log::error!("{}-{}: {}", file!(), line!(), e) + } + } + } + }; + + match link { + Some(link) => { + link.received(buff, n).await; + } + None => { + zdellink!(src_addr, dst_addr); + } + } + } + + Ok(()) +}