diff --git a/.config/nextest.toml b/.config/nextest.toml index 8c57297dba..aa2c3ac37b 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -8,6 +8,8 @@ slow-timeout = { period = "60s", terminate-after = 2 } filter = """ test(=zenoh_session_unicast) | test(=zenoh_session_multicast) | +test(=transport_tcp_intermittent) | +test(=transport_tcp_intermittent_for_lowlatency_transport) | test(=three_node_combination) """ threads-required = 'num-cpus' diff --git a/commons/zenoh-util/src/std_only/net/mod.rs b/commons/zenoh-util/src/std_only/net/mod.rs index fd5a215952..67e732d3ee 100644 --- a/commons/zenoh-util/src/std_only/net/mod.rs +++ b/commons/zenoh-util/src/std_only/net/mod.rs @@ -91,6 +91,39 @@ pub fn set_linger(socket: &TcpStream, dur: Option) -> ZResult<()> { } } +#[cfg(windows)] +unsafe fn get_adapters_adresses(af_spec: i32) -> ZResult> { + use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; + + let mut ret; + let mut retries = 0; + let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; + let mut buffer: Vec; + loop { + buffer = Vec::with_capacity(size as usize); + ret = winapi::um::iphlpapi::GetAdaptersAddresses( + af_spec.try_into().unwrap(), + 0, + std::ptr::null_mut(), + buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, + &mut size, + ); + if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { + break; + } + if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { + break; + } + retries += 1; + } + + if ret != 0 { + bail!("GetAdaptersAddresses returned {}", ret) + } + + Ok(buffer) +} + pub fn get_interface(name: &str) -> ZResult> { #[cfg(unix)] { @@ -117,31 +150,7 @@ pub fn get_interface(name: &str) -> ZResult> { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let mut ret; - let mut retries = 0; - let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; - let mut buffer: Vec; - loop { - buffer = Vec::with_capacity(size as usize); - ret = winapi::um::iphlpapi::GetAdaptersAddresses( - winapi::shared::ws2def::AF_INET.try_into().unwrap(), - 0, - std::ptr::null_mut(), - buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, - &mut size, - ); - if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { - break; - } - if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { - break; - } - retries += 1; - } - - if ret != 0 { - bail!("GetAdaptersAddresses returned {}", ret) - } + let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_INET)?; let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { @@ -218,33 +227,9 @@ pub fn get_local_addresses() -> ZResult> { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let mut result = vec![]; - let mut ret; - let mut retries = 0; - let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; - let mut buffer: Vec; - loop { - buffer = Vec::with_capacity(size as usize); - ret = winapi::um::iphlpapi::GetAdaptersAddresses( - winapi::shared::ws2def::AF_UNSPEC.try_into().unwrap(), - 0, - std::ptr::null_mut(), - buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, - &mut size, - ); - if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { - break; - } - if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { - break; - } - retries += 1; - } - - if ret != 0 { - bail!("GetAdaptersAddresses returned {}", ret) - } + let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_UNSPEC)?; + let mut result = vec![]; let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { let mut next_ucast_addr = iface.FirstUnicastAddress.as_ref(); @@ -317,33 +302,9 @@ pub fn get_unicast_addresses_of_interface(name: &str) -> ZResult> { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let mut addrs = vec![]; - let mut ret; - let mut retries = 0; - let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; - let mut buffer: Vec; - loop { - buffer = Vec::with_capacity(size as usize); - ret = winapi::um::iphlpapi::GetAdaptersAddresses( - winapi::shared::ws2def::AF_INET.try_into().unwrap(), - 0, - std::ptr::null_mut(), - buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, - &mut size, - ); - if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { - break; - } - if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { - break; - } - retries += 1; - } - - if ret != 0 { - bail!("GetAdaptersAddresses returned {}", ret); - } + let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_INET)?; + let mut addrs = vec![]; let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { if name == ffi::pstr_to_string(iface.AdapterName) @@ -380,31 +341,7 @@ pub fn get_index_of_interface(addr: IpAddr) -> ZResult { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let mut ret; - let mut retries = 0; - let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; - let mut buffer: Vec; - loop { - buffer = Vec::with_capacity(size as usize); - ret = winapi::um::iphlpapi::GetAdaptersAddresses( - winapi::shared::ws2def::AF_INET.try_into().unwrap(), - 0, - std::ptr::null_mut(), - buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, - &mut size, - ); - if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { - break; - } - if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { - break; - } - retries += 1; - } - - if ret != 0 { - bail!("GetAdaptersAddresses returned {}", ret) - } + let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_INET)?; let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { @@ -424,6 +361,57 @@ pub fn get_index_of_interface(addr: IpAddr) -> ZResult { } } +pub fn get_interface_names_by_addr(addr: IpAddr) -> ZResult> { + #[cfg(unix)] + { + if addr.is_unspecified() { + Ok(pnet_datalink::interfaces() + .iter() + .map(|iface| iface.name.clone()) + .collect::>()) + } else { + Ok(pnet_datalink::interfaces() + .iter() + .filter(|iface| iface.ips.iter().any(|ipnet| ipnet.ip() == addr)) + .map(|iface| iface.name.clone()) + .collect::>()) + } + } + #[cfg(windows)] + { + let mut result = vec![]; + unsafe { + use crate::ffi; + use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; + + let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_UNSPEC)?; + + if addr.is_unspecified() { + let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); + while let Some(iface) = next_iface { + result.push(ffi::pstr_to_string(iface.AdapterName)); + next_iface = iface.Next.as_ref(); + } + } else { + let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); + while let Some(iface) = next_iface { + let mut next_ucast_addr = iface.FirstUnicastAddress.as_ref(); + while let Some(ucast_addr) = next_ucast_addr { + if let Ok(ifaddr) = ffi::win::sockaddr_to_addr(ucast_addr.Address) { + if ifaddr.ip() == addr { + result.push(ffi::pstr_to_string(iface.AdapterName)); + } + } + next_ucast_addr = ucast_addr.Next.as_ref(); + } + next_iface = iface.Next.as_ref(); + } + } + } + Ok(result) + } +} + pub fn get_ipv4_ipaddrs() -> Vec { get_local_addresses() .unwrap_or_else(|_| vec![]) diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index 790f4792a4..b15a0d9ad5 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -23,7 +23,7 @@ extern crate alloc; mod multicast; mod unicast; -use alloc::{borrow::ToOwned, boxed::Box, string::String}; +use alloc::{borrow::ToOwned, boxed::Box, string::String, vec, vec::Vec}; use async_trait::async_trait; use core::{cmp::PartialEq, fmt, hash::Hash}; pub use multicast::*; @@ -43,6 +43,7 @@ pub struct Link { pub mtu: u16, pub is_reliable: bool, pub is_streamed: bool, + pub interfaces: Vec, } #[async_trait] @@ -71,6 +72,7 @@ impl From<&LinkUnicast> for Link { mtu: link.get_mtu(), is_reliable: link.is_reliable(), is_streamed: link.is_streamed(), + interfaces: link.get_interface_names(), } } } @@ -90,6 +92,7 @@ impl From<&LinkMulticast> for Link { mtu: link.get_mtu(), is_reliable: link.is_reliable(), is_streamed: false, + interfaces: vec![], } } } diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index d44686ff50..19e90c3b2c 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -11,7 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // -use alloc::{boxed::Box, sync::Arc, vec::Vec}; +use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec}; use async_trait::async_trait; use core::{ fmt, @@ -45,6 +45,7 @@ pub trait LinkUnicastTrait: Send + Sync { fn get_dst(&self) -> &Locator; fn is_reliable(&self) -> bool; fn is_streamed(&self) -> bool; + fn get_interface_names(&self) -> Vec; async fn write(&self, buffer: &[u8]) -> ZResult; async fn write_all(&self, buffer: &[u8]) -> ZResult<()>; async fn read(&self, buffer: &mut [u8]) -> ZResult; diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs index 2b1c59ad23..dfc70dca28 100644 --- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs @@ -143,6 +143,13 @@ impl LinkUnicastTrait for LinkUnicastQuic { *QUIC_DEFAULT_MTU } + #[inline(always)] + fn get_interface_names(&self) -> Vec { + // @TODO: Not supported for now + log::debug!("The get_interface_names for LinkUnicastQuic is not supported"); + vec![] + } + #[inline(always)] fn is_reliable(&self) -> bool { true diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 6ec8f8f279..51d734f91b 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -181,6 +181,13 @@ impl LinkUnicastTrait for LinkUnicastSerial { *SERIAL_DEFAULT_MTU } + #[inline(always)] + fn get_interface_names(&self) -> Vec { + // @TODO: Not supported for now + log::debug!("The get_interface_names for LinkUnicastSerial is not supported"); + vec![] + } + #[inline(always)] fn is_reliable(&self) -> bool { false diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 3960b91228..3876a947ca 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -144,6 +144,28 @@ impl LinkUnicastTrait for LinkUnicastTcp { *TCP_DEFAULT_MTU } + #[inline(always)] + fn get_interface_names(&self) -> Vec { + match zenoh_util::net::get_interface_names_by_addr(self.src_addr.ip()) { + Ok(interfaces) => { + log::trace!( + "get_interface_names for {:?}: {:?}", + self.src_addr.ip(), + interfaces + ); + interfaces + } + Err(e) => { + log::debug!( + "get_interface_names for {:?} failed: {:?}", + self.src_addr.ip(), + e + ); + vec![] + } + } + } + #[inline(always)] fn is_reliable(&self) -> bool { true diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index 7761195e4b..383b03a366 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -195,6 +195,13 @@ impl LinkUnicastTrait for LinkUnicastTls { *TLS_DEFAULT_MTU } + #[inline(always)] + fn get_interface_names(&self) -> Vec { + // @TODO: Not supported for now + log::debug!("The get_interface_names for LinkUnicastTls is not supported"); + vec![] + } + #[inline(always)] fn is_reliable(&self) -> bool { true diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 585442ed71..210c6f7b7a 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -208,6 +208,28 @@ impl LinkUnicastTrait for LinkUnicastUdp { *UDP_DEFAULT_MTU } + #[inline(always)] + fn get_interface_names(&self) -> Vec { + match zenoh_util::net::get_interface_names_by_addr(self.src_addr.ip()) { + Ok(interfaces) => { + log::trace!( + "get_interface_names for {:?}: {:?}", + self.src_addr.ip(), + interfaces + ); + interfaces + } + Err(e) => { + log::debug!( + "get_interface_names for {:?} failed: {:?}", + self.src_addr.ip(), + e + ); + vec![] + } + } + } + #[inline(always)] fn is_reliable(&self) -> bool { false diff --git a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs index 156698d195..83f6414dee 100644 --- a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs @@ -492,6 +492,13 @@ impl LinkUnicastTrait for UnicastPipe { LINUX_PIPE_MAX_MTU } + #[inline(always)] + fn get_interface_names(&self) -> Vec { + // @TODO: Not supported for now + log::debug!("The get_interface_names for UnicastPipe is not supported"); + vec![] + } + #[inline(always)] fn is_reliable(&self) -> bool { true diff --git a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs index e4d751344f..3ac1bcbfe6 100644 --- a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs @@ -115,6 +115,13 @@ impl LinkUnicastTrait for LinkUnicastUnixSocketStream { *UNIXSOCKSTREAM_DEFAULT_MTU } + #[inline(always)] + fn get_interface_names(&self) -> Vec { + // @TODO: Not supported for now + log::debug!("The get_interface_names for LinkUnicastUnixSocketStream is not supported"); + vec![] + } + #[inline(always)] fn is_reliable(&self) -> bool { true diff --git a/io/zenoh-links/zenoh-link-ws/src/unicast.rs b/io/zenoh-links/zenoh-link-ws/src/unicast.rs index 2238dcb4a6..4276e2bfaf 100644 --- a/io/zenoh-links/zenoh-link-ws/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-ws/src/unicast.rs @@ -207,6 +207,13 @@ impl LinkUnicastTrait for LinkUnicastWs { *WS_DEFAULT_MTU } + #[inline(always)] + fn get_interface_names(&self) -> Vec { + // @TODO: Not supported for now + log::debug!("The get_interface_names for LinkUnicastWs is not supported"); + vec![] + } + #[inline(always)] fn is_reliable(&self) -> bool { true