From 6b4c4cc35a4247e9b660af7dbb658b2202406127 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Fri, 6 Sep 2024 08:02:46 +0000 Subject: [PATCH] Remove `NewLinkUnicast` --- commons/zenoh-protocol/src/core/locator.rs | 4 ++++ io/zenoh-link-commons/src/unicast.rs | 13 +------------ io/zenoh-links/zenoh-link-quic/src/unicast.rs | 8 +++----- io/zenoh-links/zenoh-link-serial/src/unicast.rs | 10 +++------- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 8 +++----- io/zenoh-links/zenoh-link-tls/src/unicast.rs | 8 +++----- io/zenoh-links/zenoh-link-udp/src/unicast.rs | 9 +++------ .../zenoh-link-unixpipe/src/unix/unicast.rs | 9 ++------- .../zenoh-link-unixsock_stream/src/unicast.rs | 7 ++----- io/zenoh-links/zenoh-link-vsock/src/unicast.rs | 7 ++----- io/zenoh-links/zenoh-link-ws/src/unicast.rs | 13 ++----------- .../src/unicast/establishment/accept.rs | 9 +++------ io/zenoh-transport/src/unicast/manager.rs | 5 ++--- 13 files changed, 33 insertions(+), 77 deletions(-) diff --git a/commons/zenoh-protocol/src/core/locator.rs b/commons/zenoh-protocol/src/core/locator.rs index 14f899e7c6..a34cc55320 100644 --- a/commons/zenoh-protocol/src/core/locator.rs +++ b/commons/zenoh-protocol/src/core/locator.rs @@ -64,6 +64,10 @@ impl Locator { pub fn as_str(&self) -> &str { self.0.as_str() } + + pub fn to_endpoint(&self) -> EndPoint { + self.0.clone() + } } impl From for Locator { diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index 15d7ce8b17..18ee8d2a45 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -36,18 +36,7 @@ pub trait LinkManagerUnicastTrait: Send + Sync { async fn get_listeners(&self) -> Vec; async fn get_locators(&self) -> Vec; } -pub type NewLinkChannelSender = flume::Sender; - -// TODO(fuzzypixelz): Delete this -/// Notification of a new inbound connection. -/// -/// Link implementations should preserve the metadata sections of [`NewLinkUnicast::endpoint`]. -pub struct NewLinkUnicast { - /// The link created in response to a new inbound connection. - pub link: LinkUnicast, - /// Endpoint of the listener. - pub endpoint: EndPoint, -} +pub type NewLinkChannelSender = flume::Sender; pub trait ConstructibleLinkManagerUnicast: Sized { fn new(new_link_sender: NewLinkChannelSender, config: T) -> ZResult; diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs index 3fdcfdc62d..266cdb8e31 100644 --- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs @@ -27,7 +27,7 @@ use x509_parser::prelude::*; use zenoh_core::zasynclock; use zenoh_link_commons::{ get_ip_interface_names, LinkAuthId, LinkAuthType, LinkManagerUnicastTrait, LinkUnicast, - LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, NewLinkUnicast, + LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -336,9 +336,8 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic { let task = { let token = token.clone(); let manager = self.manager.clone(); - let endpoint = endpoint.clone(); - async move { accept_task(endpoint, quic_endpoint, token, manager).await } + async move { accept_task(quic_endpoint, token, manager).await } }; // Initialize the QuicAcceptor @@ -367,7 +366,6 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic { } async fn accept_task( - endpoint: EndPoint, quic_endpoint: quinn::Endpoint, token: CancellationToken, manager: NewLinkChannelSender, @@ -433,7 +431,7 @@ async fn accept_task( )); // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(NewLinkUnicast { link: LinkUnicast(link), endpoint: endpoint.clone() }).await { + if let Err(e) = manager.send_async(LinkUnicast(link)).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index b048121a9a..0d6d4b07ee 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -33,7 +33,7 @@ use z_serial::ZSerial; use zenoh_core::{zasynclock, zasyncread, zasyncwrite}; use zenoh_link_commons::{ ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, - LinkUnicastTrait, NewLinkChannelSender, NewLinkUnicast, + LinkUnicastTrait, NewLinkChannelSender, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -339,13 +339,10 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { let path = path.clone(); let manager = self.manager.clone(); let listeners = self.listeners.clone(); - let endpoint = endpoint.clone(); async move { // Wait for the accept loop to terminate - let res = - accept_read_task(endpoint, link, token, manager, path.clone(), is_connected) - .await; + let res = accept_read_task(link, token, manager, path.clone(), is_connected).await; zasyncwrite!(listeners).remove(&path); res } @@ -394,7 +391,6 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { } async fn accept_read_task( - endpoint: EndPoint, link: Arc, token: CancellationToken, manager: NewLinkChannelSender, @@ -428,7 +424,7 @@ async fn accept_read_task( match res { Ok(link) => { // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(NewLinkUnicast{ link: LinkUnicast(link.clone()), endpoint: endpoint.clone() }).await { + if let Err(e) = manager.send_async(LinkUnicast(link.clone())).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index f5c1e51e1d..1ab70c6f69 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -21,7 +21,7 @@ use tokio::{ use tokio_util::sync::CancellationToken; use zenoh_link_commons::{ get_ip_interface_names, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, - ListenersUnicastIP, NewLinkChannelSender, NewLinkUnicast, BIND_INTERFACE, + ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -358,9 +358,8 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { let task = { let token = token.clone(); let manager = self.manager.clone(); - let endpoint = endpoint.clone(); - async move { accept_task(endpoint, socket, token, manager).await } + async move { accept_task(socket, token, manager).await } }; let locator = endpoint.to_locator(); @@ -425,7 +424,6 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { } async fn accept_task( - endpoint: EndPoint, socket: TcpListener, token: CancellationToken, manager: NewLinkChannelSender, @@ -462,7 +460,7 @@ async fn accept_task( let link = Arc::new(LinkUnicastTcp::new(stream, src_addr, dst_addr)); // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(NewLinkUnicast{ link: LinkUnicast(link), endpoint: endpoint.clone() }).await { + if let Err(e) = manager.send_async(LinkUnicast(link)).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } }, diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index d75835e653..710b3897d6 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -25,7 +25,7 @@ use x509_parser::prelude::*; use zenoh_core::zasynclock; use zenoh_link_commons::{ get_ip_interface_names, LinkAuthId, LinkAuthType, LinkManagerUnicastTrait, LinkUnicast, - LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, NewLinkUnicast, + LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -368,9 +368,8 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls { let acceptor = TlsAcceptor::from(Arc::new(tls_server_config.server_config)); let token = token.clone(); let manager = self.manager.clone(); - let endpoint = endpoint.clone(); - async move { accept_task(endpoint, socket, acceptor, token, manager).await } + async move { accept_task(socket, acceptor, token, manager).await } }; // Update the endpoint locator address @@ -403,7 +402,6 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls { } async fn accept_task( - endpoint: EndPoint, socket: TcpListener, acceptor: TlsAcceptor, token: CancellationToken, @@ -461,7 +459,7 @@ async fn accept_task( )); // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(NewLinkUnicast {link: LinkUnicast(link), endpoint: endpoint.clone()}).await { + if let Err(e) = manager.send_async(LinkUnicast(link)).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } } diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 0c3c6a5039..100f7dab70 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -25,8 +25,7 @@ use tokio_util::sync::CancellationToken; use zenoh_core::{zasynclock, zlock}; use zenoh_link_commons::{ get_ip_interface_names, ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, - LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, NewLinkUnicast, - BIND_INTERFACE, + LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -407,9 +406,8 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp { let task = { let token = token.clone(); let manager = self.manager.clone(); - let endpoint = endpoint.clone(); - async move { accept_read_task(endpoint, socket, token, manager).await } + async move { accept_read_task(socket, token, manager).await } }; let locator = endpoint.to_locator(); @@ -476,7 +474,6 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp { } async fn accept_read_task( - endpoint: EndPoint, socket: UdpSocket, token: CancellationToken, manager: NewLinkChannelSender, @@ -550,7 +547,7 @@ async fn accept_read_task( LinkUnicastUdpVariant::Unconnected(unconnected), )); // Add the new link to the set of connected peers - if let Err(e) = manager.send_async(NewLinkUnicast { link: LinkUnicast(link), endpoint: endpoint.clone() }).await { + if let Err(e) = manager.send_async(LinkUnicast(link)).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } } diff --git a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs index b93bb32398..ce690ce60d 100644 --- a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs @@ -37,7 +37,7 @@ use unix_named_pipe::{create, open_write}; use zenoh_core::{zasyncread, zasyncwrite, ResolveFuture, Wait}; use zenoh_link_commons::{ ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, - LinkUnicastTrait, NewLinkChannelSender, NewLinkUnicast, + LinkUnicastTrait, NewLinkChannelSender, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -281,12 +281,7 @@ async fn handle_incoming_connections( }); // send newly established link to manager - manager - .send_async(NewLinkUnicast { - link: LinkUnicast(link), - endpoint: endpoint.clone(), - }) - .await?; + manager.send_async(LinkUnicast(link)).await?; ZResult::Ok(()) } diff --git a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs index 5864847e8e..bf65e876c6 100644 --- a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs @@ -28,7 +28,6 @@ use uuid::Uuid; use zenoh_core::{zasyncread, zasyncwrite}; use zenoh_link_commons::{ LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, - NewLinkUnicast, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -397,11 +396,10 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUnixSocketStream { let manager = self.manager.clone(); let listeners = self.listeners.clone(); let path = local_path_str.to_owned(); - let endpoint = endpoint.clone(); async move { // Wait for the accept loop to terminate - let res = accept_task(endpoint, socket, c_token, manager).await; + let res = accept_task(socket, c_token, manager).await; zasyncwrite!(listeners).remove(&path); res } @@ -463,7 +461,6 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUnixSocketStream { } async fn accept_task( - endpoint: EndPoint, socket: UnixListener, token: CancellationToken, manager: NewLinkChannelSender, @@ -520,7 +517,7 @@ async fn accept_task( )); // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(NewLinkUnicast { link: LinkUnicast(link), endpoint: endpoint.clone() }).await { + if let Err(e) = manager.send_async(LinkUnicast(link)).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } diff --git a/io/zenoh-links/zenoh-link-vsock/src/unicast.rs b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs index cbdde4b043..b95913aa0a 100644 --- a/io/zenoh-links/zenoh-link-vsock/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs @@ -29,7 +29,6 @@ use tokio_vsock::{ use zenoh_core::{zasyncread, zasyncwrite}; use zenoh_link_commons::{ LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, - NewLinkUnicast, }; use zenoh_protocol::{ core::{endpoint::Address, EndPoint, Locator}, @@ -282,11 +281,10 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastVsock { let token = token.clone(); let manager = self.manager.clone(); let listeners = self.listeners.clone(); - let endpoint = endpoint.clone(); async move { // Wait for the accept loop to terminate - let res = accept_task(endpoint.clone(), listener, token, manager).await; + let res = accept_task(listener, token, manager).await; zasyncwrite!(listeners).remove(&addr); res } @@ -333,7 +331,6 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastVsock { } async fn accept_task( - endpoint: EndPoint, mut socket: VsockListener, token: CancellationToken, manager: NewLinkChannelSender, @@ -361,7 +358,7 @@ async fn accept_task( let link = Arc::new(LinkUnicastVsock::new(stream, src_addr, dst_addr)); // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(NewLinkUnicast { link: LinkUnicast(link), endpoint: endpoint.clone() }).await { + if let Err(e) = manager.send_async(LinkUnicast(link)).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } }, diff --git a/io/zenoh-links/zenoh-link-ws/src/unicast.rs b/io/zenoh-links/zenoh-link-ws/src/unicast.rs index 82dc9993b3..e16b38cc68 100644 --- a/io/zenoh-links/zenoh-link-ws/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-ws/src/unicast.rs @@ -35,7 +35,6 @@ use tokio_util::sync::CancellationToken; use zenoh_core::{zasynclock, zasyncread, zasyncwrite}; use zenoh_link_commons::{ LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, - NewLinkUnicast, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -375,11 +374,10 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastWs { let manager = self.manager.clone(); let listeners = self.listeners.clone(); let addr = local_addr; - let endpoint = endpoint.clone(); async move { // Wait for the accept loop to terminate - let res = accept_task(endpoint, socket, token, manager).await; + let res = accept_task(socket, token, manager).await; zasyncwrite!(listeners).remove(&addr); res } @@ -472,7 +470,6 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastWs { } async fn accept_task( - endpoint: EndPoint, socket: TcpListener, token: CancellationToken, manager: NewLinkChannelSender, @@ -541,13 +538,7 @@ async fn accept_task( let link = Arc::new(LinkUnicastWs::new(stream, src_addr, dst_addr)); // Communicate the new link to the initial transport manager - if let Err(e) = manager - .send_async(NewLinkUnicast { - link: LinkUnicast(link), - endpoint: endpoint.clone(), - }) - .await - { + if let Err(e) = manager.send_async(LinkUnicast(link)).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } } diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index efa7acb5ab..0b8c00290f 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -20,7 +20,7 @@ use zenoh_buffers::{reader::HasReader, writer::HasWriter, ZSlice}; use zenoh_codec::{RCodec, WCodec, Zenoh080}; use zenoh_core::{zasynclock, zcondfeat, zerror}; use zenoh_crypto::{BlockCipher, PseudoRng}; -use zenoh_link::{EndPoint, LinkUnicast}; +use zenoh_link::LinkUnicast; use zenoh_protocol::{ core::{Field, Reliability, Resolution, WhatAmI, ZenohIdProto}, transport::{ @@ -636,11 +636,8 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { } } -pub(crate) async fn accept_link( - endpoint: EndPoint, - link: LinkUnicast, - manager: &TransportManager, -) -> ZResult<()> { +pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) -> ZResult<()> { + let endpoint = link.get_src().to_endpoint(); let direction = TransportLinkUnicastDirection::Inbound; let mtu = link.get_mtu(); let is_streamed = link.is_streamed(); diff --git a/io/zenoh-transport/src/unicast/manager.rs b/io/zenoh-transport/src/unicast/manager.rs index d0fbecad5f..b9c79b681d 100644 --- a/io/zenoh-transport/src/unicast/manager.rs +++ b/io/zenoh-transport/src/unicast/manager.rs @@ -736,8 +736,7 @@ impl TransportManager { Ok(()) } - pub(crate) async fn handle_new_link_unicast(&self, new_link: NewLinkUnicast) { - let NewLinkUnicast { link, endpoint } = new_link; + pub(crate) async fn handle_new_link_unicast(&self, link: LinkUnicast) { let incoming_counter = self.state.unicast.incoming.clone(); if incoming_counter.load(SeqCst) >= self.config.unicast.accept_pending { // We reached the limit of concurrent incoming transport, this means two things: @@ -760,7 +759,7 @@ impl TransportManager { .spawn_with_rt(zenoh_runtime::ZRuntime::Acceptor, async move { if tokio::time::timeout( c_manager.config.unicast.accept_timeout, - super::establishment::accept::accept_link(endpoint, link, &c_manager), + super::establishment::accept::accept_link(link, &c_manager), ) .await .is_err()