Skip to content

Commit

Permalink
Add draft UnicastListener
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Feb 20, 2024
1 parent 7ebdb3c commit ad43d4f
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 2 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions io/zenoh-link-commons/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@ compression = []

[dependencies]
async-std = { workspace = true }
zenoh-sync = { workspace = true }
async-trait = { workspace = true }
flume = { workspace = true }
lz4_flex = { workspace = true }
serde = { workspace = true, features = ["default"] }
typenum = { workspace = true }
zenoh-buffers = { workspace = true }
zenoh-codec = { workspace = true }
zenoh-core = { workspace = true }
zenoh-protocol = { workspace = true }
zenoh-result = { workspace = true }
zenoh-util = { workspace = true }
3 changes: 2 additions & 1 deletion io/zenoh-link-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@
//! This crate is intended for Zenoh's internal use.
//!
//! [Click here for Zenoh's documentation](../zenoh/index.html)
#![no_std]
extern crate alloc;

mod listener;
mod multicast;
mod unicast;

use alloc::{borrow::ToOwned, boxed::Box, string::String, vec, vec::Vec};
use async_trait::async_trait;
use core::{cmp::PartialEq, fmt, hash::Hash};
pub use listener::*;
pub use multicast::*;
use serde::Serialize;
pub use unicast::*;
Expand Down
140 changes: 140 additions & 0 deletions io/zenoh-link-commons/src/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
//
// Copyright (c) 202 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use async_std::net::{SocketAddr, TcpListener, TcpStream};
use async_std::prelude::*;
use async_std::task;
use async_std::task::JoinHandle;
use async_trait::async_trait;
use std::collections::HashMap;
use std::convert::TryInto;
use std::fmt;
use std::net::{IpAddr, Shutdown};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use zenoh_core::{zread, zwrite};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_result::{bail, zerror, Error as ZError, ZResult};
use zenoh_sync::Signal;

pub struct UnicastListener {
endpoint: EndPoint,
active: Arc<AtomicBool>,
signal: Signal,
handle: JoinHandle<ZResult<()>>,
}

impl UnicastListener {
fn new(
endpoint: EndPoint,
active: Arc<AtomicBool>,
signal: Signal,
handle: JoinHandle<ZResult<()>>,
) -> UnicastListener {
UnicastListener {
endpoint,
active,
signal,
handle,
}
}
}

pub struct UnicastListeners {
listeners: Arc<RwLock<HashMap<SocketAddr, UnicastListener>>>,
}

impl UnicastListeners {
fn new() -> UnicastListeners {
UnicastListeners {
listeners: Arc::new(RwLock::new(HashMap::new())),
}
}

async fn add_listener(
&self,
endpoint: EndPoint,
addr: SocketAddr,
active: Arc<AtomicBool>,
signal: Signal,
handle: JoinHandle<ZResult<()>>,
) -> ZResult<()> {
let mut listeners = zwrite!(self.listeners);
let c_listeners = self.listeners.clone();
let c_addr = addr;
let wraphandle = task::spawn(async move {
// Wait for the accept loop to terminate
let res = handle.await;
zwrite!(c_listeners).remove(&c_addr);
res
});

let listener = UnicastListener::new(endpoint, active, signal, wraphandle);
// Update the list of active listeners on the manager
listeners.insert(addr, listener);
Ok(())
}

async fn del_listener(&self, addr: SocketAddr) -> ZResult<()> {
// Stop the listener
let listener = zwrite!(self.listeners).remove(&addr).ok_or_else(|| {
zerror!(
"Can not delete the listener because it has not been found: {}",
addr
)
})?;

// Send the stop signal
listener.active.store(false, Ordering::Release);
listener.signal.trigger();
listener.handle.await
}

fn get_endpoints(&self) -> Vec<EndPoint> {
zread!(self.listeners)
.values()
.map(|l| l.endpoint.clone())
.collect()
}

fn get_locators(&self) -> Vec<Locator> {
let mut locators = vec![];

let guard = zread!(self.listeners);
for (key, value) in guard.iter() {
let (kip, kpt) = (key.ip(), key.port());

// Either ipv4/0.0.0.0 or ipv6/[::]
if kip.is_unspecified() {
let mut addrs = match kip {
IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(),
IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(),
};
let iter = addrs.drain(..).map(|x| {
Locator::new(
value.endpoint.protocol(),
SocketAddr::new(x, kpt).to_string(),
value.endpoint.metadata(),
)
.unwrap()
});
locators.extend(iter);
} else {
locators.push(value.endpoint.to_locator());
}
}

locators
}
}
3 changes: 2 additions & 1 deletion io/zenoh-links/zenoh-link-tcp/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use std::sync::{Arc, RwLock};
use std::time::Duration;
use zenoh_core::{zread, zwrite};
use zenoh_link_commons::{
LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender,
LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, UnicastListener,
UnicastListeners,
};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_result::{bail, zerror, Error as ZError, ZResult};
Expand Down

0 comments on commit ad43d4f

Please sign in to comment.