Skip to content

Commit

Permalink
links refactoring (eclipse-zenoh#748)
Browse files Browse the repository at this point in the history
* Add draft UnicastListener

* Use UnicastListener for TCP unicast, code clanup and fixes

* Use UnicastListener for UDP unicast, make linter happy

* Use UnicastListener for TLS and QUIC unicasts

* Rename UnicastListener to ListenerUnicastIP for consistency

* Move get_ip_interface_names to commons
  • Loading branch information
sashacmc authored and jerry73204 committed Mar 6, 2024
1 parent 73d7464 commit c0e5e37
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 423 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions io/zenoh-link-commons/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@ compression = []

[dependencies]
async-std = { workspace = true }
zenoh-sync = { workspace = true }
async-trait = { workspace = true }
flume = { workspace = true }
lz4_flex = { workspace = true }
log = { workspace = true }
serde = { workspace = true, features = ["default"] }
typenum = { workspace = true }
zenoh-buffers = { workspace = true }
zenoh-codec = { workspace = true }
zenoh-core = { workspace = true }
zenoh-protocol = { workspace = true }
zenoh-result = { workspace = true }
zenoh-util = { workspace = true }
3 changes: 2 additions & 1 deletion io/zenoh-link-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@
//! This crate is intended for Zenoh's internal use.
//!
//! [Click here for Zenoh's documentation](../zenoh/index.html)
#![no_std]
extern crate alloc;

mod listener;
mod multicast;
mod unicast;

use alloc::{borrow::ToOwned, boxed::Box, string::String, vec, vec::Vec};
use async_trait::async_trait;
use core::{cmp::PartialEq, fmt, hash::Hash};
pub use listener::*;
pub use multicast::*;
use serde::Serialize;
pub use unicast::*;
Expand Down
141 changes: 141 additions & 0 deletions io/zenoh-link-commons/src/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use async_std::net::SocketAddr;
use async_std::task;
use async_std::task::JoinHandle;
use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use zenoh_core::{zread, zwrite};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_result::{zerror, ZResult};
use zenoh_sync::Signal;

pub struct ListenerUnicastIP {
endpoint: EndPoint,
active: Arc<AtomicBool>,
signal: Signal,
handle: JoinHandle<ZResult<()>>,
}

impl ListenerUnicastIP {
fn new(
endpoint: EndPoint,
active: Arc<AtomicBool>,
signal: Signal,
handle: JoinHandle<ZResult<()>>,
) -> ListenerUnicastIP {
ListenerUnicastIP {
endpoint,
active,
signal,
handle,
}
}
}

pub struct ListenersUnicastIP {
listeners: Arc<RwLock<HashMap<SocketAddr, ListenerUnicastIP>>>,
}

impl ListenersUnicastIP {
pub fn new() -> ListenersUnicastIP {
ListenersUnicastIP {
listeners: Arc::new(RwLock::new(HashMap::new())),
}
}

pub async fn add_listener(
&self,
endpoint: EndPoint,
addr: SocketAddr,
active: Arc<AtomicBool>,
signal: Signal,
handle: JoinHandle<ZResult<()>>,
) -> ZResult<()> {
let mut listeners = zwrite!(self.listeners);
let c_listeners = self.listeners.clone();
let c_addr = addr;
let wraphandle = task::spawn(async move {
// Wait for the accept loop to terminate
let res = handle.await;
zwrite!(c_listeners).remove(&c_addr);
res
});

let listener = ListenerUnicastIP::new(endpoint, active, signal, wraphandle);
// Update the list of active listeners on the manager
listeners.insert(addr, listener);
Ok(())
}

pub async fn del_listener(&self, addr: SocketAddr) -> ZResult<()> {
// Stop the listener
let listener = zwrite!(self.listeners).remove(&addr).ok_or_else(|| {
zerror!(
"Can not delete the listener because it has not been found: {}",
addr
)
})?;

// Send the stop signal
listener.active.store(false, Ordering::Release);
listener.signal.trigger();
listener.handle.await
}

pub fn get_endpoints(&self) -> Vec<EndPoint> {
zread!(self.listeners)
.values()
.map(|l| l.endpoint.clone())
.collect()
}

pub fn get_locators(&self) -> Vec<Locator> {
let mut locators = vec![];

let guard = zread!(self.listeners);
for (key, value) in guard.iter() {
let (kip, kpt) = (key.ip(), key.port());

// Either ipv4/0.0.0.0 or ipv6/[::]
if kip.is_unspecified() {
let mut addrs = match kip {
IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(),
IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(),
};
let iter = addrs.drain(..).map(|x| {
Locator::new(
value.endpoint.protocol(),
SocketAddr::new(x, kpt).to_string(),
value.endpoint.metadata(),
)
.unwrap()
});
locators.extend(iter);
} else {
locators.push(value.endpoint.to_locator());
}
}

locators
}
}

impl Default for ListenersUnicastIP {
fn default() -> Self {
Self::new()
}
}
14 changes: 14 additions & 0 deletions io/zenoh-link-commons/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
use async_std::net::SocketAddr;
use async_trait::async_trait;
use core::{
fmt,
Expand Down Expand Up @@ -100,3 +101,16 @@ impl From<Arc<dyn LinkUnicastTrait>> for LinkUnicast {
LinkUnicast(link)
}
}

pub fn get_ip_interface_names(addr: &SocketAddr) -> Vec<String> {
match zenoh_util::net::get_interface_names_by_addr(addr.ip()) {
Ok(interfaces) => {
log::trace!("get_interface_names for {:?}: {:?}", addr.ip(), interfaces);
interfaces
}
Err(e) => {
log::debug!("get_interface_names for {:?} failed: {:?}", addr.ip(), e);
vec![]
}
}
}
Loading

0 comments on commit c0e5e37

Please sign in to comment.