-
Notifications
You must be signed in to change notification settings - Fork 177
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
82 additions
and
73 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,25 +11,25 @@ | |
// Contributors: | ||
// ZettaScale Zenoh Team, <[email protected]> | ||
// | ||
|
||
use async_trait::async_trait; | ||
use std::cell::UnsafeCell; | ||
use std::collections::HashMap; | ||
use std::convert::TryInto; | ||
use std::fmt; | ||
use std::sync::Arc; | ||
use std::time::Duration; | ||
use tokio::io::{AsyncReadExt, AsyncWriteExt}; | ||
use tokio::sync::RwLock as AsyncRwLock; | ||
use tokio::task::JoinHandle; | ||
use tokio_util::sync::CancellationToken; | ||
use zenoh_core::{zasyncread, zasyncwrite}; | ||
use zenoh_link_commons::{ | ||
get_ip_interface_names, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, | ||
ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, | ||
LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, | ||
}; | ||
use zenoh_protocol::core::{EndPoint, Locator}; | ||
use zenoh_result::{bail, zerror, Error as ZError, ZResult}; | ||
use zenoh_result::{bail, zerror, ZResult}; | ||
|
||
use super::{get_vsock_addr, TCP_ACCEPT_THROTTLE_TIME, VSOCK_DEFAULT_MTU, VSOCK_LOCATOR_PREFIX}; | ||
use super::{get_vsock_addr, VSOCK_ACCEPT_THROTTLE_TIME, VSOCK_DEFAULT_MTU, VSOCK_LOCATOR_PREFIX}; | ||
use tokio_vsock::{VsockAddr, VsockListener, VsockStream}; | ||
|
||
pub struct LinkUnicastVsock { | ||
|
@@ -66,7 +66,6 @@ impl LinkUnicastVsock { | |
impl LinkUnicastTrait for LinkUnicastVsock { | ||
async fn close(&self) -> ZResult<()> { | ||
log::trace!("Closing vsock link: {}", self); | ||
// Close the underlying vsock socket | ||
self.get_mut_socket().shutdown().await.map_err(|e| { | ||
let e = zerror!("vsock link shutdown {}: {:?}", self, e); | ||
log::trace!("{}", e); | ||
|
@@ -128,8 +127,7 @@ impl LinkUnicastTrait for LinkUnicastVsock { | |
|
||
#[inline(always)] | ||
fn get_interface_names(&self) -> Vec<String> { | ||
// TODO(sashacmc): get_ip_interface_names(&self.src_addr) | ||
vec![] | ||
vec!["vsock".to_string()] | ||
} | ||
|
||
#[inline(always)] | ||
|
@@ -152,7 +150,7 @@ impl fmt::Display for LinkUnicastVsock { | |
|
||
impl fmt::Debug for LinkUnicastVsock { | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
f.debug_struct("Tcp") | ||
f.debug_struct("Vsock") | ||
.field("src", &self.src_addr) | ||
.field("dst", &self.dst_addr) | ||
.finish() | ||
|
@@ -181,14 +179,14 @@ impl ListenerUnicastVsock { | |
|
||
pub struct LinkManagerUnicastVsock { | ||
manager: NewLinkChannelSender, | ||
listeners: tokio::sync::RwLock<HashMap<VsockAddr, ListenerUnicastVsock>>, | ||
listeners: Arc<AsyncRwLock<HashMap<VsockAddr, ListenerUnicastVsock>>>, | ||
} | ||
|
||
impl LinkManagerUnicastVsock { | ||
pub fn new(manager: NewLinkChannelSender) -> Self { | ||
Self { | ||
manager, | ||
listeners: tokio::sync::RwLock::new(HashMap::new()), | ||
listeners: Arc::new(AsyncRwLock::new(HashMap::new())), | ||
} | ||
} | ||
} | ||
|
@@ -198,11 +196,9 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastVsock { | |
async fn new_link(&self, endpoint: EndPoint) -> ZResult<LinkUnicast> { | ||
let addr = get_vsock_addr(endpoint.address())?; | ||
if let Ok(stream) = VsockStream::connect(addr).await { | ||
let link = Arc::new(LinkUnicastVsock::new( | ||
stream, | ||
stream.local_addr()?, | ||
stream.peer_addr()?, | ||
)); | ||
let local_addr = stream.local_addr()?; | ||
let peer_addr = stream.peer_addr()?; | ||
let link = Arc::new(LinkUnicastVsock::new(stream, local_addr, peer_addr)); | ||
return Ok(LinkUnicast(link)); | ||
} | ||
|
||
|
@@ -224,80 +220,83 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastVsock { | |
let c_token = token.clone(); | ||
|
||
let c_manager = self.manager.clone(); | ||
let task = async move { accept_task(listener, c_token, c_manager).await }; | ||
|
||
let locator = endpoint.to_locator(); | ||
zasyncwrite!(self.listeners) | ||
.add_listener(endpoint, local_addr, task, token) | ||
.await?; | ||
|
||
let mut listeners = zasyncwrite!(self.listeners); | ||
let c_listeners = self.listeners.clone(); | ||
let c_addr = addr; | ||
let task = async move { | ||
// Wait for the accept loop to terminate | ||
let res = accept_task(listener, c_token, c_manager).await; | ||
zasyncwrite!(c_listeners).remove(&c_addr); | ||
res | ||
}; | ||
let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task); | ||
|
||
let listener = ListenerUnicastVsock::new(endpoint, token, handle); | ||
// Update the list of active listeners on the manager | ||
listeners.insert(addr, listener); | ||
return Ok(locator); | ||
} | ||
|
||
bail!("Can not create a new vsock listener bound to {}", endpoint) | ||
} | ||
|
||
async fn del_listener(&self, endpoint: &EndPoint) -> ZResult<()> { | ||
let addrs = get_vsock_addrs(endpoint.address()).await?; | ||
|
||
// Stop the listener | ||
let mut errs: Vec<ZError> = vec![]; | ||
let mut failed = true; | ||
for a in addrs { | ||
match self.listeners.del_listener(a).await { | ||
Ok(_) => { | ||
failed = false; | ||
break; | ||
} | ||
Err(err) => { | ||
errs.push(zerror!("{}", err).into()); | ||
} | ||
} | ||
} | ||
let addr = get_vsock_addr(endpoint.address())?; | ||
|
||
if failed { | ||
bail!( | ||
"Can not delete the TCP listener bound to {}: {:?}", | ||
endpoint, | ||
errs | ||
let listener = zasyncwrite!(self.listeners).remove(&addr).ok_or_else(|| { | ||
zerror!( | ||
"Can not delete the listener because it has not been found: {}", | ||
addr | ||
) | ||
} | ||
Ok(()) | ||
})?; | ||
|
||
// Send the stop signal | ||
listener.stop().await; | ||
listener.handle.await? | ||
} | ||
|
||
async fn get_listeners(&self) -> Vec<EndPoint> { | ||
self.listeners.get_endpoints() | ||
zasyncread!(self.listeners) | ||
.values() | ||
.map(|x| x.endpoint.clone()) | ||
.collect() | ||
} | ||
|
||
async fn get_locators(&self) -> Vec<Locator> { | ||
self.listeners.get_locators() | ||
zasyncread!(self.listeners) | ||
.values() | ||
.map(|x| x.endpoint.to_locator()) | ||
.collect() | ||
} | ||
} | ||
|
||
async fn accept_task( | ||
socket: VsockListener, | ||
mut socket: VsockListener, | ||
token: CancellationToken, | ||
manager: NewLinkChannelSender, | ||
) -> ZResult<()> { | ||
async fn accept(socket: &VsockListener) -> ZResult<(VsockStream, VsockAddr)> { | ||
async fn accept(socket: &mut VsockListener) -> ZResult<(VsockStream, VsockAddr)> { | ||
let res = socket.accept().await.map_err(|e| zerror!(e))?; | ||
Ok(res) | ||
} | ||
|
||
let src_addr = socket.local_addr().map_err(|e| { | ||
let e = zerror!("Can not accept TCP connections: {}", e); | ||
let e = zerror!("Can not accept vsock connections: {}", e); | ||
log::warn!("{}", e); | ||
e | ||
})?; | ||
|
||
log::trace!("Ready to accept TCP connections on: {:?}", src_addr); | ||
log::trace!("Ready to accept vsock connections on: {:?}", src_addr); | ||
loop { | ||
tokio::select! { | ||
_ = token.cancelled() => break, | ||
res = accept(&socket) => { | ||
res = accept(&mut socket) => { | ||
match res { | ||
Ok((stream, dst_addr)) => { | ||
log::debug!("Accepted TCP connection on {:?}: {:?}", src_addr, dst_addr); | ||
log::debug!("Accepted vsock connection on {:?}: {:?}", src_addr, dst_addr); | ||
// Create the new link object | ||
let link = Arc::new(LinkUnicastVsock::new(stream, src_addr, dst_addr)); | ||
|
||
|
@@ -314,7 +313,7 @@ async fn accept_task( | |
// Linux systems this limit can be changed by using the "ulimit" command line | ||
// tool. In case of systemd-based systems, this can be changed by using the | ||
// "sysctl" command line tool. | ||
tokio::time::sleep(Duration::from_micros(*TCP_ACCEPT_THROTTLE_TIME)).await; | ||
tokio::time::sleep(Duration::from_micros(*VSOCK_ACCEPT_THROTTLE_TIME)).await; | ||
} | ||
|
||
} | ||
|