Skip to content

Commit

Permalink
Use UnicastListener for TLS and QUIC unicasts
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Feb 21, 2024
1 parent 7c76c4c commit 0a85d90
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 185 deletions.
2 changes: 1 addition & 1 deletion io/zenoh-link-commons/src/listener.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (c) 202 ZettaScale Technology
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
Expand Down
110 changes: 17 additions & 93 deletions io/zenoh-links/zenoh-link-quic/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,18 @@ use async_std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use async_std::prelude::FutureExt;
use async_std::sync::Mutex as AsyncMutex;
use async_std::task;
use async_std::task::JoinHandle;
use async_trait::async_trait;
use rustls::{Certificate, PrivateKey};
use rustls_pemfile::Item;
use std::collections::HashMap;
use std::fmt;
use std::io::BufReader;
use std::net::IpAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::time::Duration;
use zenoh_core::{zasynclock, zread, zwrite};
use zenoh_core::zasynclock;
use zenoh_link_commons::{
LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender,
LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, UnicastListeners,
};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_result::{bail, zerror, ZError, ZResult};
Expand Down Expand Up @@ -188,42 +186,16 @@ impl fmt::Debug for LinkUnicastQuic {
}
}

/*************************************/
/* LISTENER */
/*************************************/
struct ListenerUnicastQuic {
endpoint: EndPoint,
active: Arc<AtomicBool>,
signal: Signal,
handle: JoinHandle<ZResult<()>>,
}

impl ListenerUnicastQuic {
fn new(
endpoint: EndPoint,
active: Arc<AtomicBool>,
signal: Signal,
handle: JoinHandle<ZResult<()>>,
) -> ListenerUnicastQuic {
ListenerUnicastQuic {
endpoint,
active,
signal,
handle,
}
}
}

pub struct LinkManagerUnicastQuic {
manager: NewLinkChannelSender,
listeners: Arc<RwLock<HashMap<SocketAddr, ListenerUnicastQuic>>>,
listeners: UnicastListeners,
}

impl LinkManagerUnicastQuic {
pub fn new(manager: NewLinkChannelSender) -> Self {
Self {
manager,
listeners: Arc::new(RwLock::new(HashMap::new())),
listeners: UnicastListeners::new(),
}
}
}
Expand Down Expand Up @@ -429,88 +401,40 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic {
endpoint.config(),
)?;

// Spawn the accept loop for the listener
let active = Arc::new(AtomicBool::new(true));
let signal = Signal::new();
let mut listeners = zwrite!(self.listeners);

let c_active = active.clone();
let c_signal = signal.clone();
let c_manager = self.manager.clone();
let c_listeners = self.listeners.clone();
let c_addr = local_addr;
let handle = task::spawn(async move {
// Wait for the accept loop to terminate
let res = accept_task(quic_endpoint, c_active, c_signal, c_manager).await;
zwrite!(c_listeners).remove(&c_addr);
res
});

let handle =
task::spawn(
async move { accept_task(quic_endpoint, c_active, c_signal, c_manager).await },
);

// Initialize the QuicAcceptor
let locator = endpoint.to_locator();
let listener = ListenerUnicastQuic::new(endpoint, active, signal, handle);
// Update the list of active listeners on the manager
listeners.insert(local_addr, listener);

self.listeners
.add_listener(endpoint, local_addr, active, signal, handle)
.await?;

Ok(locator)
}

async fn del_listener(&self, endpoint: &EndPoint) -> ZResult<()> {
let epaddr = endpoint.address();

let addr = get_quic_addr(&epaddr).await?;

// Stop the listener
let listener = zwrite!(self.listeners).remove(&addr).ok_or_else(|| {
let e = zerror!(
"Can not delete the QUIC listener because it has not been found: {}",
addr
);
log::trace!("{}", e);
e
})?;

// Send the stop signal
listener.active.store(false, Ordering::Release);
listener.signal.trigger();
listener.handle.await
self.listeners.del_listener(addr).await
}

fn get_listeners(&self) -> Vec<EndPoint> {
zread!(self.listeners)
.values()
.map(|x| x.endpoint.clone())
.collect()
self.listeners.get_endpoints()
}

fn get_locators(&self) -> Vec<Locator> {
let mut locators = vec![];

let guard = zread!(self.listeners);
for (key, value) in guard.iter() {
let (kip, kpt) = (key.ip(), key.port());

// Either ipv4/0.0.0.0 or ipv6/[::]
if kip.is_unspecified() {
let mut addrs = match kip {
IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(),
IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(),
};
let iter = addrs.drain(..).map(|x| {
Locator::new(
value.endpoint.protocol(),
SocketAddr::new(x, kpt).to_string(),
value.endpoint.metadata(),
)
.unwrap()
});
locators.extend(iter);
} else {
locators.push(value.endpoint.to_locator());
}
}

locators
self.listeners.get_locators()
}
}

Expand Down
105 changes: 14 additions & 91 deletions io/zenoh-links/zenoh-link-tls/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,25 @@ use async_std::net::{SocketAddr, TcpListener, TcpStream};
use async_std::prelude::FutureExt;
use async_std::sync::Mutex as AsyncMutex;
use async_std::task;
use async_std::task::JoinHandle;
use async_trait::async_trait;
use futures::io::AsyncReadExt;
use futures::io::AsyncWriteExt;
use std::collections::HashMap;
use std::convert::TryInto;
use std::fmt;
use std::fs::File;
use std::io::{BufReader, Cursor};
use std::net::{IpAddr, Shutdown};
use std::net::Shutdown;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::time::Duration;
use std::{cell::UnsafeCell, io};
use webpki::{
anchor_from_trusted_cert,
types::{CertificateDer, TrustAnchor},
};
use zenoh_core::{zasynclock, zread, zwrite};
use zenoh_core::zasynclock;
use zenoh_link_commons::{
LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender,
LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, UnicastListeners,
};
use zenoh_protocol::core::endpoint::Config;
use zenoh_protocol::core::{EndPoint, Locator};
Expand Down Expand Up @@ -237,42 +235,16 @@ impl fmt::Debug for LinkUnicastTls {
}
}

/*************************************/
/* LISTENER */
/*************************************/
struct ListenerUnicastTls {
endpoint: EndPoint,
active: Arc<AtomicBool>,
signal: Signal,
handle: JoinHandle<ZResult<()>>,
}

impl ListenerUnicastTls {
fn new(
endpoint: EndPoint,
active: Arc<AtomicBool>,
signal: Signal,
handle: JoinHandle<ZResult<()>>,
) -> ListenerUnicastTls {
ListenerUnicastTls {
endpoint,
active,
signal,
handle,
}
}
}

pub struct LinkManagerUnicastTls {
manager: NewLinkChannelSender,
listeners: Arc<RwLock<HashMap<SocketAddr, ListenerUnicastTls>>>,
listeners: UnicastListeners,
}

impl LinkManagerUnicastTls {
pub fn new(manager: NewLinkChannelSender) -> Self {
Self {
manager,
listeners: Arc::new(RwLock::new(HashMap::new())),
listeners: UnicastListeners::new(),
}
}
}
Expand Down Expand Up @@ -363,17 +335,12 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls {
let active = Arc::new(AtomicBool::new(true));
let signal = Signal::new();

// Spawn the accept loop for the listener
let c_active = active.clone();
let c_signal = signal.clone();
let c_manager = self.manager.clone();
let c_listeners = self.listeners.clone();
let c_addr = local_addr;

let handle = task::spawn(async move {
// Wait for the accept loop to terminate
let res = accept_task(socket, acceptor, c_active, c_signal, c_manager).await;
zwrite!(c_listeners).remove(&c_addr);
res
accept_task(socket, acceptor, c_active, c_signal, c_manager).await
});

// Update the endpoint locator address
Expand All @@ -383,69 +350,25 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls {
endpoint.metadata(),
)?;

let listener = ListenerUnicastTls::new(endpoint, active, signal, handle);
// Update the list of active listeners on the manager
zwrite!(self.listeners).insert(local_addr, listener);
self.listeners
.add_listener(endpoint, local_addr, active, signal, handle)
.await?;

Ok(locator)
}

async fn del_listener(&self, endpoint: &EndPoint) -> ZResult<()> {
let epaddr = endpoint.address();

let addr = get_tls_addr(&epaddr).await?;

// Stop the listener
let listener = zwrite!(self.listeners).remove(&addr).ok_or_else(|| {
let e = zerror!(
"Can not delete the TLS listener because it has not been found: {}",
addr
);
log::trace!("{}", e);
e
})?;

// Send the stop signal
listener.active.store(false, Ordering::Release);
listener.signal.trigger();
listener.handle.await
self.listeners.del_listener(addr).await
}

fn get_listeners(&self) -> Vec<EndPoint> {
zread!(self.listeners)
.values()
.map(|x| x.endpoint.clone())
.collect()
self.listeners.get_endpoints()
}

fn get_locators(&self) -> Vec<Locator> {
let mut locators = vec![];

let guard = zread!(self.listeners);
for (key, value) in guard.iter() {
let (kip, kpt) = (key.ip(), key.port());

// Either ipv4/0.0.0.0 or ipv6/[::]
if kip.is_unspecified() {
let mut addrs = match kip {
IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(),
IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(),
};
let iter = addrs.drain(..).map(|x| {
Locator::new(
value.endpoint.protocol(),
SocketAddr::new(x, kpt).to_string(),
value.endpoint.metadata(),
)
.unwrap()
});
locators.extend(iter);
} else {
locators.push(value.endpoint.to_locator());
}
}

locators
self.listeners.get_locators()
}
}

Expand Down

0 comments on commit 0a85d90

Please sign in to comment.