From ad43d4f7ccc69f32462532897351033f21686950 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Tue, 20 Feb 2024 19:24:09 +0100 Subject: [PATCH] Add draft UnicastListener --- Cargo.lock | 3 + io/zenoh-link-commons/Cargo.toml | 3 + io/zenoh-link-commons/src/lib.rs | 3 +- io/zenoh-link-commons/src/listener.rs | 140 +++++++++++++++++++ io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 3 +- 5 files changed, 150 insertions(+), 2 deletions(-) create mode 100644 io/zenoh-link-commons/src/listener.rs diff --git a/Cargo.lock b/Cargo.lock index 2dc8ae14b6..849ebaa27e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4784,8 +4784,11 @@ dependencies = [ "typenum", "zenoh-buffers", "zenoh-codec", + "zenoh-core", "zenoh-protocol", "zenoh-result", + "zenoh-sync", + "zenoh-util", ] [[package]] diff --git a/io/zenoh-link-commons/Cargo.toml b/io/zenoh-link-commons/Cargo.toml index 36e39eceed..6a6cd6dc87 100644 --- a/io/zenoh-link-commons/Cargo.toml +++ b/io/zenoh-link-commons/Cargo.toml @@ -29,6 +29,7 @@ compression = [] [dependencies] async-std = { workspace = true } +zenoh-sync = { workspace = true } async-trait = { workspace = true } flume = { workspace = true } lz4_flex = { workspace = true } @@ -36,5 +37,7 @@ serde = { workspace = true, features = ["default"] } typenum = { workspace = true } zenoh-buffers = { workspace = true } zenoh-codec = { workspace = true } +zenoh-core = { workspace = true } zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } +zenoh-util = { workspace = true } diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index b15a0d9ad5..2ee28c3f08 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -17,15 +17,16 @@ //! This crate is intended for Zenoh's internal use. //! //! [Click here for Zenoh's documentation](../zenoh/index.html) -#![no_std] extern crate alloc; +mod listener; mod multicast; mod unicast; use alloc::{borrow::ToOwned, boxed::Box, string::String, vec, vec::Vec}; use async_trait::async_trait; use core::{cmp::PartialEq, fmt, hash::Hash}; +pub use listener::*; pub use multicast::*; use serde::Serialize; pub use unicast::*; diff --git a/io/zenoh-link-commons/src/listener.rs b/io/zenoh-link-commons/src/listener.rs new file mode 100644 index 0000000000..9d56823a88 --- /dev/null +++ b/io/zenoh-link-commons/src/listener.rs @@ -0,0 +1,140 @@ +// +// Copyright (c) 202 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use async_std::net::{SocketAddr, TcpListener, TcpStream}; +use async_std::prelude::*; +use async_std::task; +use async_std::task::JoinHandle; +use async_trait::async_trait; +use std::collections::HashMap; +use std::convert::TryInto; +use std::fmt; +use std::net::{IpAddr, Shutdown}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, RwLock}; +use std::time::Duration; +use zenoh_core::{zread, zwrite}; +use zenoh_protocol::core::{EndPoint, Locator}; +use zenoh_result::{bail, zerror, Error as ZError, ZResult}; +use zenoh_sync::Signal; + +pub struct UnicastListener { + endpoint: EndPoint, + active: Arc, + signal: Signal, + handle: JoinHandle>, +} + +impl UnicastListener { + fn new( + endpoint: EndPoint, + active: Arc, + signal: Signal, + handle: JoinHandle>, + ) -> UnicastListener { + UnicastListener { + endpoint, + active, + signal, + handle, + } + } +} + +pub struct UnicastListeners { + listeners: Arc>>, +} + +impl UnicastListeners { + fn new() -> UnicastListeners { + UnicastListeners { + listeners: Arc::new(RwLock::new(HashMap::new())), + } + } + + async fn add_listener( + &self, + endpoint: EndPoint, + addr: SocketAddr, + active: Arc, + signal: Signal, + handle: JoinHandle>, + ) -> ZResult<()> { + let mut listeners = zwrite!(self.listeners); + let c_listeners = self.listeners.clone(); + let c_addr = addr; + let wraphandle = task::spawn(async move { + // Wait for the accept loop to terminate + let res = handle.await; + zwrite!(c_listeners).remove(&c_addr); + res + }); + + let listener = UnicastListener::new(endpoint, active, signal, wraphandle); + // Update the list of active listeners on the manager + listeners.insert(addr, listener); + Ok(()) + } + + async fn del_listener(&self, addr: SocketAddr) -> ZResult<()> { + // Stop the listener + let listener = zwrite!(self.listeners).remove(&addr).ok_or_else(|| { + zerror!( + "Can not delete the listener because it has not been found: {}", + addr + ) + })?; + + // Send the stop signal + listener.active.store(false, Ordering::Release); + listener.signal.trigger(); + listener.handle.await + } + + fn get_endpoints(&self) -> Vec { + zread!(self.listeners) + .values() + .map(|l| l.endpoint.clone()) + .collect() + } + + fn get_locators(&self) -> Vec { + let mut locators = vec![]; + + let guard = zread!(self.listeners); + for (key, value) in guard.iter() { + let (kip, kpt) = (key.ip(), key.port()); + + // Either ipv4/0.0.0.0 or ipv6/[::] + if kip.is_unspecified() { + let mut addrs = match kip { + IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(), + IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(), + }; + let iter = addrs.drain(..).map(|x| { + Locator::new( + value.endpoint.protocol(), + SocketAddr::new(x, kpt).to_string(), + value.endpoint.metadata(), + ) + .unwrap() + }); + locators.extend(iter); + } else { + locators.push(value.endpoint.to_locator()); + } + } + + locators + } +} diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 3876a947ca..5c91c9be90 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -25,7 +25,8 @@ use std::sync::{Arc, RwLock}; use std::time::Duration; use zenoh_core::{zread, zwrite}; use zenoh_link_commons::{ - LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, UnicastListener, + UnicastListeners, }; use zenoh_protocol::core::{EndPoint, Locator}; use zenoh_result::{bail, zerror, Error as ZError, ZResult};