Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Links refactoring #748

Merged
merged 6 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions io/zenoh-link-commons/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@ compression = []

[dependencies]
async-std = { workspace = true }
zenoh-sync = { workspace = true }
async-trait = { workspace = true }
flume = { workspace = true }
lz4_flex = { workspace = true }
log = { workspace = true }
serde = { workspace = true, features = ["default"] }
typenum = { workspace = true }
zenoh-buffers = { workspace = true }
zenoh-codec = { workspace = true }
zenoh-core = { workspace = true }
zenoh-protocol = { workspace = true }
zenoh-result = { workspace = true }
zenoh-util = { workspace = true }
3 changes: 2 additions & 1 deletion io/zenoh-link-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@
//! This crate is intended for Zenoh's internal use.
//!
//! [Click here for Zenoh's documentation](../zenoh/index.html)
#![no_std]
extern crate alloc;

mod listener;
mod multicast;
mod unicast;

use alloc::{borrow::ToOwned, boxed::Box, string::String, vec, vec::Vec};
use async_trait::async_trait;
use core::{cmp::PartialEq, fmt, hash::Hash};
pub use listener::*;
pub use multicast::*;
use serde::Serialize;
pub use unicast::*;
Expand Down
141 changes: 141 additions & 0 deletions io/zenoh-link-commons/src/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use async_std::net::SocketAddr;
use async_std::task;
use async_std::task::JoinHandle;
use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use zenoh_core::{zread, zwrite};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_result::{zerror, ZResult};
use zenoh_sync::Signal;

pub struct ListenerUnicastIP {
endpoint: EndPoint,
active: Arc<AtomicBool>,
signal: Signal,
handle: JoinHandle<ZResult<()>>,
}

impl ListenerUnicastIP {
fn new(
endpoint: EndPoint,
active: Arc<AtomicBool>,
signal: Signal,
handle: JoinHandle<ZResult<()>>,
) -> ListenerUnicastIP {
ListenerUnicastIP {
endpoint,
active,
signal,
handle,
}
}
}

pub struct ListenersUnicastIP {
listeners: Arc<RwLock<HashMap<SocketAddr, ListenerUnicastIP>>>,
}

impl ListenersUnicastIP {
pub fn new() -> ListenersUnicastIP {
ListenersUnicastIP {
listeners: Arc::new(RwLock::new(HashMap::new())),
}
}

pub async fn add_listener(
&self,
endpoint: EndPoint,
addr: SocketAddr,
active: Arc<AtomicBool>,
signal: Signal,
handle: JoinHandle<ZResult<()>>,
) -> ZResult<()> {
let mut listeners = zwrite!(self.listeners);
let c_listeners = self.listeners.clone();
let c_addr = addr;
let wraphandle = task::spawn(async move {
// Wait for the accept loop to terminate
let res = handle.await;
zwrite!(c_listeners).remove(&c_addr);
res
});

let listener = ListenerUnicastIP::new(endpoint, active, signal, wraphandle);
// Update the list of active listeners on the manager
listeners.insert(addr, listener);
Ok(())
}

pub async fn del_listener(&self, addr: SocketAddr) -> ZResult<()> {
// Stop the listener
let listener = zwrite!(self.listeners).remove(&addr).ok_or_else(|| {
zerror!(
"Can not delete the listener because it has not been found: {}",
addr
)
})?;

// Send the stop signal
listener.active.store(false, Ordering::Release);
listener.signal.trigger();
listener.handle.await
}

pub fn get_endpoints(&self) -> Vec<EndPoint> {
zread!(self.listeners)
.values()
.map(|l| l.endpoint.clone())
.collect()
}

pub fn get_locators(&self) -> Vec<Locator> {
let mut locators = vec![];

let guard = zread!(self.listeners);
for (key, value) in guard.iter() {
let (kip, kpt) = (key.ip(), key.port());

// Either ipv4/0.0.0.0 or ipv6/[::]
if kip.is_unspecified() {
let mut addrs = match kip {
IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(),
IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(),
};
let iter = addrs.drain(..).map(|x| {
Locator::new(
value.endpoint.protocol(),
SocketAddr::new(x, kpt).to_string(),
value.endpoint.metadata(),
)
.unwrap()
});
locators.extend(iter);
} else {
locators.push(value.endpoint.to_locator());
}
}

locators
}
}

impl Default for ListenersUnicastIP {
fn default() -> Self {
Self::new()
}
}
14 changes: 14 additions & 0 deletions io/zenoh-link-commons/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
use async_std::net::SocketAddr;
use async_trait::async_trait;
use core::{
fmt,
Expand Down Expand Up @@ -100,3 +101,16 @@ impl From<Arc<dyn LinkUnicastTrait>> for LinkUnicast {
LinkUnicast(link)
}
}

pub fn get_ip_interface_names(addr: &SocketAddr) -> Vec<String> {
match zenoh_util::net::get_interface_names_by_addr(addr.ip()) {
Ok(interfaces) => {
log::trace!("get_interface_names for {:?}: {:?}", addr.ip(), interfaces);
interfaces
}
Err(e) => {
log::debug!("get_interface_names for {:?} failed: {:?}", addr.ip(), e);
vec![]
}
}
}
Loading
Loading