diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index e7672c6057..a302a5b8e1 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -331,6 +331,11 @@ /// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream", "vsock"] /// For example, to only enable "tls" and "quic": // protocols: ["tls", "quic"], + /// + /// Endpoints accept a "priorities" metadata value in the form of a Rust-style half-open range + /// (e.g. `2..4` signifies priorities 2 and 3). This value is used to select the link used + /// for transmission based on the priority of the message in question. + /// /// Configure the zenoh TX parameters of a link tx: { /// The resolution in bits to be used for the message sequence numbers. diff --git a/commons/zenoh-protocol/src/core/mod.rs b/commons/zenoh-protocol/src/core/mod.rs index ebf1bb7f85..3387218558 100644 --- a/commons/zenoh-protocol/src/core/mod.rs +++ b/commons/zenoh-protocol/src/core/mod.rs @@ -24,9 +24,10 @@ use core::{ str::FromStr, }; +use serde::Serialize; pub use uhlc::{Timestamp, NTP64}; use zenoh_keyexpr::OwnedKeyExpr; -use zenoh_result::{bail, zerror}; +use zenoh_result::{bail, zerror, ZResult}; /// The unique Id of the [`HLC`](uhlc::HLC) that generated the concerned [`Timestamp`]. pub type TimestampId = uhlc::ID; @@ -308,6 +309,52 @@ pub enum Priority { Background = 7, } +#[derive(Debug, Default, Copy, Clone, Eq, Hash, PartialEq, Serialize)] +pub struct PriorityRange { + pub start: u8, + pub end: u8, +} + +impl PriorityRange { + pub fn new(start: u8, end: u8) -> ZResult { + if start >= end || start < Priority::MAX as u8 || end > Priority::MIN as u8 + 1 { + bail!("Invalid priority range: {start}..{end}") + }; + + Ok(Self { start, end }) + } + + pub fn includes(&self, priority: Priority) -> bool { + self.start <= (priority as u8) && (priority as u8) < self.end + } + + pub fn len(&self) -> usize { + (self.end - self.start) as usize + } + + pub fn is_empty(&self) -> bool { + self.end == self.start + } + + pub fn start(&self) -> u8 { + self.start + } + + pub fn end(&self) -> u8 { + self.end + } + + #[cfg(feature = "test")] + pub fn rand() -> Self { + use rand::Rng; + let mut rng = rand::thread_rng(); + let start = rng.gen_range(Priority::MAX as u8..Priority::MIN as u8); + let end = rng.gen_range((start + 1)..=Priority::MIN as u8); + + Self { start, end } + } +} + impl Priority { /// Default pub const DEFAULT: Self = Self::Data; @@ -342,7 +389,7 @@ impl TryFrom for Priority { } } -#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize)] #[repr(u8)] pub enum Reliability { #[default] @@ -367,6 +414,16 @@ impl Reliability { } } +impl From for Reliability { + fn from(value: bool) -> Self { + if value { + Reliability::Reliable + } else { + Reliability::BestEffort + } + } +} + #[derive(Debug, Copy, Clone, PartialEq, Eq, Default)] pub struct Channel { pub priority: Priority, diff --git a/commons/zenoh-protocol/src/transport/init.rs b/commons/zenoh-protocol/src/transport/init.rs index 7e56bfd770..0a4e97f95e 100644 --- a/commons/zenoh-protocol/src/transport/init.rs +++ b/commons/zenoh-protocol/src/transport/init.rs @@ -126,13 +126,13 @@ pub struct InitSyn { // Extensions pub mod ext { use crate::{ - common::{ZExtUnit, ZExtZBuf}, - zextunit, zextzbuf, + common::{ZExtUnit, ZExtZ64, ZExtZBuf}, + zextunit, zextz64, zextzbuf, }; /// # QoS extension /// Used to negotiate the use of QoS - pub type QoS = zextunit!(0x1, false); + pub type QoS = zextz64!(0x1, false); /// # Shm extension /// Used as challenge for probing shared memory capabilities @@ -161,7 +161,7 @@ impl InitSyn { pub fn rand() -> Self { use rand::Rng; - use crate::common::{ZExtUnit, ZExtZBuf}; + use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf}; let mut rng = rand::thread_rng(); @@ -170,7 +170,7 @@ impl InitSyn { let zid = ZenohIdProto::default(); let resolution = Resolution::rand(); let batch_size: BatchSize = rng.gen(); - let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand()); #[cfg(feature = "shared-memory")] let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); @@ -217,7 +217,7 @@ impl InitAck { pub fn rand() -> Self { use rand::Rng; - use crate::common::{ZExtUnit, ZExtZBuf}; + use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf}; let mut rng = rand::thread_rng(); @@ -231,7 +231,7 @@ impl InitAck { }; let batch_size: BatchSize = rng.gen(); let cookie = ZSlice::rand(64); - let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand()); #[cfg(feature = "shared-memory")] let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index 46c0968f3f..505a0c986d 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -33,7 +33,10 @@ pub use listener::*; pub use multicast::*; use serde::Serialize; pub use unicast::*; -use zenoh_protocol::{core::Locator, transport::BatchSize}; +use zenoh_protocol::{ + core::{Locator, PriorityRange, Reliability}, + transport::BatchSize, +}; use zenoh_result::ZResult; /*************************************/ @@ -48,10 +51,11 @@ pub struct Link { pub dst: Locator, pub group: Option, pub mtu: BatchSize, - pub is_reliable: bool, pub is_streamed: bool, pub interfaces: Vec, pub auth_identifier: LinkAuthId, + pub priorities: Option, + pub reliability: Reliability, } #[async_trait] @@ -70,48 +74,40 @@ impl fmt::Display for Link { } } -impl From<&LinkUnicast> for Link { - fn from(link: &LinkUnicast) -> Link { +impl Link { + pub fn new_unicast( + link: &LinkUnicast, + priorities: Option, + reliability: Reliability, + ) -> Self { Link { src: link.get_src().to_owned(), dst: link.get_dst().to_owned(), group: None, mtu: link.get_mtu(), - is_reliable: link.is_reliable(), is_streamed: link.is_streamed(), interfaces: link.get_interface_names(), auth_identifier: link.get_auth_id().clone(), + priorities, + reliability, } } -} - -impl From for Link { - fn from(link: LinkUnicast) -> Link { - Link::from(&link) - } -} -impl From<&LinkMulticast> for Link { - fn from(link: &LinkMulticast) -> Link { + pub fn new_multicast(link: &LinkMulticast) -> Self { Link { src: link.get_src().to_owned(), dst: link.get_dst().to_owned(), group: Some(link.get_dst().to_owned()), mtu: link.get_mtu(), - is_reliable: link.is_reliable(), is_streamed: false, interfaces: vec![], auth_identifier: LinkAuthId::default(), + priorities: None, + reliability: Reliability::from(link.is_reliable()), } } } -impl From for Link { - fn from(link: LinkMulticast) -> Link { - Link::from(&link) - } -} - impl PartialEq for Link { fn eq(&self, other: &LinkUnicast) -> bool { self.src == *other.get_src() && self.dst == *other.get_dst() diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index 62f39bf86c..c29f4c7c14 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -36,7 +36,18 @@ pub trait LinkManagerUnicastTrait: Send + Sync { async fn get_listeners(&self) -> Vec; async fn get_locators(&self) -> Vec; } -pub type NewLinkChannelSender = flume::Sender; +pub type NewLinkChannelSender = flume::Sender; + +/// Notification of a new inbound connection. +/// +/// Link implementations should preserve the metadata sections of [`NewLinkUnicast::endpoint`]. +pub struct NewLinkUnicast { + /// The link created in response to a new inbound connection. + pub link: LinkUnicast, + /// Endpoint of the listener. + pub endpoint: EndPoint, +} + pub trait ConstructibleLinkManagerUnicast: Sized { fn new(new_link_sender: NewLinkChannelSender, config: T) -> ZResult; } diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs index 2e0d9e0a19..3fdcfdc62d 100644 --- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs @@ -27,7 +27,7 @@ use x509_parser::prelude::*; use zenoh_core::zasynclock; use zenoh_link_commons::{ get_ip_interface_names, LinkAuthId, LinkAuthType, LinkManagerUnicastTrait, LinkUnicast, - LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, + LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, NewLinkUnicast, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -332,11 +332,14 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic { // Spawn the accept loop for the listener let token = self.listeners.token.child_token(); - let c_token = token.clone(); - let c_manager = self.manager.clone(); + let task = { + let token = token.clone(); + let manager = self.manager.clone(); + let endpoint = endpoint.clone(); - let task = async move { accept_task(quic_endpoint, c_token, c_manager).await }; + async move { accept_task(endpoint, quic_endpoint, token, manager).await } + }; // Initialize the QuicAcceptor let locator = endpoint.to_locator(); @@ -364,7 +367,8 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic { } async fn accept_task( - endpoint: quinn::Endpoint, + endpoint: EndPoint, + quic_endpoint: quinn::Endpoint, token: CancellationToken, manager: NewLinkChannelSender, ) -> ZResult<()> { @@ -382,7 +386,7 @@ async fn accept_task( Ok(conn) } - let src_addr = endpoint + let src_addr = quic_endpoint .local_addr() .map_err(|e| zerror!("Can not accept QUIC connections: {}", e))?; @@ -393,7 +397,7 @@ async fn accept_task( tokio::select! { _ = token.cancelled() => break, - res = accept(endpoint.accept()) => { + res = accept(quic_endpoint.accept()) => { match res { Ok(quic_conn) => { // Get the bideractional streams. Note that we don't allow unidirectional streams. @@ -429,7 +433,7 @@ async fn accept_task( )); // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(LinkUnicast(link)).await { + if let Err(e) = manager.send_async(NewLinkUnicast { link: LinkUnicast(link), endpoint: endpoint.clone() }).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 5711e5fe5c..b048121a9a 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -33,7 +33,7 @@ use z_serial::ZSerial; use zenoh_core::{zasynclock, zasyncread, zasyncwrite}; use zenoh_link_commons::{ ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, - LinkUnicastTrait, NewLinkChannelSender, + LinkUnicastTrait, NewLinkChannelSender, NewLinkUnicast, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -332,19 +332,23 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { // Spawn the accept loop for the listener let token = CancellationToken::new(); - let c_token = token.clone(); let mut listeners = zasyncwrite!(self.listeners); - let c_path = path.clone(); - let c_manager = self.manager.clone(); - let c_listeners = self.listeners.clone(); - - let task = async move { - // Wait for the accept loop to terminate - let res = - accept_read_task(link, c_token, c_manager, c_path.clone(), is_connected).await; - zasyncwrite!(c_listeners).remove(&c_path); - res + let task = { + let token = token.clone(); + let path = path.clone(); + let manager = self.manager.clone(); + let listeners = self.listeners.clone(); + let endpoint = endpoint.clone(); + + async move { + // Wait for the accept loop to terminate + let res = + accept_read_task(endpoint, link, token, manager, path.clone(), is_connected) + .await; + zasyncwrite!(listeners).remove(&path); + res + } }; let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task); @@ -390,6 +394,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { } async fn accept_read_task( + endpoint: EndPoint, link: Arc, token: CancellationToken, manager: NewLinkChannelSender, @@ -423,7 +428,7 @@ async fn accept_read_task( match res { Ok(link) => { // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(LinkUnicast(link.clone())).await { + if let Err(e) = manager.send_async(NewLinkUnicast{ link: LinkUnicast(link.clone()), endpoint: endpoint.clone() }).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 7532055f8e..f5c1e51e1d 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -21,7 +21,7 @@ use tokio::{ use tokio_util::sync::CancellationToken; use zenoh_link_commons::{ get_ip_interface_names, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, - ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, + ListenersUnicastIP, NewLinkChannelSender, NewLinkUnicast, BIND_INTERFACE, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -354,10 +354,14 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { )?; let token = self.listeners.token.child_token(); - let c_token = token.clone(); - let c_manager = self.manager.clone(); - let task = async move { accept_task(socket, c_token, c_manager).await }; + let task = { + let token = token.clone(); + let manager = self.manager.clone(); + let endpoint = endpoint.clone(); + + async move { accept_task(endpoint, socket, token, manager).await } + }; let locator = endpoint.to_locator(); self.listeners @@ -421,6 +425,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { } async fn accept_task( + endpoint: EndPoint, socket: TcpListener, token: CancellationToken, manager: NewLinkChannelSender, @@ -457,7 +462,7 @@ async fn accept_task( let link = Arc::new(LinkUnicastTcp::new(stream, src_addr, dst_addr)); // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(LinkUnicast(link)).await { + if let Err(e) = manager.send_async(NewLinkUnicast{ link: LinkUnicast(link), endpoint: endpoint.clone() }).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } }, diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index 716eac2121..d75835e653 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -25,7 +25,7 @@ use x509_parser::prelude::*; use zenoh_core::zasynclock; use zenoh_link_commons::{ get_ip_interface_names, LinkAuthId, LinkAuthType, LinkManagerUnicastTrait, LinkUnicast, - LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, + LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, NewLinkUnicast, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -362,12 +362,16 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls { let local_port = local_addr.port(); // Initialize the TlsAcceptor - let acceptor = TlsAcceptor::from(Arc::new(tls_server_config.server_config)); let token = self.listeners.token.child_token(); - let c_token = token.clone(); - let c_manager = self.manager.clone(); - let task = async move { accept_task(socket, acceptor, c_token, c_manager).await }; + let task = { + let acceptor = TlsAcceptor::from(Arc::new(tls_server_config.server_config)); + let token = token.clone(); + let manager = self.manager.clone(); + let endpoint = endpoint.clone(); + + async move { accept_task(endpoint, socket, acceptor, token, manager).await } + }; // Update the endpoint locator address let locator = Locator::new( @@ -399,6 +403,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls { } async fn accept_task( + endpoint: EndPoint, socket: TcpListener, acceptor: TlsAcceptor, token: CancellationToken, @@ -456,7 +461,7 @@ async fn accept_task( )); // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(LinkUnicast(link)).await { + if let Err(e) = manager.send_async(NewLinkUnicast {link: LinkUnicast(link), endpoint: endpoint.clone()}).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } } diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index e67e821363..0c3c6a5039 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -25,7 +25,8 @@ use tokio_util::sync::CancellationToken; use zenoh_core::{zasynclock, zlock}; use zenoh_link_commons::{ get_ip_interface_names, ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, - LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, + LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, NewLinkUnicast, + BIND_INTERFACE, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -402,10 +403,14 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp { )?; let token = self.listeners.token.child_token(); - let c_token = token.clone(); - let c_manager = self.manager.clone(); - let task = async move { accept_read_task(socket, c_token, c_manager).await }; + let task = { + let token = token.clone(); + let manager = self.manager.clone(); + let endpoint = endpoint.clone(); + + async move { accept_read_task(endpoint, socket, token, manager).await } + }; let locator = endpoint.to_locator(); self.listeners @@ -471,6 +476,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp { } async fn accept_read_task( + endpoint: EndPoint, socket: UdpSocket, token: CancellationToken, manager: NewLinkChannelSender, @@ -544,7 +550,7 @@ async fn accept_read_task( LinkUnicastUdpVariant::Unconnected(unconnected), )); // Add the new link to the set of connected peers - if let Err(e) = manager.send_async(LinkUnicast(link)).await { + if let Err(e) = manager.send_async(NewLinkUnicast { link: LinkUnicast(link), endpoint: endpoint.clone() }).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } } diff --git a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs index df93b9cc61..b93bb32398 100644 --- a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs @@ -37,7 +37,7 @@ use unix_named_pipe::{create, open_write}; use zenoh_core::{zasyncread, zasyncwrite, ResolveFuture, Wait}; use zenoh_link_commons::{ ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, - LinkUnicastTrait, NewLinkChannelSender, + LinkUnicastTrait, NewLinkChannelSender, NewLinkUnicast, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -273,14 +273,19 @@ async fn handle_incoming_connections( endpoint.metadata(), )?; + let link = Arc::new(UnicastPipe { + r: UnsafeCell::new(dedicated_uplink), + w: UnsafeCell::new(dedicated_downlink), + local, + remote, + }); + // send newly established link to manager manager - .send_async(LinkUnicast(Arc::new(UnicastPipe { - r: UnsafeCell::new(dedicated_uplink), - w: UnsafeCell::new(dedicated_downlink), - local, - remote, - }))) + .send_async(NewLinkUnicast { + link: LinkUnicast(link), + endpoint: endpoint.clone(), + }) .await?; ZResult::Ok(()) diff --git a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs index a07267416d..5864847e8e 100644 --- a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs @@ -28,6 +28,7 @@ use uuid::Uuid; use zenoh_core::{zasyncread, zasyncwrite}; use zenoh_link_commons::{ LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + NewLinkUnicast, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -392,15 +393,18 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUnixSocketStream { let c_token = token.clone(); let mut listeners = zasyncwrite!(self.listeners); - let c_manager = self.manager.clone(); - let c_listeners = self.listeners.clone(); - let c_path = local_path_str.to_owned(); - - let task = async move { - // Wait for the accept loop to terminate - let res = accept_task(socket, c_token, c_manager).await; - zasyncwrite!(c_listeners).remove(&c_path); - res + let task = { + let manager = self.manager.clone(); + let listeners = self.listeners.clone(); + let path = local_path_str.to_owned(); + let endpoint = endpoint.clone(); + + async move { + // Wait for the accept loop to terminate + let res = accept_task(endpoint, socket, c_token, manager).await; + zasyncwrite!(listeners).remove(&path); + res + } }; let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task); @@ -459,6 +463,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUnixSocketStream { } async fn accept_task( + endpoint: EndPoint, socket: UnixListener, token: CancellationToken, manager: NewLinkChannelSender, @@ -515,7 +520,7 @@ async fn accept_task( )); // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(LinkUnicast(link)).await { + if let Err(e) = manager.send_async(NewLinkUnicast { link: LinkUnicast(link), endpoint: endpoint.clone() }).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } diff --git a/io/zenoh-links/zenoh-link-vsock/src/unicast.rs b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs index e7b261f292..cbdde4b043 100644 --- a/io/zenoh-links/zenoh-link-vsock/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs @@ -29,6 +29,7 @@ use tokio_vsock::{ use zenoh_core::{zasyncread, zasyncwrite}; use zenoh_link_commons::{ LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + NewLinkUnicast, }; use zenoh_protocol::{ core::{endpoint::Address, EndPoint, Locator}, @@ -272,20 +273,23 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastVsock { endpoint.config(), )?; let token = CancellationToken::new(); - let c_token = token.clone(); - - let c_manager = self.manager.clone(); let locator = endpoint.to_locator(); let mut listeners = zasyncwrite!(self.listeners); - let c_listeners = self.listeners.clone(); - let c_addr = addr; - let task = async move { - // Wait for the accept loop to terminate - let res = accept_task(listener, c_token, c_manager).await; - zasyncwrite!(c_listeners).remove(&c_addr); - res + + let task = { + let token = token.clone(); + let manager = self.manager.clone(); + let listeners = self.listeners.clone(); + let endpoint = endpoint.clone(); + + async move { + // Wait for the accept loop to terminate + let res = accept_task(endpoint.clone(), listener, token, manager).await; + zasyncwrite!(listeners).remove(&addr); + res + } }; let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task); @@ -329,6 +333,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastVsock { } async fn accept_task( + endpoint: EndPoint, mut socket: VsockListener, token: CancellationToken, manager: NewLinkChannelSender, @@ -356,7 +361,7 @@ async fn accept_task( let link = Arc::new(LinkUnicastVsock::new(stream, src_addr, dst_addr)); // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(LinkUnicast(link)).await { + if let Err(e) = manager.send_async(NewLinkUnicast { link: LinkUnicast(link), endpoint: endpoint.clone() }).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } }, diff --git a/io/zenoh-links/zenoh-link-ws/src/unicast.rs b/io/zenoh-links/zenoh-link-ws/src/unicast.rs index 193c9a1724..82dc9993b3 100644 --- a/io/zenoh-links/zenoh-link-ws/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-ws/src/unicast.rs @@ -35,6 +35,7 @@ use tokio_util::sync::CancellationToken; use zenoh_core::{zasynclock, zasyncread, zasyncwrite}; use zenoh_link_commons::{ LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + NewLinkUnicast, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -368,16 +369,20 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastWs { // Spawn the accept loop for the listener let token = CancellationToken::new(); - let c_token = token.clone(); - let c_manager = self.manager.clone(); - let c_listeners = self.listeners.clone(); - let c_addr = local_addr; - - let task = async move { - // Wait for the accept loop to terminate - let res = accept_task(socket, c_token, c_manager).await; - zasyncwrite!(c_listeners).remove(&c_addr); - res + + let task = { + let token = token.clone(); + let manager = self.manager.clone(); + let listeners = self.listeners.clone(); + let addr = local_addr; + let endpoint = endpoint.clone(); + + async move { + // Wait for the accept loop to terminate + let res = accept_task(endpoint, socket, token, manager).await; + zasyncwrite!(listeners).remove(&addr); + res + } }; let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task); @@ -467,6 +472,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastWs { } async fn accept_task( + endpoint: EndPoint, socket: TcpListener, token: CancellationToken, manager: NewLinkChannelSender, @@ -535,7 +541,13 @@ async fn accept_task( let link = Arc::new(LinkUnicastWs::new(stream, src_addr, dst_addr)); // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(LinkUnicast(link)).await { + if let Err(e) = manager + .send_async(NewLinkUnicast { + link: LinkUnicast(link), + endpoint: endpoint.clone(), + }) + .await + { tracing::error!("{}-{}: {}", file!(), line!(), e) } } diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index 305ccab574..3746092d4a 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -419,8 +419,8 @@ impl TransportManager { loop { tokio::select! { res = new_unicast_link_receiver.recv_async() => { - if let Ok(link) = res { - this.handle_new_link_unicast(link).await; + if let Ok(new_link) = res { + this.handle_new_link_unicast(new_link).await; } } _ = cancellation_token.cancelled() => { break; } diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs index d0d5ef4fb0..f2258b6935 100644 --- a/io/zenoh-transport/src/multicast/link.rs +++ b/io/zenoh-transport/src/multicast/link.rs @@ -21,7 +21,7 @@ use std::{ use tokio::task::JoinHandle; use zenoh_buffers::{BBuf, ZSlice, ZSliceBuffer}; use zenoh_core::{zcondfeat, zlock}; -use zenoh_link::{Link, LinkMulticast, Locator}; +use zenoh_link::{LinkMulticast, Locator}; use zenoh_protocol::{ core::{Bits, Priority, Resolution, WhatAmI, ZenohIdProto}, transport::{BatchSize, Close, Join, PrioritySn, TransportMessage, TransportSn}, @@ -126,18 +126,6 @@ impl fmt::Debug for TransportLinkMulticast { } } -impl From<&TransportLinkMulticast> for Link { - fn from(link: &TransportLinkMulticast) -> Self { - Link::from(&link.link) - } -} - -impl From for Link { - fn from(link: TransportLinkMulticast) -> Self { - Link::from(link.link) - } -} - pub(crate) struct TransportLinkMulticastTx { pub(crate) inner: TransportLinkMulticast, pub(crate) buffer: Option, diff --git a/io/zenoh-transport/src/multicast/mod.rs b/io/zenoh-transport/src/multicast/mod.rs index 78d76bb6c8..fbb656264d 100644 --- a/io/zenoh-transport/src/multicast/mod.rs +++ b/io/zenoh-transport/src/multicast/mod.rs @@ -92,7 +92,7 @@ impl TransportMulticast { #[inline(always)] pub fn get_link(&self) -> ZResult { let transport = self.get_transport()?; - Ok(transport.get_link().into()) + Ok(Link::new_multicast(&transport.get_link().link)) } #[inline(always)] diff --git a/io/zenoh-transport/src/multicast/transport.rs b/io/zenoh-transport/src/multicast/transport.rs index f0dfec4813..36b9dbbea0 100644 --- a/io/zenoh-transport/src/multicast/transport.rs +++ b/io/zenoh-transport/src/multicast/transport.rs @@ -333,7 +333,7 @@ impl TransportMulticastInner { /* PEER */ /*************************************/ pub(super) fn new_peer(&self, locator: &Locator, join: Join) -> ZResult<()> { - let mut link = Link::from(self.get_link()); + let mut link = Link::new_multicast(&self.get_link().link); link.dst = locator.clone(); let is_shm = zcondfeat!("shared-memory", join.ext_shm.is_some(), false); @@ -452,7 +452,7 @@ impl TransportMulticastInner { zread!(self.peers) .values() .map(|p| { - let mut link = Link::from(self.get_link()); + let mut link = Link::new_multicast(&self.get_link().link); link.dst = p.locator.clone(); TransportPeer { diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index 64949357c6..ae818716bc 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -20,9 +20,9 @@ use zenoh_buffers::{reader::HasReader, writer::HasWriter, ZSlice}; use zenoh_codec::{RCodec, WCodec, Zenoh080}; use zenoh_core::{zasynclock, zcondfeat, zerror}; use zenoh_crypto::{BlockCipher, PseudoRng}; -use zenoh_link::LinkUnicast; +use zenoh_link::{EndPoint, LinkUnicast}; use zenoh_protocol::{ - core::{Field, Resolution, WhatAmI, ZenohIdProto}, + core::{Field, Reliability, Resolution, WhatAmI, ZenohIdProto}, transport::{ batch_size, close::{self, Close}, @@ -55,7 +55,7 @@ pub(super) type AcceptError = (zenoh_result::Error, Option); struct StateTransport { batch_size: BatchSize, resolution: Resolution, - ext_qos: ext::qos::StateAccept, + ext_qos: ext::qos::QoS, #[cfg(feature = "transport_multilink")] ext_mlink: ext::multilink::StateAccept, #[cfg(feature = "shared-memory")] @@ -636,17 +636,24 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { } } -pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) -> ZResult<()> { +pub(crate) async fn accept_link( + endpoint: EndPoint, + link: LinkUnicast, + manager: &TransportManager, +) -> ZResult<()> { + let direction = TransportLinkUnicastDirection::Inbound; let mtu = link.get_mtu(); let is_streamed = link.is_streamed(); let config = TransportLinkUnicastConfig { - direction: TransportLinkUnicastDirection::Inbound, + direction, batch: BatchConfig { mtu, is_streamed, #[cfg(feature = "transport_compression")] is_compression: false, }, + priorities: None, + reliability: Reliability::from(link.is_reliable()), }; let mut link = TransportLinkUnicast::new(link, config); let mut fsm = AcceptLink { @@ -689,7 +696,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - transport: StateTransport { batch_size: manager.config.batch_size.min(batch_size::UNICAST).min(mtu), resolution: manager.config.resolution, - ext_qos: ext::qos::StateAccept::new(manager.config.unicast.is_qos), + ext_qos: ext::qos::QoS::new(manager.config.unicast.is_qos, &endpoint)?, #[cfg(feature = "transport_multilink")] ext_mlink: manager .state @@ -757,7 +764,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - whatami: osyn_out.other_whatami, sn_resolution: state.transport.resolution.get(Field::FrameSN), tx_initial_sn: oack_out.open_ack.initial_sn, - is_qos: state.transport.ext_qos.is_qos(), + is_qos: state.transport.ext_qos.is_enabled(), #[cfg(feature = "transport_multilink")] multilink: state.transport.ext_mlink.multilink(), #[cfg(feature = "shared-memory")] @@ -771,13 +778,15 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - }; let a_config = TransportLinkUnicastConfig { - direction: TransportLinkUnicastDirection::Inbound, + direction, batch: BatchConfig { mtu: state.transport.batch_size, is_streamed, #[cfg(feature = "transport_compression")] is_compression: state.link.ext_compression.is_compression(), }, + priorities: state.transport.ext_qos.priorities(), + reliability: Reliability::from(link.link.is_reliable()), }; let a_link = link.reconfigure(a_config); let s_link = format!("{:?}", a_link); diff --git a/io/zenoh-transport/src/unicast/establishment/cookie.rs b/io/zenoh-transport/src/unicast/establishment/cookie.rs index 4220f8e08b..7fb84f258e 100644 --- a/io/zenoh-transport/src/unicast/establishment/cookie.rs +++ b/io/zenoh-transport/src/unicast/establishment/cookie.rs @@ -34,7 +34,7 @@ pub(crate) struct Cookie { pub(crate) batch_size: BatchSize, pub(crate) nonce: u64, // Extensions - pub(crate) ext_qos: ext::qos::StateAccept, + pub(crate) ext_qos: ext::qos::QoS, #[cfg(feature = "transport_multilink")] pub(crate) ext_mlink: ext::multilink::StateAccept, #[cfg(feature = "shared-memory")] @@ -90,7 +90,7 @@ where let batch_size: BatchSize = self.read(&mut *reader)?; let nonce: u64 = self.read(&mut *reader)?; // Extensions - let ext_qos: ext::qos::StateAccept = self.read(&mut *reader)?; + let ext_qos: ext::qos::QoS = self.read(&mut *reader)?; #[cfg(feature = "transport_multilink")] let ext_mlink: ext::multilink::StateAccept = self.read(&mut *reader)?; #[cfg(feature = "shared-memory")] @@ -178,7 +178,7 @@ impl Cookie { resolution: Resolution::rand(), batch_size: rng.gen(), nonce: rng.gen(), - ext_qos: ext::qos::StateAccept::rand(), + ext_qos: ext::qos::QoS::rand(), #[cfg(feature = "transport_multilink")] ext_mlink: ext::multilink::StateAccept::rand(), #[cfg(feature = "shared-memory")] diff --git a/io/zenoh-transport/src/unicast/establishment/ext/qos.rs b/io/zenoh-transport/src/unicast/establishment/ext/qos.rs index f749073805..5bebe00bb0 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/qos.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/qos.rs @@ -14,13 +14,19 @@ use core::marker::PhantomData; use async_trait::async_trait; +use serde::Serialize; use zenoh_buffers::{ reader::{DidntRead, Reader}, writer::{DidntWrite, Writer}, }; use zenoh_codec::{RCodec, WCodec, Zenoh080}; -use zenoh_protocol::transport::{init, open}; -use zenoh_result::Error as ZError; +use zenoh_core::{bail, zerror}; +use zenoh_link::EndPoint; +use zenoh_protocol::{ + core::PriorityRange, + transport::{init, open}, +}; +use zenoh_result::{Error as ZError, ZResult}; use crate::unicast::establishment::{AcceptFsm, OpenFsm}; @@ -35,21 +41,148 @@ impl<'a> QoSFsm<'a> { } } -/*************************************/ -/* OPEN */ -/*************************************/ -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub(crate) struct StateOpen { - is_qos: bool, +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +pub(crate) enum QoS { + Disabled, + Enabled { priorities: Option }, +} + +impl QoS { + pub(crate) fn new(is_qos: bool, endpoint: &EndPoint) -> ZResult { + if !is_qos { + Ok(Self::Disabled) + } else { + const PRIORITY_METADATA_KEY: &str = "priorities"; + + let endpoint_metadata = endpoint.metadata(); + + let Some(mut priorities) = endpoint_metadata + .get(PRIORITY_METADATA_KEY) + .map(|metadata| metadata.split("..")) + else { + return Ok(Self::Enabled { priorities: None }); + }; + let start = priorities + .next() + .ok_or(zerror!("Invalid priority range syntax"))? + .parse::()?; + + let end = priorities + .next() + .ok_or(zerror!("Invalid priority range syntax"))? + .parse::()?; + + if priorities.next().is_some() { + bail!("Invalid priority range syntax") + }; + + Ok(Self::Enabled { + priorities: Some(PriorityRange::new(start, end)?), + }) + } + } + + fn try_from_u64(value: u64) -> ZResult { + match value { + 0b00_u64 => Ok(QoS::Disabled), + 0b01_u64 => Ok(QoS::Enabled { priorities: None }), + mut value if value & 0b10_u64 != 0 => { + value >>= 2; + let start = (value & 0xff) as u8; + let end = ((value & 0xff00) >> 8) as u8; + + Ok(QoS::Enabled { + priorities: Some(PriorityRange::new(start, end)?), + }) + } + _ => Err(zerror!("invalid QoS").into()), + } + } + + /// Encodes [`QoS`] as a [`u64`]. + /// + /// The two least significant bits are used to discrimnate three states: + /// + /// 1. QoS is disabled + /// 2. QoS is enabled but no priority range is available + /// 3. QoS is enabled and priority information is range, in which case the next 16 least + /// significant bits are used to encode the priority range. + fn to_u64(&self) -> u64 { + match self { + QoS::Disabled => 0b00_u64, + QoS::Enabled { priorities: None } => 0b01_u64, + QoS::Enabled { + priorities: Some(range), + } => ((range.end() as u64) << 10) | ((range.start() as u64) << 2) | 0b10_u64, + } + } + + fn unify(&self, other: &Self) -> Self { + match (self, other) { + (QoS::Disabled, QoS::Disabled) => QoS::Disabled, + (QoS::Disabled, QoS::Enabled { priorities }) + | (QoS::Enabled { priorities }, QoS::Disabled) => QoS::Enabled { + priorities: *priorities, + }, + (QoS::Enabled { priorities: lhs }, QoS::Enabled { priorities: rhs }) => { + if lhs == rhs { + QoS::Enabled { priorities: *rhs } + } else { + QoS::Enabled { priorities: None } + } + } + } + } + + pub(crate) fn is_enabled(&self) -> bool { + matches!(self, QoS::Enabled { .. }) + } + + pub(crate) fn priorities(&self) -> Option { + match self { + QoS::Disabled | QoS::Enabled { priorities: None } => None, + QoS::Enabled { + priorities: Some(priorities), + } => Some(*priorities), + } + } + + #[cfg(test)] + pub(crate) fn rand() -> Self { + use rand::Rng; + let mut rng = rand::thread_rng(); + if rng.gen_bool(0.5) { + QoS::Disabled + } else if rng.gen_bool(0.5) { + QoS::Enabled { priorities: None } + } else { + QoS::Enabled { + priorities: Some(PriorityRange::rand()), + } + } + } } -impl StateOpen { - pub(crate) const fn new(is_qos: bool) -> Self { - Self { is_qos } +// Codec +impl WCodec<&QoS, &mut W> for Zenoh080 +where + W: Writer, +{ + type Output = Result<(), DidntWrite>; + + fn write(self, writer: &mut W, x: &QoS) -> Self::Output { + self.write(writer, &x.to_u64()) } +} + +impl RCodec for Zenoh080 +where + R: Reader, +{ + type Error = DidntRead; - pub(crate) const fn is_qos(&self) -> bool { - self.is_qos + fn read(self, reader: &mut R) -> Result { + QoS::try_from_u64(self.read(reader)?).map_err(|_| DidntRead) } } @@ -57,28 +190,39 @@ impl StateOpen { impl<'a> OpenFsm for &'a QoSFsm<'a> { type Error = ZError; - type SendInitSynIn = &'a StateOpen; + type SendInitSynIn = &'a QoS; type SendInitSynOut = Option; async fn send_init_syn( self, state: Self::SendInitSynIn, ) -> Result { - let output = state.is_qos.then_some(init::ext::QoS::new()); - Ok(output) + if state.is_enabled() { + Ok(Some(init::ext::QoS::new(state.to_u64()))) + } else { + Ok(None) + } } - type RecvInitAckIn = (&'a mut StateOpen, Option); + type RecvInitAckIn = (&'a mut QoS, Option); type RecvInitAckOut = (); async fn recv_init_ack( self, input: Self::RecvInitAckIn, ) -> Result { - let (state, other_ext) = input; - state.is_qos &= other_ext.is_some(); + let (state_self, other_ext) = input; + + let state_other = if let Some(other_ext) = other_ext { + QoS::try_from_u64(other_ext.value)? + } else { + QoS::Disabled + }; + + *state_self = state_self.unify(&state_other); + Ok(()) } - type SendOpenSynIn = &'a StateOpen; + type SendOpenSynIn = &'a QoS; type SendOpenSynOut = Option; async fn send_open_syn( self, @@ -87,7 +231,7 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { Ok(None) } - type RecvOpenAckIn = (&'a mut StateOpen, Option); + type RecvOpenAckIn = (&'a mut QoS, Option); type RecvOpenAckOut = (); async fn recv_open_ack( self, @@ -97,84 +241,43 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { } } -/*************************************/ -/* ACCEPT */ -/*************************************/ -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub(crate) struct StateAccept { - is_qos: bool, -} - -impl StateAccept { - pub(crate) const fn new(is_qos: bool) -> Self { - Self { is_qos } - } - - pub(crate) const fn is_qos(&self) -> bool { - self.is_qos - } - - #[cfg(test)] - pub(crate) fn rand() -> Self { - use rand::Rng; - let mut rng = rand::thread_rng(); - Self::new(rng.gen_bool(0.5)) - } -} - -// Codec -impl WCodec<&StateAccept, &mut W> for Zenoh080 -where - W: Writer, -{ - type Output = Result<(), DidntWrite>; - - fn write(self, writer: &mut W, x: &StateAccept) -> Self::Output { - let is_qos = u8::from(x.is_qos); - self.write(&mut *writer, is_qos)?; - Ok(()) - } -} - -impl RCodec for Zenoh080 -where - R: Reader, -{ - type Error = DidntRead; - - fn read(self, reader: &mut R) -> Result { - let is_qos: u8 = self.read(&mut *reader)?; - let is_qos = is_qos == 1; - Ok(StateAccept { is_qos }) - } -} - #[async_trait] impl<'a> AcceptFsm for &'a QoSFsm<'a> { type Error = ZError; - type RecvInitSynIn = (&'a mut StateAccept, Option); + type RecvInitSynIn = (&'a mut QoS, Option); type RecvInitSynOut = (); async fn recv_init_syn( self, input: Self::RecvInitSynIn, ) -> Result { - let (state, other_ext) = input; - state.is_qos &= other_ext.is_some(); + let (state_self, other_ext) = input; + + let state_other = if let Some(other_ext) = other_ext { + QoS::try_from_u64(other_ext.value)? + } else { + QoS::Disabled + }; + + *state_self = state_self.unify(&state_other); + Ok(()) } - type SendInitAckIn = &'a StateAccept; + type SendInitAckIn = &'a QoS; type SendInitAckOut = Option; async fn send_init_ack( self, state: Self::SendInitAckIn, ) -> Result { - let output = state.is_qos.then_some(init::ext::QoS::new()); - Ok(output) + if state.is_enabled() { + Ok(Some(init::ext::QoS::new(state.to_u64()))) + } else { + Ok(None) + } } - type RecvOpenSynIn = (&'a mut StateAccept, Option); + type RecvOpenSynIn = (&'a mut QoS, Option); type RecvOpenSynOut = (); async fn recv_open_syn( self, @@ -183,7 +286,7 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> { Ok(()) } - type SendOpenAckIn = &'a StateAccept; + type SendOpenAckIn = &'a QoS; type SendOpenAckOut = Option; async fn send_open_ack( self, diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index a9e797228e..5ec16b4ed8 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -18,9 +18,9 @@ use zenoh_buffers::ZSlice; #[cfg(feature = "transport_auth")] use zenoh_core::zasynclock; use zenoh_core::{zcondfeat, zerror}; -use zenoh_link::LinkUnicast; +use zenoh_link::{EndPoint, LinkUnicast}; use zenoh_protocol::{ - core::{Field, Resolution, WhatAmI, ZenohIdProto}, + core::{Field, Reliability, Resolution, WhatAmI, ZenohIdProto}, transport::{ batch_size, close, BatchSize, Close, InitSyn, OpenSyn, TransportBody, TransportMessage, TransportSn, @@ -52,7 +52,7 @@ type OpenError = (zenoh_result::Error, Option); struct StateTransport { batch_size: BatchSize, resolution: Resolution, - ext_qos: ext::qos::StateOpen, + ext_qos: ext::qos::QoS, #[cfg(feature = "transport_multilink")] ext_mlink: ext::multilink::StateOpen, #[cfg(feature = "shared-memory")] @@ -537,18 +537,22 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { } pub(crate) async fn open_link( + endpoint: EndPoint, link: LinkUnicast, manager: &TransportManager, ) -> ZResult { + let direction = TransportLinkUnicastDirection::Outbound; let is_streamed = link.is_streamed(); let config = TransportLinkUnicastConfig { - direction: TransportLinkUnicastDirection::Outbound, + direction, batch: BatchConfig { mtu: link.get_mtu(), is_streamed, #[cfg(feature = "transport_compression")] is_compression: false, // Perform the exchange Init/Open exchange with no compression }, + priorities: None, + reliability: Reliability::from(link.is_reliable()), }; let mut link = TransportLinkUnicast::new(link, config); let mut fsm = OpenLink { @@ -577,7 +581,7 @@ pub(crate) async fn open_link( .min(batch_size::UNICAST) .min(link.config.batch.mtu), resolution: manager.config.resolution, - ext_qos: ext::qos::StateOpen::new(manager.config.unicast.is_qos), + ext_qos: ext::qos::QoS::new(manager.config.unicast.is_qos, &endpoint)?, #[cfg(feature = "transport_multilink")] ext_mlink: manager .state @@ -645,7 +649,7 @@ pub(crate) async fn open_link( whatami: iack_out.other_whatami, sn_resolution: state.transport.resolution.get(Field::FrameSN), tx_initial_sn: osyn_out.mine_initial_sn, - is_qos: state.transport.ext_qos.is_qos(), + is_qos: state.transport.ext_qos.is_enabled(), #[cfg(feature = "transport_multilink")] multilink: state.transport.ext_mlink.multilink(), #[cfg(feature = "shared-memory")] @@ -659,13 +663,15 @@ pub(crate) async fn open_link( }; let o_config = TransportLinkUnicastConfig { - direction: TransportLinkUnicastDirection::Outbound, + direction, batch: BatchConfig { mtu: state.transport.batch_size, is_streamed, #[cfg(feature = "transport_compression")] is_compression: state.link.ext_compression.is_compression(), }, + priorities: state.transport.ext_qos.priorities(), + reliability: Reliability::from(link.link.is_reliable()), }; let o_link = link.reconfigure(o_config); let s_link = format!("{:?}", o_link); diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs index 736360db63..6e8425f056 100644 --- a/io/zenoh-transport/src/unicast/link.rs +++ b/io/zenoh-transport/src/unicast/link.rs @@ -16,7 +16,10 @@ use std::{fmt, sync::Arc}; use zenoh_buffers::{BBuf, ZSlice, ZSliceBuffer}; use zenoh_core::zcondfeat; use zenoh_link::{Link, LinkUnicast}; -use zenoh_protocol::transport::{BatchSize, Close, OpenAck, TransportMessage}; +use zenoh_protocol::{ + core::{PriorityRange, Reliability}, + transport::{BatchSize, Close, OpenAck, TransportMessage}, +}; use zenoh_result::{zerror, ZResult}; use crate::common::batch::{BatchConfig, Decode, Encode, Finalize, RBatch, WBatch}; @@ -32,6 +35,8 @@ pub(crate) struct TransportLinkUnicastConfig { // Inbound / outbound pub(crate) direction: TransportLinkUnicastDirection, pub(crate) batch: BatchConfig, + pub(crate) priorities: Option, + pub(crate) reliability: Reliability, } #[derive(Clone, PartialEq, Eq)] @@ -55,7 +60,7 @@ impl TransportLinkUnicast { } pub(crate) fn link(&self) -> Link { - (&self.link).into() + Link::new_unicast(&self.link, self.config.priorities, self.config.reliability) } pub(crate) fn tx(&self) -> TransportLinkUnicastTx { @@ -77,7 +82,7 @@ impl TransportLinkUnicast { pub(crate) fn rx(&self) -> TransportLinkUnicastRx { TransportLinkUnicastRx { link: self.link.clone(), - batch: self.config.batch, + config: self.config.clone(), } } @@ -121,18 +126,6 @@ impl fmt::Debug for TransportLinkUnicast { } } -impl From<&TransportLinkUnicast> for Link { - fn from(link: &TransportLinkUnicast) -> Self { - Link::from(&link.link) - } -} - -impl From for Link { - fn from(link: TransportLinkUnicast) -> Self { - Link::from(&link.link) - } -} - impl PartialEq for TransportLinkUnicast { fn eq(&self, other: &Link) -> bool { &other.src == self.link.get_src() && &other.dst == self.link.get_dst() @@ -201,7 +194,7 @@ impl fmt::Debug for TransportLinkUnicastTx { pub(crate) struct TransportLinkUnicastRx { pub(crate) link: LinkUnicast, - pub(crate) batch: BatchConfig, + pub(crate) config: TransportLinkUnicastConfig, } impl TransportLinkUnicastRx { @@ -235,7 +228,7 @@ impl TransportLinkUnicastRx { let buffer = ZSlice::new(Arc::new(into), 0, end) .map_err(|_| zerror!("{ERR}{self}. ZSlice index(es) out of bounds"))?; - let mut batch = RBatch::new(self.batch, buffer); + let mut batch = RBatch::new(self.config.batch, buffer); batch .initialize(buff) .map_err(|e| zerror!("{ERR}{self}. {e}."))?; @@ -246,7 +239,7 @@ impl TransportLinkUnicastRx { } pub async fn recv(&mut self) -> ZResult { - let mtu = self.batch.mtu as usize; + let mtu = self.config.batch.mtu as usize; let mut batch = self .recv_batch(|| zenoh_buffers::vec::uninit(mtu).into_boxed_slice()) .await?; @@ -259,7 +252,7 @@ impl TransportLinkUnicastRx { impl fmt::Display for TransportLinkUnicastRx { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}:{:?}", self.link, self.batch) + write!(f, "{}:{:?}", self.link, self.config) } } @@ -267,7 +260,7 @@ impl fmt::Debug for TransportLinkUnicastRx { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("TransportLinkUnicastRx") .field("link", &self.link) - .field("config", &self.batch) + .field("config", &self.config) .finish() } } diff --git a/io/zenoh-transport/src/unicast/lowlatency/link.rs b/io/zenoh-transport/src/unicast/lowlatency/link.rs index 3ba1cd724f..950d11f3c2 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/link.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/link.rs @@ -152,7 +152,7 @@ impl TransportUnicastLowlatency { // The pool of buffers let pool = { - let mtu = link_rx.batch.mtu as usize; + let mtu = link_rx.config.batch.mtu as usize; let mut n = rx_buffer_size / mtu; if rx_buffer_size % mtu != 0 { n += 1; diff --git a/io/zenoh-transport/src/unicast/manager.rs b/io/zenoh-transport/src/unicast/manager.rs index bff221323e..d0fbecad5f 100644 --- a/io/zenoh-transport/src/unicast/manager.rs +++ b/io/zenoh-transport/src/unicast/manager.rs @@ -615,7 +615,7 @@ impl TransportManager { }, { tracing::debug!( - "New transport opened between {} and {} - whatami: {}, sn resolution: {:?}, initial sn: {:?}, qos: {}, multilink: {}, lowlatency: {}", + "New transport opened between {} and {} - whatami: {}, sn resolution: {:?}, initial sn: {:?}, qos: {:?}, multilink: {}, lowlatency: {}", self.config.zid, config.zid, config.whatami, @@ -707,9 +707,9 @@ impl TransportManager { }; // Create a new link associated by calling the Link Manager - let link = manager.new_link(endpoint).await?; + let link = manager.new_link(endpoint.clone()).await?; // Open the link - super::establishment::open::open_link(link, self).await + super::establishment::open::open_link(endpoint, link, self).await } pub async fn get_transport_unicast(&self, peer: &ZenohIdProto) -> Option { @@ -736,7 +736,8 @@ impl TransportManager { Ok(()) } - pub(crate) async fn handle_new_link_unicast(&self, link: LinkUnicast) { + pub(crate) async fn handle_new_link_unicast(&self, new_link: NewLinkUnicast) { + let NewLinkUnicast { link, endpoint } = new_link; let incoming_counter = self.state.unicast.incoming.clone(); if incoming_counter.load(SeqCst) >= self.config.unicast.accept_pending { // We reached the limit of concurrent incoming transport, this means two things: @@ -759,7 +760,7 @@ impl TransportManager { .spawn_with_rt(zenoh_runtime::ZRuntime::Acceptor, async move { if tokio::time::timeout( c_manager.config.unicast.accept_timeout, - super::establishment::accept::accept_link(link, &c_manager), + super::establishment::accept::accept_link(endpoint, link, &c_manager), ) .await .is_err() diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index fff842c255..960b2e1f34 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -15,6 +15,7 @@ use std::time::Duration; use tokio_util::{sync::CancellationToken, task::TaskTracker}; use zenoh_buffers::ZSliceBuffer; +use zenoh_link::Link; use zenoh_protocol::transport::{KeepAlive, TransportMessage}; use zenoh_result::{zerror, ZResult}; use zenoh_sync::{RecyclingObject, RecyclingObjectPool}; @@ -113,6 +114,8 @@ impl TransportLinkUnicastUniversal { } pub(super) fn start_rx(&mut self, transport: TransportUnicastUniversal, lease: Duration) { + let priorities = self.link.config.priorities; + let reliability = self.link.config.reliability; let mut rx = self.link.rx(); let token = self.token.clone(); let task = async move { @@ -133,8 +136,12 @@ impl TransportLinkUnicastUniversal { // Spawn a task to avoid a deadlock waiting for this same task // to finish in the close() joining its handle // WARN: Must be spawned on RX - zenoh_runtime::ZRuntime::RX - .spawn(async move { transport.del_link((&rx.link).into()).await }); + + zenoh_runtime::ZRuntime::RX.spawn(async move { + transport + .del_link(Link::new_unicast(&rx.link, priorities, reliability)) + .await + }); // // WARN: This ZRuntime blocks // zenoh_runtime::ZRuntime::Net @@ -248,14 +255,14 @@ async fn rx_task( } // The pool of buffers - let mtu = link.batch.mtu as usize; + let mtu = link.config.batch.mtu as usize; let mut n = rx_buffer_size / mtu; if rx_buffer_size % mtu != 0 { n += 1; } let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice()); - let l = (&link.link).into(); + let l = Link::new_unicast(&link.link, link.config.priorities, link.config.reliability); loop { tokio::select! { diff --git a/io/zenoh-transport/src/unicast/universal/tx.rs b/io/zenoh-transport/src/unicast/universal/tx.rs index f7754489ef..4fadcdb147 100644 --- a/io/zenoh-transport/src/unicast/universal/tx.rs +++ b/io/zenoh-transport/src/unicast/universal/tx.rs @@ -11,51 +11,81 @@ // Contributors: // ZettaScale Zenoh Team, // -use zenoh_core::zread; -use zenoh_protocol::network::NetworkMessage; +use zenoh_protocol::{ + core::{PriorityRange, Reliability}, + network::NetworkMessage, +}; use super::transport::TransportUnicastUniversal; #[cfg(feature = "shared-memory")] use crate::shm::map_zmsg_to_partner; +use crate::unicast::transport_unicast_inner::TransportUnicastTrait; impl TransportUnicastUniversal { fn schedule_on_link(&self, msg: NetworkMessage) -> bool { - macro_rules! zpush { - ($guard:expr, $pipeline:expr, $msg:expr) => { - // Drop the guard before the push_zenoh_message since - // the link could be congested and this operation could - // block for fairly long time - let pl = $pipeline.clone(); - drop($guard); - tracing::trace!("Scheduled: {:?}", $msg); - return pl.push_network_message($msg); - }; - } + let transport_links = self + .links + .read() + .expect("reading `TransportUnicastUniversal::links` should not fail"); - let guard = zread!(self.links); - // First try to find the best match between msg and link reliability - if let Some(pl) = guard.iter().find_map(|tl| { - if msg.is_reliable() == tl.link.link.is_reliable() { - Some(&tl.pipeline) - } else { - None - } - }) { - zpush!(guard, pl, msg); - } + let msg_reliability = Reliability::from(msg.is_reliable()); - // No best match found, take the first available link - if let Some(pl) = guard.iter().map(|tl| &tl.pipeline).next() { - zpush!(guard, pl, msg); - } + let (full_match, partial_match, any_match) = transport_links.iter().enumerate().fold( + (None::<(_, PriorityRange)>, None, None), + |(mut full_match, mut partial_match, mut any_match), (i, transport_link)| { + let reliability = transport_link.link.config.reliability == msg_reliability; + let priorities = transport_link + .link + .config + .priorities + .and_then(|range| range.includes(msg.priority()).then_some(range)); - // No Link found - tracing::trace!( - "Message dropped because the transport has no links: {}", - msg + match (reliability, priorities) { + (true, Some(priorities)) => { + match full_match { + Some((_, r)) if priorities.len() < r.len() => { + full_match = Some((i, priorities)) + } + None => full_match = Some((i, priorities)), + _ => {} + }; + } + (true, None) if partial_match.is_none() => partial_match = Some(i), + _ if any_match.is_none() => any_match = Some(i), + _ => {} + }; + + (full_match, partial_match, any_match) + }, ); - false + let Some(transport_link_index) = full_match.map(|(i, _)| i).or(partial_match).or(any_match) + else { + tracing::trace!( + "Message dropped because the transport has no links: {}", + msg + ); + + // No Link found + return false; + }; + + let transport_link = transport_links + .get(transport_link_index) + .expect("transport link index should be valid"); + + let pipeline = transport_link.pipeline.clone(); + tracing::trace!( + "Scheduled {:?} for transmission to {} ({})", + msg, + transport_link.link.link.get_dst(), + self.get_zid() + ); + // Drop the guard before the push_zenoh_message since + // the link could be congested and this operation could + // block for fairly long time + drop(transport_links); + pipeline.push_network_message(msg) } #[allow(unused_mut)] // When feature "shared-memory" is not enabled