Skip to content

Commit

Permalink
Add interfaces to the Link (#718)
Browse files Browse the repository at this point in the history
* Add interfaces to the Link

* Increase timeout for transport_tcp_intermittent tests
  • Loading branch information
sashacmc authored Feb 9, 2024
1 parent 2bd4518 commit 8433384
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 104 deletions.
2 changes: 2 additions & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ slow-timeout = { period = "60s", terminate-after = 2 }
filter = """
test(=zenoh_session_unicast) |
test(=zenoh_session_multicast) |
test(=transport_tcp_intermittent) |
test(=transport_tcp_intermittent_for_lowlatency_transport) |
test(=three_node_combination)
"""
threads-required = 'num-cpus'
Expand Down
192 changes: 90 additions & 102 deletions commons/zenoh-util/src/std_only/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,39 @@ pub fn set_linger(socket: &TcpStream, dur: Option<Duration>) -> ZResult<()> {
}
}

#[cfg(windows)]
unsafe fn get_adapters_adresses(af_spec: i32) -> ZResult<Vec<u8>> {
use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH;

let mut ret;
let mut retries = 0;
let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE;
let mut buffer: Vec<u8>;
loop {
buffer = Vec::with_capacity(size as usize);
ret = winapi::um::iphlpapi::GetAdaptersAddresses(
af_spec.try_into().unwrap(),
0,
std::ptr::null_mut(),
buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH,
&mut size,
);
if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW {
break;
}
if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES {
break;
}
retries += 1;
}

if ret != 0 {
bail!("GetAdaptersAddresses returned {}", ret)
}

Ok(buffer)
}

pub fn get_interface(name: &str) -> ZResult<Option<IpAddr>> {
#[cfg(unix)]
{
Expand All @@ -117,31 +150,7 @@ pub fn get_interface(name: &str) -> ZResult<Option<IpAddr>> {
use crate::ffi;
use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH;

let mut ret;
let mut retries = 0;
let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE;
let mut buffer: Vec<u8>;
loop {
buffer = Vec::with_capacity(size as usize);
ret = winapi::um::iphlpapi::GetAdaptersAddresses(
winapi::shared::ws2def::AF_INET.try_into().unwrap(),
0,
std::ptr::null_mut(),
buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH,
&mut size,
);
if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW {
break;
}
if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES {
break;
}
retries += 1;
}

if ret != 0 {
bail!("GetAdaptersAddresses returned {}", ret)
}
let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_INET)?;

let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref();
while let Some(iface) = next_iface {
Expand Down Expand Up @@ -218,33 +227,9 @@ pub fn get_local_addresses() -> ZResult<Vec<IpAddr>> {
use crate::ffi;
use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH;

let mut result = vec![];
let mut ret;
let mut retries = 0;
let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE;
let mut buffer: Vec<u8>;
loop {
buffer = Vec::with_capacity(size as usize);
ret = winapi::um::iphlpapi::GetAdaptersAddresses(
winapi::shared::ws2def::AF_UNSPEC.try_into().unwrap(),
0,
std::ptr::null_mut(),
buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH,
&mut size,
);
if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW {
break;
}
if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES {
break;
}
retries += 1;
}

if ret != 0 {
bail!("GetAdaptersAddresses returned {}", ret)
}
let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_UNSPEC)?;

let mut result = vec![];
let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref();
while let Some(iface) = next_iface {
let mut next_ucast_addr = iface.FirstUnicastAddress.as_ref();
Expand Down Expand Up @@ -317,33 +302,9 @@ pub fn get_unicast_addresses_of_interface(name: &str) -> ZResult<Vec<IpAddr>> {
use crate::ffi;
use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH;

let mut addrs = vec![];
let mut ret;
let mut retries = 0;
let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE;
let mut buffer: Vec<u8>;
loop {
buffer = Vec::with_capacity(size as usize);
ret = winapi::um::iphlpapi::GetAdaptersAddresses(
winapi::shared::ws2def::AF_INET.try_into().unwrap(),
0,
std::ptr::null_mut(),
buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH,
&mut size,
);
if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW {
break;
}
if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES {
break;
}
retries += 1;
}

if ret != 0 {
bail!("GetAdaptersAddresses returned {}", ret);
}
let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_INET)?;

let mut addrs = vec![];
let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref();
while let Some(iface) = next_iface {
if name == ffi::pstr_to_string(iface.AdapterName)
Expand Down Expand Up @@ -380,31 +341,7 @@ pub fn get_index_of_interface(addr: IpAddr) -> ZResult<u32> {
use crate::ffi;
use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH;

let mut ret;
let mut retries = 0;
let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE;
let mut buffer: Vec<u8>;
loop {
buffer = Vec::with_capacity(size as usize);
ret = winapi::um::iphlpapi::GetAdaptersAddresses(
winapi::shared::ws2def::AF_INET.try_into().unwrap(),
0,
std::ptr::null_mut(),
buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH,
&mut size,
);
if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW {
break;
}
if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES {
break;
}
retries += 1;
}

if ret != 0 {
bail!("GetAdaptersAddresses returned {}", ret)
}
let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_INET)?;

let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref();
while let Some(iface) = next_iface {
Expand All @@ -424,6 +361,57 @@ pub fn get_index_of_interface(addr: IpAddr) -> ZResult<u32> {
}
}

pub fn get_interface_names_by_addr(addr: IpAddr) -> ZResult<Vec<String>> {
#[cfg(unix)]
{
if addr.is_unspecified() {
Ok(pnet_datalink::interfaces()
.iter()
.map(|iface| iface.name.clone())
.collect::<Vec<String>>())
} else {
Ok(pnet_datalink::interfaces()
.iter()
.filter(|iface| iface.ips.iter().any(|ipnet| ipnet.ip() == addr))
.map(|iface| iface.name.clone())
.collect::<Vec<String>>())
}
}
#[cfg(windows)]
{
let mut result = vec![];
unsafe {
use crate::ffi;
use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH;

let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_UNSPEC)?;

if addr.is_unspecified() {
let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref();
while let Some(iface) = next_iface {
result.push(ffi::pstr_to_string(iface.AdapterName));
next_iface = iface.Next.as_ref();
}
} else {
let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref();
while let Some(iface) = next_iface {
let mut next_ucast_addr = iface.FirstUnicastAddress.as_ref();
while let Some(ucast_addr) = next_ucast_addr {
if let Ok(ifaddr) = ffi::win::sockaddr_to_addr(ucast_addr.Address) {
if ifaddr.ip() == addr {
result.push(ffi::pstr_to_string(iface.AdapterName));
}
}
next_ucast_addr = ucast_addr.Next.as_ref();
}
next_iface = iface.Next.as_ref();
}
}
}
Ok(result)
}
}

pub fn get_ipv4_ipaddrs() -> Vec<IpAddr> {
get_local_addresses()
.unwrap_or_else(|_| vec![])
Expand Down
5 changes: 4 additions & 1 deletion io/zenoh-link-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ extern crate alloc;
mod multicast;
mod unicast;

use alloc::{borrow::ToOwned, boxed::Box, string::String};
use alloc::{borrow::ToOwned, boxed::Box, string::String, vec, vec::Vec};
use async_trait::async_trait;
use core::{cmp::PartialEq, fmt, hash::Hash};
pub use multicast::*;
Expand All @@ -43,6 +43,7 @@ pub struct Link {
pub mtu: u16,
pub is_reliable: bool,
pub is_streamed: bool,
pub interfaces: Vec<String>,
}

#[async_trait]
Expand Down Expand Up @@ -71,6 +72,7 @@ impl From<&LinkUnicast> for Link {
mtu: link.get_mtu(),
is_reliable: link.is_reliable(),
is_streamed: link.is_streamed(),
interfaces: link.get_interface_names(),
}
}
}
Expand All @@ -90,6 +92,7 @@ impl From<&LinkMulticast> for Link {
mtu: link.get_mtu(),
is_reliable: link.is_reliable(),
is_streamed: false,
interfaces: vec![],
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion io/zenoh-link-commons/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
use async_trait::async_trait;
use core::{
fmt,
Expand Down Expand Up @@ -45,6 +45,7 @@ pub trait LinkUnicastTrait: Send + Sync {
fn get_dst(&self) -> &Locator;
fn is_reliable(&self) -> bool;
fn is_streamed(&self) -> bool;
fn get_interface_names(&self) -> Vec<String>;
async fn write(&self, buffer: &[u8]) -> ZResult<usize>;
async fn write_all(&self, buffer: &[u8]) -> ZResult<()>;
async fn read(&self, buffer: &mut [u8]) -> ZResult<usize>;
Expand Down
7 changes: 7 additions & 0 deletions io/zenoh-links/zenoh-link-quic/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ impl LinkUnicastTrait for LinkUnicastQuic {
*QUIC_DEFAULT_MTU
}

#[inline(always)]
fn get_interface_names(&self) -> Vec<String> {
// @TODO: Not supported for now
log::debug!("The get_interface_names for LinkUnicastQuic is not supported");
vec![]
}

#[inline(always)]
fn is_reliable(&self) -> bool {
true
Expand Down
7 changes: 7 additions & 0 deletions io/zenoh-links/zenoh-link-serial/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ impl LinkUnicastTrait for LinkUnicastSerial {
*SERIAL_DEFAULT_MTU
}

#[inline(always)]
fn get_interface_names(&self) -> Vec<String> {
// @TODO: Not supported for now
log::debug!("The get_interface_names for LinkUnicastSerial is not supported");
vec![]
}

#[inline(always)]
fn is_reliable(&self) -> bool {
false
Expand Down
22 changes: 22 additions & 0 deletions io/zenoh-links/zenoh-link-tcp/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,28 @@ impl LinkUnicastTrait for LinkUnicastTcp {
*TCP_DEFAULT_MTU
}

#[inline(always)]
fn get_interface_names(&self) -> Vec<String> {
match zenoh_util::net::get_interface_names_by_addr(self.src_addr.ip()) {
Ok(interfaces) => {
log::trace!(
"get_interface_names for {:?}: {:?}",
self.src_addr.ip(),
interfaces
);
interfaces
}
Err(e) => {
log::debug!(
"get_interface_names for {:?} failed: {:?}",
self.src_addr.ip(),
e
);
vec![]
}
}
}

#[inline(always)]
fn is_reliable(&self) -> bool {
true
Expand Down
7 changes: 7 additions & 0 deletions io/zenoh-links/zenoh-link-tls/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ impl LinkUnicastTrait for LinkUnicastTls {
*TLS_DEFAULT_MTU
}

#[inline(always)]
fn get_interface_names(&self) -> Vec<String> {
// @TODO: Not supported for now
log::debug!("The get_interface_names for LinkUnicastTls is not supported");
vec![]
}

#[inline(always)]
fn is_reliable(&self) -> bool {
true
Expand Down
22 changes: 22 additions & 0 deletions io/zenoh-links/zenoh-link-udp/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,28 @@ impl LinkUnicastTrait for LinkUnicastUdp {
*UDP_DEFAULT_MTU
}

#[inline(always)]
fn get_interface_names(&self) -> Vec<String> {
match zenoh_util::net::get_interface_names_by_addr(self.src_addr.ip()) {
Ok(interfaces) => {
log::trace!(
"get_interface_names for {:?}: {:?}",
self.src_addr.ip(),
interfaces
);
interfaces
}
Err(e) => {
log::debug!(
"get_interface_names for {:?} failed: {:?}",
self.src_addr.ip(),
e
);
vec![]
}
}
}

#[inline(always)]
fn is_reliable(&self) -> bool {
false
Expand Down
Loading

0 comments on commit 8433384

Please sign in to comment.