Skip to content

Commit

Permalink
Use UnicastListener for UDP unicast, make linter happy
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Feb 21, 2024
1 parent a256ba3 commit 7c76c4c
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 97 deletions.
6 changes: 6 additions & 0 deletions io/zenoh-link-commons/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,9 @@ impl UnicastListeners {
locators
}
}

impl Default for UnicastListeners {
fn default() -> Self {
Self::new()
}
}
122 changes: 25 additions & 97 deletions io/zenoh-links/zenoh-link-udp/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,16 @@ use async_std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
use async_std::prelude::*;
use async_std::sync::Mutex as AsyncMutex;
use async_std::task;
use async_std::task::JoinHandle;
use async_trait::async_trait;
use std::collections::HashMap;
use std::fmt;
use std::net::IpAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;
use zenoh_core::{zasynclock, zlock, zread, zwrite};
use zenoh_core::{zasynclock, zlock};
use zenoh_link_commons::{
ConstructibleLinkManagerUnicast, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait,
NewLinkChannelSender,
NewLinkChannelSender, UnicastListeners,
};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_result::{bail, zerror, Error as ZError, ZResult};
Expand Down Expand Up @@ -257,42 +255,16 @@ impl fmt::Debug for LinkUnicastUdp {
}
}

/*************************************/
/* LISTENER */
/*************************************/
struct ListenerUnicastUdp {
endpoint: EndPoint,
active: Arc<AtomicBool>,
signal: Signal,
handle: JoinHandle<ZResult<()>>,
}

impl ListenerUnicastUdp {
fn new(
endpoint: EndPoint,
active: Arc<AtomicBool>,
signal: Signal,
handle: JoinHandle<ZResult<()>>,
) -> ListenerUnicastUdp {
ListenerUnicastUdp {
endpoint,
active,
signal,
handle,
}
}
}

pub struct LinkManagerUnicastUdp {
manager: NewLinkChannelSender,
listeners: Arc<RwLock<HashMap<SocketAddr, ListenerUnicastUdp>>>,
listeners: UnicastListeners,
}

impl LinkManagerUnicastUdp {
pub fn new(manager: NewLinkChannelSender) -> Self {
Self {
manager,
listeners: Arc::new(RwLock::new(HashMap::new())),
listeners: UnicastListeners::new(),
}
}
}
Expand Down Expand Up @@ -420,27 +392,21 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp {
endpoint.config(),
)?;

// Spawn the accept loop for the listener
let active = Arc::new(AtomicBool::new(true));
let signal = Signal::new();
let mut listeners = zwrite!(self.listeners);

let c_active = active.clone();
let c_signal = signal.clone();
let c_manager = self.manager.clone();
let c_listeners = self.listeners.clone();
let c_addr = local_addr;

let handle = task::spawn(async move {
// Wait for the accept loop to terminate
let res = accept_read_task(socket, c_active, c_signal, c_manager).await;
zwrite!(c_listeners).remove(&c_addr);
res
accept_read_task(socket, c_active, c_signal, c_manager).await
});

let locator = endpoint.to_locator();
let listener = ListenerUnicastUdp::new(endpoint, active, signal, handle);
// Update the list of active listeners on the manager
listeners.insert(local_addr, listener);
self.listeners
.add_listener(endpoint, local_addr, active, signal, handle)
.await?;

return Ok(locator);
}
Expand Down Expand Up @@ -468,73 +434,35 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp {

// Stop the listener
let mut errs: Vec<ZError> = vec![];
let mut listener = None;
let mut failed = true;
for a in addrs {
match zwrite!(self.listeners).remove(&a) {
Some(l) => {
// We cannot keep a sync guard across a .await
// Break the loop and assign the listener.
listener = Some(l);
match self.listeners.del_listener(a).await {
Ok(_) => {
failed = false;
break;
}
None => {
errs.push(zerror!("{}", a).into());
Err(err) => {
errs.push(zerror!("{}", err).into());
}
}
}

match listener {
Some(l) => {
// Send the stop signal
l.active.store(false, Ordering::Release);
l.signal.trigger();
l.handle.await
}
None => {
bail!(
"Can not delete the UDP listener bound to {}: {:?}",
endpoint,
errs
)
}
if failed {
bail!(
"Can not delete the TCP listener bound to {}: {:?}",
endpoint,
errs
)
}
Ok(())
}

fn get_listeners(&self) -> Vec<EndPoint> {
zread!(self.listeners)
.values()
.map(|l| l.endpoint.clone())
.collect()
self.listeners.get_endpoints()
}

fn get_locators(&self) -> Vec<Locator> {
let mut locators = vec![];

let guard = zread!(self.listeners);
for (key, value) in guard.iter() {
let (kip, kpt) = (key.ip(), key.port());

// Either ipv4/0.0.0.0 or ipv6/[::]
if kip.is_unspecified() {
let mut addrs = match kip {
IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(),
IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(),
};
let iter = addrs.drain(..).map(|x| {
Locator::new(
value.endpoint.protocol(),
SocketAddr::new(x, kpt).to_string(),
value.endpoint.metadata(),
)
.unwrap()
});
locators.extend(iter);
} else {
locators.push(value.endpoint.to_locator());
}
}

locators
self.listeners.get_locators()
}
}

Expand Down

0 comments on commit 7c76c4c

Please sign in to comment.