diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 2ee6c5ddb5..cd05f4ae01 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -30,6 +30,8 @@ /// The list of endpoints to connect to. /// Accepts a single list (e.g. endpoints: ["tcp/10.10.10.10:7447", "tcp/11.11.11.11:7447"]) /// or different lists for router, peer and client (e.g. endpoints: { router: ["tcp/10.10.10.10:7447"], peer: ["tcp/11.11.11.11:7447"] }). + /// + /// See https://docs.rs/zenoh/latest/zenoh/config/struct.EndPoint.html endpoints: [ // "/
" ], @@ -67,6 +69,8 @@ /// The list of endpoints to listen on. /// Accepts a single list (e.g. endpoints: ["tcp/[::]:7447", "udp/[::]:7447"]) /// or different lists for router, peer and client (e.g. endpoints: { router: ["tcp/[::]:7447"], peer: ["tcp/[::]:0"] }). + /// + /// See https://docs.rs/zenoh/latest/zenoh/config/struct.EndPoint.html endpoints: { router: ["tcp/[::]:7447"], peer: ["tcp/[::]:0"] }, /// Global listen configuration, @@ -333,11 +337,11 @@ }, }, link: { - /// An optional whitelist of protocols to be used for accepting and opening sessions. - /// If not configured, all the supported protocols are automatically whitelisted. - /// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream", "vsock"] - /// For example, to only enable "tls" and "quic": - // protocols: ["tls", "quic"], + /// An optional whitelist of protocols to be used for accepting and opening sessions. If not + /// configured, all the supported protocols are automatically whitelisted. The supported + /// protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream", "vsock"] For + /// example, to only enable "tls" and "quic": protocols: ["tls", "quic"], + /// /// Configure the zenoh TX parameters of a link tx: { /// The resolution in bits to be used for the message sequence numbers. @@ -394,7 +398,7 @@ enabled: true, /// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens. time_limit: 1, - } + }, }, }, /// Configure the zenoh RX parameters of a link diff --git a/commons/zenoh-codec/src/transport/init.rs b/commons/zenoh-codec/src/transport/init.rs index c559fdbd51..9185955759 100644 --- a/commons/zenoh-codec/src/transport/init.rs +++ b/commons/zenoh-codec/src/transport/init.rs @@ -45,6 +45,7 @@ where resolution, batch_size, ext_qos, + ext_qos_optimized, #[cfg(feature = "shared-memory")] ext_shm, ext_auth, @@ -59,6 +60,7 @@ where header |= flag::S; } let mut n_exts = (ext_qos.is_some() as u8) + + (ext_qos_optimized.is_some() as u8) + (ext_auth.is_some() as u8) + (ext_mlink.is_some() as u8) + (ext_lowlatency.is_some() as u8) @@ -98,6 +100,10 @@ where n_exts -= 1; self.write(&mut *writer, (qos, n_exts != 0))?; } + if let Some(qos_optimized) = ext_qos_optimized.as_ref() { + n_exts -= 1; + self.write(&mut *writer, (qos_optimized, n_exts != 0))?; + } #[cfg(feature = "shared-memory")] if let Some(shm) = ext_shm.as_ref() { n_exts -= 1; @@ -173,6 +179,7 @@ where // Extensions let mut ext_qos = None; + let mut ext_qos_optimized = None; #[cfg(feature = "shared-memory")] let mut ext_shm = None; let mut ext_auth = None; @@ -190,6 +197,11 @@ where ext_qos = Some(q); has_ext = ext; } + ext::QoSOptimized::ID => { + let (q, ext): (ext::QoSOptimized, bool) = eodec.read(&mut *reader)?; + ext_qos_optimized = Some(q); + has_ext = ext; + } #[cfg(feature = "shared-memory")] ext::Shm::ID => { let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?; @@ -229,6 +241,7 @@ where resolution, batch_size, ext_qos, + ext_qos_optimized, #[cfg(feature = "shared-memory")] ext_shm, ext_auth, @@ -255,6 +268,7 @@ where batch_size, cookie, ext_qos, + ext_qos_optimized, #[cfg(feature = "shared-memory")] ext_shm, ext_auth, @@ -269,6 +283,7 @@ where header |= flag::S; } let mut n_exts = (ext_qos.is_some() as u8) + + (ext_qos_optimized.is_some() as u8) + (ext_auth.is_some() as u8) + (ext_mlink.is_some() as u8) + (ext_lowlatency.is_some() as u8) @@ -311,6 +326,10 @@ where n_exts -= 1; self.write(&mut *writer, (qos, n_exts != 0))?; } + if let Some(qos_optimized) = ext_qos_optimized.as_ref() { + n_exts -= 1; + self.write(&mut *writer, (qos_optimized, n_exts != 0))?; + } #[cfg(feature = "shared-memory")] if let Some(shm) = ext_shm.as_ref() { n_exts -= 1; @@ -389,6 +408,7 @@ where // Extensions let mut ext_qos = None; + let mut ext_qos_optimized = None; #[cfg(feature = "shared-memory")] let mut ext_shm = None; let mut ext_auth = None; @@ -406,6 +426,11 @@ where ext_qos = Some(q); has_ext = ext; } + ext::QoSOptimized::ID => { + let (q, ext): (ext::QoSOptimized, bool) = eodec.read(&mut *reader)?; + ext_qos_optimized = Some(q); + has_ext = ext; + } #[cfg(feature = "shared-memory")] ext::Shm::ID => { let (s, ext): (ext::Shm, bool) = eodec.read(&mut *reader)?; @@ -446,6 +471,7 @@ where batch_size, cookie, ext_qos, + ext_qos_optimized, #[cfg(feature = "shared-memory")] ext_shm, ext_auth, diff --git a/commons/zenoh-protocol/src/core/endpoint.rs b/commons/zenoh-protocol/src/core/endpoint.rs index 96b9b40665..5f6ac1c38f 100644 --- a/commons/zenoh-protocol/src/core/endpoint.rs +++ b/commons/zenoh-protocol/src/core/endpoint.rs @@ -187,6 +187,9 @@ impl fmt::Debug for AddressMut<'_> { pub struct Metadata<'a>(pub(super) &'a str); impl<'a> Metadata<'a> { + pub const RELIABILITY: &'static str = "reliability"; + pub const PRIORITIES: &'static str = "priorities"; + pub fn as_str(&self) -> &'a str { self.0 } @@ -443,10 +446,29 @@ impl fmt::Debug for ConfigMut<'_> { /// A string that respects the [`EndPoint`] canon form: `[#]`. /// -/// `` is a valid [`Locator`] and `` is of the form `=;...;=` where keys are alphabetically sorted. -/// `` is optional and can be provided to configure some aspectes for an [`EndPoint`], e.g. the interface to listen on or connect to. +/// `` is a valid [`Locator`] and `` is of the form +/// `=;...;=` where keys are alphabetically sorted. `` is +/// optional and can be provided to configure some aspects for an [`EndPoint`], e.g. the interface +/// to listen on or connect to. /// /// A full [`EndPoint`] string is hence in the form of `/
[?][#config]`. +/// +/// ## Metadata +/// +/// - **`priorities`**: a range bounded inclusively below and above (e.g. `2-4` signifies +/// priorities 2, 3 and 4). This value is used to select the link used for transmission based on the +/// Priority of the message in question. +/// +/// For example, `tcp/localhost:7447?priorities=1-3` assigns priorities +/// [`crate::core::Priority::RealTime`], [`crate::core::Priority::InteractiveHigh`] and +/// [`crate::core::Priority::InteractiveLow`] to the established link. +/// +/// - **`reliability`**: either "best_effort" or "reliable". This value is used to select the link +/// used for transmission based on the Reliability of the message in question. +/// +/// For example, `tcp/localhost:7447?priorities=6-7;reliability=best_effort` assigns priorities +/// [`crate::core::Priority::DataLow`] and [`crate::core::Priority::Background`], and +/// [`crate::core::Reliability::BestEffort`] to the established link. #[derive(Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] #[serde(into = "String")] #[serde(try_from = "String")] diff --git a/commons/zenoh-protocol/src/core/locator.rs b/commons/zenoh-protocol/src/core/locator.rs index 14f899e7c6..a34cc55320 100644 --- a/commons/zenoh-protocol/src/core/locator.rs +++ b/commons/zenoh-protocol/src/core/locator.rs @@ -64,6 +64,10 @@ impl Locator { pub fn as_str(&self) -> &str { self.0.as_str() } + + pub fn to_endpoint(&self) -> EndPoint { + self.0.clone() + } } impl From for Locator { diff --git a/commons/zenoh-protocol/src/core/mod.rs b/commons/zenoh-protocol/src/core/mod.rs index 361da4b5da..1c15f3830e 100644 --- a/commons/zenoh-protocol/src/core/mod.rs +++ b/commons/zenoh-protocol/src/core/mod.rs @@ -19,11 +19,13 @@ use alloc::{ }; use core::{ convert::{From, TryFrom, TryInto}, - fmt, + fmt::{self, Display}, hash::Hash, + ops::{Deref, RangeInclusive}, str::FromStr, }; +use serde::Serialize; pub use uhlc::{Timestamp, NTP64}; use zenoh_keyexpr::OwnedKeyExpr; use zenoh_result::{bail, zerror}; @@ -295,7 +297,7 @@ impl EntityGlobalIdProto { } #[repr(u8)] -#[derive(Debug, Default, Copy, Clone, Eq, Hash, PartialEq)] +#[derive(Debug, Default, Copy, Clone, Eq, Hash, PartialEq, PartialOrd, Ord, Serialize)] pub enum Priority { Control = 0, RealTime = 1, @@ -308,6 +310,116 @@ pub enum Priority { Background = 7, } +#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize)] +/// A [`Priority`] range bounded inclusively below and above. +pub struct PriorityRange(RangeInclusive); + +impl Deref for PriorityRange { + type Target = RangeInclusive; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl PriorityRange { + pub fn new(range: RangeInclusive) -> Self { + Self(range) + } + + /// Returns `true` if `self` is a superset of `other`. + pub fn includes(&self, other: &PriorityRange) -> bool { + self.start() <= other.start() && other.end() <= self.end() + } + + pub fn len(&self) -> usize { + *self.end() as usize - *self.start() as usize + 1 + } + + pub fn is_empty(&self) -> bool { + false + } + + #[cfg(feature = "test")] + pub fn rand() -> Self { + use rand::Rng; + let mut rng = rand::thread_rng(); + let start = rng.gen_range(Priority::MAX as u8..Priority::MIN as u8); + let end = rng.gen_range((start + 1)..=Priority::MIN as u8); + + Self(Priority::try_from(start).unwrap()..=Priority::try_from(end).unwrap()) + } +} + +impl Display for PriorityRange { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}..={}", *self.start() as u8, *self.end() as u8) + } +} + +#[derive(Debug)] +pub enum InvalidPriorityRange { + InvalidSyntax { found: String }, + InvalidBound { message: String }, +} + +impl Display for InvalidPriorityRange { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + InvalidPriorityRange::InvalidSyntax { found } => write!(f, "invalid PriorityRange string, expected an range of the form `start..=end` but found {found}"), + InvalidPriorityRange::InvalidBound { message } => write!(f, "invalid PriorityRange bound: {message}"), + } + } +} + +#[cfg(feature = "std")] +impl std::error::Error for InvalidPriorityRange {} + +impl FromStr for PriorityRange { + type Err = InvalidPriorityRange; + + fn from_str(s: &str) -> Result { + const SEPARATOR: &str = "-"; + let mut metadata = s.split(SEPARATOR); + + let start = metadata + .next() + .ok_or_else(|| InvalidPriorityRange::InvalidSyntax { + found: s.to_string(), + })? + .parse::() + .map(Priority::try_from) + .map_err(|err| InvalidPriorityRange::InvalidBound { + message: err.to_string(), + })? + .map_err(|err| InvalidPriorityRange::InvalidBound { + message: err.to_string(), + })?; + + let end = metadata + .next() + .ok_or_else(|| InvalidPriorityRange::InvalidSyntax { + found: s.to_string(), + })? + .parse::() + .map(Priority::try_from) + .map_err(|err| InvalidPriorityRange::InvalidBound { + message: err.to_string(), + })? + .map_err(|err| InvalidPriorityRange::InvalidBound { + message: err.to_string(), + })?; + + if metadata.next().is_some() { + return Err(InvalidPriorityRange::InvalidSyntax { + found: s.to_string(), + }); + }; + + Ok(PriorityRange::new(start..=end)) + } +} + impl Priority { /// Default pub const DEFAULT: Self = Self::Data; @@ -342,7 +454,7 @@ impl TryFrom for Priority { } } -#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize)] #[repr(u8)] pub enum Reliability { #[default] @@ -353,6 +465,16 @@ pub enum Reliability { impl Reliability { pub const DEFAULT: Self = Self::Reliable; + const BEST_EFFORT_STR: &'static str = "best_effort"; + const RELIABLE_STR: &'static str = "reliable"; + + pub fn as_str(&self) -> &str { + match self { + Reliability::BestEffort => Reliability::BEST_EFFORT_STR, + Reliability::Reliable => Reliability::RELIABLE_STR, + } + } + #[cfg(feature = "test")] pub fn rand() -> Self { use rand::Rng; @@ -367,6 +489,59 @@ impl Reliability { } } +impl From for Reliability { + fn from(value: bool) -> Self { + if value { + Reliability::Reliable + } else { + Reliability::BestEffort + } + } +} + +impl From for bool { + fn from(value: Reliability) -> Self { + match value { + Reliability::BestEffort => false, + Reliability::Reliable => true, + } + } +} + +#[derive(Debug)] +pub struct InvalidReliability { + found: String, +} + +impl Display for InvalidReliability { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "invalid Reliability string, expected `{}` or `{}` but found {}", + Reliability::BEST_EFFORT_STR, + Reliability::RELIABLE_STR, + self.found + ) + } +} + +#[cfg(feature = "std")] +impl std::error::Error for InvalidReliability {} + +impl FromStr for Reliability { + type Err = InvalidReliability; + + fn from_str(s: &str) -> Result { + match s { + Reliability::RELIABLE_STR => Ok(Reliability::Reliable), + Reliability::BEST_EFFORT_STR => Ok(Reliability::BestEffort), + other => Err(InvalidReliability { + found: other.to_string(), + }), + } + } +} + #[derive(Debug, Copy, Clone, PartialEq, Eq, Default)] pub struct Channel { pub priority: Priority, diff --git a/commons/zenoh-protocol/src/transport/init.rs b/commons/zenoh-protocol/src/transport/init.rs index 7e56bfd770..7650035d41 100644 --- a/commons/zenoh-protocol/src/transport/init.rs +++ b/commons/zenoh-protocol/src/transport/init.rs @@ -115,6 +115,7 @@ pub struct InitSyn { pub resolution: Resolution, pub batch_size: BatchSize, pub ext_qos: Option, + pub ext_qos_optimized: Option, #[cfg(feature = "shared-memory")] pub ext_shm: Option, pub ext_auth: Option, @@ -126,13 +127,14 @@ pub struct InitSyn { // Extensions pub mod ext { use crate::{ - common::{ZExtUnit, ZExtZBuf}, - zextunit, zextzbuf, + common::{ZExtUnit, ZExtZ64, ZExtZBuf}, + zextunit, zextz64, zextzbuf, }; /// # QoS extension /// Used to negotiate the use of QoS - pub type QoS = zextunit!(0x1, false); + pub type QoS = zextz64!(0x1, false); + pub type QoSOptimized = zextunit!(0x1, false); /// # Shm extension /// Used as challenge for probing shared memory capabilities @@ -161,7 +163,7 @@ impl InitSyn { pub fn rand() -> Self { use rand::Rng; - use crate::common::{ZExtUnit, ZExtZBuf}; + use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf}; let mut rng = rand::thread_rng(); @@ -170,7 +172,8 @@ impl InitSyn { let zid = ZenohIdProto::default(); let resolution = Resolution::rand(); let batch_size: BatchSize = rng.gen(); - let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand()); + let ext_qos_optimized = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); #[cfg(feature = "shared-memory")] let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); @@ -185,6 +188,7 @@ impl InitSyn { resolution, batch_size, ext_qos, + ext_qos_optimized, #[cfg(feature = "shared-memory")] ext_shm, ext_auth, @@ -204,6 +208,7 @@ pub struct InitAck { pub batch_size: BatchSize, pub cookie: ZSlice, pub ext_qos: Option, + pub ext_qos_optimized: Option, #[cfg(feature = "shared-memory")] pub ext_shm: Option, pub ext_auth: Option, @@ -217,7 +222,7 @@ impl InitAck { pub fn rand() -> Self { use rand::Rng; - use crate::common::{ZExtUnit, ZExtZBuf}; + use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf}; let mut rng = rand::thread_rng(); @@ -231,7 +236,8 @@ impl InitAck { }; let batch_size: BatchSize = rng.gen(); let cookie = ZSlice::rand(64); - let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand()); + let ext_qos_optimized = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); #[cfg(feature = "shared-memory")] let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); @@ -247,6 +253,7 @@ impl InitAck { batch_size, cookie, ext_qos, + ext_qos_optimized, #[cfg(feature = "shared-memory")] ext_shm, ext_auth, diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index 9ecf1a0dfc..9ca18f8c0e 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -33,7 +33,10 @@ pub use listener::*; pub use multicast::*; use serde::Serialize; pub use unicast::*; -use zenoh_protocol::{core::Locator, transport::BatchSize}; +use zenoh_protocol::{ + core::{Locator, Metadata, PriorityRange, Reliability}, + transport::BatchSize, +}; use zenoh_result::ZResult; /*************************************/ @@ -48,10 +51,11 @@ pub struct Link { pub dst: Locator, pub group: Option, pub mtu: BatchSize, - pub is_reliable: bool, pub is_streamed: bool, pub interfaces: Vec, pub auth_identifier: LinkAuthId, + pub priorities: Option, + pub reliability: Option, } #[async_trait] @@ -70,51 +74,56 @@ impl fmt::Display for Link { } } -impl From<&LinkUnicast> for Link { - fn from(link: &LinkUnicast) -> Link { +impl Link { + pub fn new_unicast( + link: &LinkUnicast, + priorities: Option, + reliability: Option, + ) -> Self { Link { - src: link.get_src().to_owned(), - dst: link.get_dst().to_owned(), + src: Self::to_patched_locator(link.get_src(), priorities.as_ref(), reliability), + dst: Self::to_patched_locator(link.get_dst(), priorities.as_ref(), reliability), group: None, mtu: link.get_mtu(), - is_reliable: link.is_reliable(), is_streamed: link.is_streamed(), interfaces: link.get_interface_names(), auth_identifier: link.get_auth_id().clone(), + priorities, + reliability, } } -} - -impl From for Link { - fn from(link: LinkUnicast) -> Link { - Link::from(&link) - } -} -impl From<&LinkMulticast> for Link { - fn from(link: &LinkMulticast) -> Link { + pub fn new_multicast(link: &LinkMulticast) -> Self { Link { src: link.get_src().to_owned(), dst: link.get_dst().to_owned(), group: Some(link.get_dst().to_owned()), mtu: link.get_mtu(), - is_reliable: link.is_reliable(), is_streamed: false, interfaces: vec![], auth_identifier: LinkAuthId::default(), + priorities: None, + reliability: None, } } -} -impl From for Link { - fn from(link: LinkMulticast) -> Link { - Link::from(&link) - } -} - -impl PartialEq for Link { - fn eq(&self, other: &LinkUnicast) -> bool { - self.src == *other.get_src() && self.dst == *other.get_dst() + /// Updates the metadata of the `locator` with `priorities` and `reliability`. + fn to_patched_locator( + locator: &Locator, + priorities: Option<&PriorityRange>, + reliability: Option, + ) -> Locator { + let mut locator = locator.clone(); + let mut metadata = locator.metadata_mut(); + reliability + .map(|r| metadata.insert(Metadata::RELIABILITY, r.as_str())) + .transpose() + .expect("adding `reliability` to Locator metadata should not fail"); + priorities + .map(|ps| metadata.insert(Metadata::PRIORITIES, ps.to_string())) + .transpose() + .expect("adding `priorities` to Locator metadata should not fail"); + locator } } diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index 62f39bf86c..18ee8d2a45 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -37,6 +37,7 @@ pub trait LinkManagerUnicastTrait: Send + Sync { async fn get_locators(&self) -> Vec; } pub type NewLinkChannelSender = flume::Sender; + pub trait ConstructibleLinkManagerUnicast: Sized { fn new(new_link_sender: NewLinkChannelSender, config: T) -> ZResult; } diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs index 2e0d9e0a19..266cdb8e31 100644 --- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs @@ -332,11 +332,13 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic { // Spawn the accept loop for the listener let token = self.listeners.token.child_token(); - let c_token = token.clone(); - let c_manager = self.manager.clone(); + let task = { + let token = token.clone(); + let manager = self.manager.clone(); - let task = async move { accept_task(quic_endpoint, c_token, c_manager).await }; + async move { accept_task(quic_endpoint, token, manager).await } + }; // Initialize the QuicAcceptor let locator = endpoint.to_locator(); @@ -364,7 +366,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic { } async fn accept_task( - endpoint: quinn::Endpoint, + quic_endpoint: quinn::Endpoint, token: CancellationToken, manager: NewLinkChannelSender, ) -> ZResult<()> { @@ -382,7 +384,7 @@ async fn accept_task( Ok(conn) } - let src_addr = endpoint + let src_addr = quic_endpoint .local_addr() .map_err(|e| zerror!("Can not accept QUIC connections: {}", e))?; @@ -393,7 +395,7 @@ async fn accept_task( tokio::select! { _ = token.cancelled() => break, - res = accept(endpoint.accept()) => { + res = accept(quic_endpoint.accept()) => { match res { Ok(quic_conn) => { // Get the bideractional streams. Note that we don't allow unidirectional streams. diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 5711e5fe5c..0d6d4b07ee 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -332,19 +332,20 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { // Spawn the accept loop for the listener let token = CancellationToken::new(); - let c_token = token.clone(); let mut listeners = zasyncwrite!(self.listeners); - let c_path = path.clone(); - let c_manager = self.manager.clone(); - let c_listeners = self.listeners.clone(); - - let task = async move { - // Wait for the accept loop to terminate - let res = - accept_read_task(link, c_token, c_manager, c_path.clone(), is_connected).await; - zasyncwrite!(c_listeners).remove(&c_path); - res + let task = { + let token = token.clone(); + let path = path.clone(); + let manager = self.manager.clone(); + let listeners = self.listeners.clone(); + + async move { + // Wait for the accept loop to terminate + let res = accept_read_task(link, token, manager, path.clone(), is_connected).await; + zasyncwrite!(listeners).remove(&path); + res + } }; let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task); diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index ec2e6e06b7..5b7d7ab7cb 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -354,10 +354,13 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { )?; let token = self.listeners.token.child_token(); - let c_token = token.clone(); - let c_manager = self.manager.clone(); - let task = async move { accept_task(socket, c_token, c_manager).await }; + let task = { + let token = token.clone(); + let manager = self.manager.clone(); + + async move { accept_task(socket, token, manager).await } + }; let locator = endpoint.to_locator(); self.listeners diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index 716eac2121..710b3897d6 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -362,12 +362,15 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls { let local_port = local_addr.port(); // Initialize the TlsAcceptor - let acceptor = TlsAcceptor::from(Arc::new(tls_server_config.server_config)); let token = self.listeners.token.child_token(); - let c_token = token.clone(); - let c_manager = self.manager.clone(); - let task = async move { accept_task(socket, acceptor, c_token, c_manager).await }; + let task = { + let acceptor = TlsAcceptor::from(Arc::new(tls_server_config.server_config)); + let token = token.clone(); + let manager = self.manager.clone(); + + async move { accept_task(socket, acceptor, token, manager).await } + }; // Update the endpoint locator address let locator = Locator::new( diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index e67e821363..100f7dab70 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -402,10 +402,13 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp { )?; let token = self.listeners.token.child_token(); - let c_token = token.clone(); - let c_manager = self.manager.clone(); - let task = async move { accept_read_task(socket, c_token, c_manager).await }; + let task = { + let token = token.clone(); + let manager = self.manager.clone(); + + async move { accept_read_task(socket, token, manager).await } + }; let locator = endpoint.to_locator(); self.listeners diff --git a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs index df93b9cc61..ce690ce60d 100644 --- a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs @@ -273,15 +273,15 @@ async fn handle_incoming_connections( endpoint.metadata(), )?; + let link = Arc::new(UnicastPipe { + r: UnsafeCell::new(dedicated_uplink), + w: UnsafeCell::new(dedicated_downlink), + local, + remote, + }); + // send newly established link to manager - manager - .send_async(LinkUnicast(Arc::new(UnicastPipe { - r: UnsafeCell::new(dedicated_uplink), - w: UnsafeCell::new(dedicated_downlink), - local, - remote, - }))) - .await?; + manager.send_async(LinkUnicast(link)).await?; ZResult::Ok(()) } diff --git a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs index 5632da26f4..248647e7f6 100644 --- a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs @@ -392,15 +392,17 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUnixSocketStream { let c_token = token.clone(); let mut listeners = zasyncwrite!(self.listeners); - let c_manager = self.manager.clone(); - let c_listeners = self.listeners.clone(); - let c_path = local_path_str.to_owned(); - - let task = async move { - // Wait for the accept loop to terminate - let res = accept_task(socket, c_token, c_manager).await; - zasyncwrite!(c_listeners).remove(&c_path); - res + let task = { + let manager = self.manager.clone(); + let listeners = self.listeners.clone(); + let path = local_path_str.to_owned(); + + async move { + // Wait for the accept loop to terminate + let res = accept_task(socket, c_token, manager).await; + zasyncwrite!(listeners).remove(&path); + res + } }; let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task); diff --git a/io/zenoh-links/zenoh-link-vsock/src/unicast.rs b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs index 979048b585..fb29d6539d 100644 --- a/io/zenoh-links/zenoh-link-vsock/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs @@ -272,20 +272,22 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastVsock { endpoint.config(), )?; let token = CancellationToken::new(); - let c_token = token.clone(); - - let c_manager = self.manager.clone(); let locator = endpoint.to_locator(); let mut listeners = zasyncwrite!(self.listeners); - let c_listeners = self.listeners.clone(); - let c_addr = addr; - let task = async move { - // Wait for the accept loop to terminate - let res = accept_task(listener, c_token, c_manager).await; - zasyncwrite!(c_listeners).remove(&c_addr); - res + + let task = { + let token = token.clone(); + let manager = self.manager.clone(); + let listeners = self.listeners.clone(); + + async move { + // Wait for the accept loop to terminate + let res = accept_task(listener, token, manager).await; + zasyncwrite!(listeners).remove(&addr); + res + } }; let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task); diff --git a/io/zenoh-links/zenoh-link-ws/src/unicast.rs b/io/zenoh-links/zenoh-link-ws/src/unicast.rs index 193c9a1724..e16b38cc68 100644 --- a/io/zenoh-links/zenoh-link-ws/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-ws/src/unicast.rs @@ -368,16 +368,19 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastWs { // Spawn the accept loop for the listener let token = CancellationToken::new(); - let c_token = token.clone(); - let c_manager = self.manager.clone(); - let c_listeners = self.listeners.clone(); - let c_addr = local_addr; - - let task = async move { - // Wait for the accept loop to terminate - let res = accept_task(socket, c_token, c_manager).await; - zasyncwrite!(c_listeners).remove(&c_addr); - res + + let task = { + let token = token.clone(); + let manager = self.manager.clone(); + let listeners = self.listeners.clone(); + let addr = local_addr; + + async move { + // Wait for the accept loop to terminate + let res = accept_task(socket, token, manager).await; + zasyncwrite!(listeners).remove(&addr); + res + } }; let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task); diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index 305ccab574..3746092d4a 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -419,8 +419,8 @@ impl TransportManager { loop { tokio::select! { res = new_unicast_link_receiver.recv_async() => { - if let Ok(link) = res { - this.handle_new_link_unicast(link).await; + if let Ok(new_link) = res { + this.handle_new_link_unicast(new_link).await; } } _ = cancellation_token.cancelled() => { break; } diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs index d0d5ef4fb0..f2258b6935 100644 --- a/io/zenoh-transport/src/multicast/link.rs +++ b/io/zenoh-transport/src/multicast/link.rs @@ -21,7 +21,7 @@ use std::{ use tokio::task::JoinHandle; use zenoh_buffers::{BBuf, ZSlice, ZSliceBuffer}; use zenoh_core::{zcondfeat, zlock}; -use zenoh_link::{Link, LinkMulticast, Locator}; +use zenoh_link::{LinkMulticast, Locator}; use zenoh_protocol::{ core::{Bits, Priority, Resolution, WhatAmI, ZenohIdProto}, transport::{BatchSize, Close, Join, PrioritySn, TransportMessage, TransportSn}, @@ -126,18 +126,6 @@ impl fmt::Debug for TransportLinkMulticast { } } -impl From<&TransportLinkMulticast> for Link { - fn from(link: &TransportLinkMulticast) -> Self { - Link::from(&link.link) - } -} - -impl From for Link { - fn from(link: TransportLinkMulticast) -> Self { - Link::from(link.link) - } -} - pub(crate) struct TransportLinkMulticastTx { pub(crate) inner: TransportLinkMulticast, pub(crate) buffer: Option, diff --git a/io/zenoh-transport/src/multicast/mod.rs b/io/zenoh-transport/src/multicast/mod.rs index 78d76bb6c8..fbb656264d 100644 --- a/io/zenoh-transport/src/multicast/mod.rs +++ b/io/zenoh-transport/src/multicast/mod.rs @@ -92,7 +92,7 @@ impl TransportMulticast { #[inline(always)] pub fn get_link(&self) -> ZResult { let transport = self.get_transport()?; - Ok(transport.get_link().into()) + Ok(Link::new_multicast(&transport.get_link().link)) } #[inline(always)] diff --git a/io/zenoh-transport/src/multicast/transport.rs b/io/zenoh-transport/src/multicast/transport.rs index f0dfec4813..36b9dbbea0 100644 --- a/io/zenoh-transport/src/multicast/transport.rs +++ b/io/zenoh-transport/src/multicast/transport.rs @@ -333,7 +333,7 @@ impl TransportMulticastInner { /* PEER */ /*************************************/ pub(super) fn new_peer(&self, locator: &Locator, join: Join) -> ZResult<()> { - let mut link = Link::from(self.get_link()); + let mut link = Link::new_multicast(&self.get_link().link); link.dst = locator.clone(); let is_shm = zcondfeat!("shared-memory", join.ext_shm.is_some(), false); @@ -452,7 +452,7 @@ impl TransportMulticastInner { zread!(self.peers) .values() .map(|p| { - let mut link = Link::from(self.get_link()); + let mut link = Link::new_multicast(&self.get_link().link); link.dst = p.locator.clone(); TransportPeer { diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index 9d34896d95..d344a0c8d0 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -224,7 +224,10 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { // Extension QoS self.ext_qos - .recv_init_syn((&mut state.transport.ext_qos, init_syn.ext_qos)) + .recv_init_syn(( + &mut state.transport.ext_qos, + (init_syn.ext_qos, init_syn.ext_qos_optimized), + )) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; @@ -284,7 +287,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { let (mut state, input) = input; // Extension QoS - let ext_qos = self + let (ext_qos, ext_qos_optimized) = self .ext_qos .send_init_ack(&state.transport.ext_qos) .await @@ -381,6 +384,7 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { batch_size: state.transport.batch_size, cookie, ext_qos, + ext_qos_optimized, #[cfg(feature = "shared-memory")] ext_shm, ext_auth, @@ -642,16 +646,20 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { } pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) -> ZResult<()> { + let endpoint = link.get_src().to_endpoint(); + let direction = TransportLinkUnicastDirection::Inbound; let mtu = link.get_mtu(); let is_streamed = link.is_streamed(); let config = TransportLinkUnicastConfig { - direction: TransportLinkUnicastDirection::Inbound, + direction, batch: BatchConfig { mtu, is_streamed, #[cfg(feature = "transport_compression")] is_compression: false, }, + priorities: None, + reliability: None, }; let mut link = TransportLinkUnicast::new(link, config); let mut fsm = AcceptLink { @@ -699,7 +707,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - transport: StateTransport { batch_size, resolution: manager.config.resolution, - ext_qos: ext::qos::StateAccept::new(manager.config.unicast.is_qos), + ext_qos: ext::qos::StateAccept::new(manager.config.unicast.is_qos, &endpoint)?, #[cfg(feature = "transport_multilink")] ext_mlink: manager .state @@ -781,13 +789,15 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - }; let a_config = TransportLinkUnicastConfig { - direction: TransportLinkUnicastDirection::Inbound, + direction, batch: BatchConfig { mtu: state.transport.batch_size, is_streamed, #[cfg(feature = "transport_compression")] is_compression: state.link.ext_compression.is_compression(), }, + priorities: state.transport.ext_qos.priorities(), + reliability: state.transport.ext_qos.reliability(), }; let a_link = link.reconfigure(a_config); let s_link = format!("{:?}", a_link); diff --git a/io/zenoh-transport/src/unicast/establishment/ext/qos.rs b/io/zenoh-transport/src/unicast/establishment/ext/qos.rs index f749073805..350794ff7b 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/qos.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/qos.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // use core::marker::PhantomData; +use std::str::FromStr; use async_trait::async_trait; use zenoh_buffers::{ @@ -19,8 +20,13 @@ use zenoh_buffers::{ writer::{DidntWrite, Writer}, }; use zenoh_codec::{RCodec, WCodec, Zenoh080}; -use zenoh_protocol::transport::{init, open}; -use zenoh_result::Error as ZError; +use zenoh_core::zerror; +use zenoh_link::EndPoint; +use zenoh_protocol::{ + core::{Metadata, Priority, PriorityRange, Reliability}, + transport::{init, open}, +}; +use zenoh_result::{Error as ZError, ZResult}; use crate::unicast::establishment::{AcceptFsm, OpenFsm}; @@ -35,21 +41,226 @@ impl<'a> QoSFsm<'a> { } } -/*************************************/ -/* OPEN */ -/*************************************/ -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub(crate) struct StateOpen { - is_qos: bool, +#[derive(Clone, Debug, PartialEq, Eq)] +enum State { + NoQoS, + QoS { + reliability: Option, + priorities: Option, + }, +} + +impl State { + fn new(is_qos: bool, endpoint: &EndPoint) -> ZResult { + if !is_qos { + Ok(State::NoQoS) + } else { + let metadata = endpoint.metadata(); + + let reliability = metadata + .get(Metadata::RELIABILITY) + .map(Reliability::from_str) + .transpose()?; + + let priorities = metadata + .get(Metadata::PRIORITIES) + .map(PriorityRange::from_str) + .transpose()?; + + Ok(State::QoS { + priorities, + reliability, + }) + } + } + + fn try_from_u64(value: u64) -> ZResult { + match value { + 0b000_u64 => Ok(State::NoQoS), + 0b001_u64 => Ok(State::QoS { + priorities: None, + reliability: None, + }), + value if value & 0b110_u64 != 0 => { + let tag = value & 0b111_u64; + + let priorities = if tag & 0b010_u64 != 0 { + let start = Priority::try_from(((value >> 3) & 0xff) as u8)?; + let end = Priority::try_from(((value >> (3 + 8)) & 0xff) as u8)?; + + Some(PriorityRange::new(start..=end)) + } else { + None + }; + + let reliability = if tag & 0b100_u64 != 0 { + let bit = ((value >> (3 + 8 + 8)) & 0x1) as u8 == 1; + + Some(Reliability::from(bit)) + } else { + None + }; + + Ok(State::QoS { + priorities, + reliability, + }) + } + _ => Err(zerror!("invalid QoS").into()), + } + } + + /// Encodes [`QoS`] as a [`u64`]. + /// + /// This function is used for encoding both of [`StateAccept`] in + /// [`crate::unicast::establishment::cookie::Cookie::ext_qos`] and + /// [`zenoh_protocol::transport::init::ext::QoS`]. + /// + /// The three least significant bits are used to discrimnate five states: + /// + /// 1. QoS is disabled + /// 2. QoS is enabled but no priority range and no reliability setting are available + /// 3. QoS is enabled and priority range is available but reliability is unavailable + /// 4. QoS is enabled and reliability is available but priority range is unavailable + /// 5. QoS is enabled and both priority range and reliability are available + fn to_u64(&self) -> u64 { + match self { + State::NoQoS => 0b000_u64, + State::QoS { + priorities, + reliability, + } => { + if reliability.is_none() && priorities.is_none() { + return 0b001_u64; + } + + let mut value = 0b000_u64; + + if let Some(priorities) = priorities { + value |= 0b010_u64; + value |= (*priorities.start() as u64) << 3; + value |= (*priorities.end() as u64) << (3 + 8); + } + + if let Some(reliability) = reliability { + value |= 0b100_u64; + value |= (bool::from(*reliability) as u64) << (3 + 8 + 8); + } + + value + } + } + } + + fn to_exts(&self) -> (Option, Option) { + match self { + State::NoQoS => (None, None), + State::QoS { + reliability: None, + priorities: None, + } => (None, Some(init::ext::QoSOptimized::new())), + State::QoS { + reliability: Some(_), + .. + } + | State::QoS { + priorities: Some(_), + .. + } => (Some(init::ext::QoS::new(self.to_u64())), None), + } + } + + fn try_from_exts( + (qos, qos_optimized): (Option, Option), + ) -> ZResult { + match (qos, qos_optimized) { + (Some(_), Some(_)) => Err(zerror!( + "Extensions QoS and QoSOptimized cannot both be enabled at once" + ) + .into()), + (None, None) => Ok(State::NoQoS), + (None, Some(_)) => Ok(State::QoS { + reliability: None, + priorities: None, + }), + (Some(qos), None) => State::try_from_u64(qos.value), + } + } + + fn is_qos(&self) -> bool { + matches!(self, State::QoS { .. }) + } + + fn priorities(&self) -> Option { + match self { + State::NoQoS + | State::QoS { + priorities: None, .. + } => None, + State::QoS { + priorities: Some(priorities), + .. + } => Some(priorities.clone()), + } + } + + fn reliability(&self) -> Option { + match self { + State::NoQoS + | State::QoS { + reliability: None, .. + } => None, + State::QoS { + reliability: Some(reliability), + .. + } => Some(*reliability), + } + } + + #[cfg(test)] + fn rand() -> Self { + use rand::Rng; + let mut rng = rand::thread_rng(); + if rng.gen_bool(0.5) { + State::NoQoS + } else { + let priorities = rng.gen_bool(0.5).then(PriorityRange::rand); + let reliability = rng + .gen_bool(0.5) + .then(|| Reliability::from(rng.gen_bool(0.5))); + + State::QoS { + priorities, + reliability, + } + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct StateOpen(State); + +impl From for StateOpen { + fn from(value: State) -> Self { + StateOpen(value) + } } impl StateOpen { - pub(crate) const fn new(is_qos: bool) -> Self { - Self { is_qos } + pub(crate) fn new(is_qos: bool, endpoint: &EndPoint) -> ZResult { + State::new(is_qos, endpoint).map(StateOpen) + } + + pub(crate) fn is_qos(&self) -> bool { + self.0.is_qos() } - pub(crate) const fn is_qos(&self) -> bool { - self.is_qos + pub(crate) fn priorities(&self) -> Option { + self.0.priorities() + } + + pub(crate) fn reliability(&self) -> Option { + self.0.reliability() } } @@ -58,23 +269,76 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { type Error = ZError; type SendInitSynIn = &'a StateOpen; - type SendInitSynOut = Option; + type SendInitSynOut = (Option, Option); async fn send_init_syn( self, state: Self::SendInitSynIn, ) -> Result { - let output = state.is_qos.then_some(init::ext::QoS::new()); - Ok(output) + Ok(state.0.to_exts()) } - type RecvInitAckIn = (&'a mut StateOpen, Option); + type RecvInitAckIn = ( + &'a mut StateOpen, + (Option, Option), + ); type RecvInitAckOut = (); async fn recv_init_ack( self, input: Self::RecvInitAckIn, ) -> Result { - let (state, other_ext) = input; - state.is_qos &= other_ext.is_some(); + let (state_self, other_ext) = input; + + let state_other = State::try_from_exts(other_ext)?; + + let ( + State::QoS { + reliability: self_reliability, + priorities: self_priorities, + }, + State::QoS { + reliability: other_reliability, + priorities: other_priorities, + }, + ) = (state_self.0.clone(), state_other) + else { + *state_self = State::NoQoS.into(); + return Ok(()); + }; + + let priorities = match (self_priorities, other_priorities) { + (None, priorities) | (priorities, None) => priorities, + (Some(self_priorities), Some(other_priorities)) => { + if other_priorities.includes(&self_priorities) { + Some(self_priorities) + } else { + return Err(zerror!( + "The PriorityRange received in InitAck is not a superset of my PriorityRange" + ) + .into()); + } + } + }; + + let reliability = match (self_reliability, other_reliability) { + (None, reliability) | (reliability, None) => reliability, + (Some(self_reliability), Some(other_reliability)) => { + if self_reliability == other_reliability { + Some(self_reliability) + } else { + return Err(zerror!( + "The Reliability received in InitAck doesn't match my Reliability" + ) + .into()); + } + } + }; + + *state_self = State::QoS { + reliability, + priorities, + } + .into(); + Ok(()) } @@ -97,28 +361,35 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { } } -/*************************************/ -/* ACCEPT */ -/*************************************/ -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub(crate) struct StateAccept { - is_qos: bool, +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct StateAccept(State); + +impl From for StateAccept { + fn from(value: State) -> Self { + StateAccept(value) + } } impl StateAccept { - pub(crate) const fn new(is_qos: bool) -> Self { - Self { is_qos } + pub(crate) fn new(is_qos: bool, endpoint: &EndPoint) -> ZResult { + State::new(is_qos, endpoint).map(StateAccept::from) } - pub(crate) const fn is_qos(&self) -> bool { - self.is_qos + pub(crate) fn is_qos(&self) -> bool { + self.0.is_qos() + } + + pub(crate) fn priorities(&self) -> Option { + self.0.priorities() + } + + pub(crate) fn reliability(&self) -> Option { + self.0.reliability() } #[cfg(test)] pub(crate) fn rand() -> Self { - use rand::Rng; - let mut rng = rand::thread_rng(); - Self::new(rng.gen_bool(0.5)) + State::rand().into() } } @@ -130,9 +401,7 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &StateAccept) -> Self::Output { - let is_qos = u8::from(x.is_qos); - self.write(&mut *writer, is_qos)?; - Ok(()) + self.write(writer, &x.0.to_u64()) } } @@ -143,9 +412,9 @@ where type Error = DidntRead; fn read(self, reader: &mut R) -> Result { - let is_qos: u8 = self.read(&mut *reader)?; - let is_qos = is_qos == 1; - Ok(StateAccept { is_qos }) + Ok(State::try_from_u64(self.read(reader)?) + .map_err(|_| DidntRead)? + .into()) } } @@ -153,25 +422,78 @@ where impl<'a> AcceptFsm for &'a QoSFsm<'a> { type Error = ZError; - type RecvInitSynIn = (&'a mut StateAccept, Option); + type RecvInitSynIn = ( + &'a mut StateAccept, + (Option, Option), + ); type RecvInitSynOut = (); async fn recv_init_syn( self, input: Self::RecvInitSynIn, ) -> Result { - let (state, other_ext) = input; - state.is_qos &= other_ext.is_some(); + let (state_self, other_ext) = input; + + let state_other = State::try_from_exts(other_ext)?; + + let ( + State::QoS { + reliability: self_reliability, + priorities: self_priorities, + }, + State::QoS { + reliability: other_reliability, + priorities: other_priorities, + }, + ) = (state_self.0.clone(), state_other) + else { + *state_self = State::NoQoS.into(); + return Ok(()); + }; + + let priorities = match (self_priorities, other_priorities) { + (None, priorities) | (priorities, None) => priorities, + (Some(self_priorities), Some(other_priorities)) => { + if self_priorities.includes(&other_priorities) { + Some(other_priorities) + } else { + return Err(zerror!( + "The PriorityRange received in InitSyn is not a subset of my PriorityRange" + ) + .into()); + } + } + }; + + let reliability = match (self_reliability, other_reliability) { + (None, reliability) | (reliability, None) => reliability, + (Some(self_reliability), Some(other_reliability)) => { + if self_reliability == other_reliability { + Some(other_reliability) + } else { + return Err(zerror!( + "The Reliability received in InitSyn doesn't match my Reliability" + ) + .into()); + } + } + }; + + *state_self = State::QoS { + reliability, + priorities, + } + .into(); + Ok(()) } type SendInitAckIn = &'a StateAccept; - type SendInitAckOut = Option; + type SendInitAckOut = (Option, Option); async fn send_init_ack( self, state: Self::SendInitAckIn, ) -> Result { - let output = state.is_qos.then_some(init::ext::QoS::new()); - Ok(output) + Ok(state.0.to_exts()) } type RecvOpenSynIn = (&'a mut StateAccept, Option); @@ -192,3 +514,252 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> { Ok(None) } } + +#[cfg(test)] +mod tests { + use zenoh_protocol::core::{PriorityRange, Reliability}; + use zenoh_result::ZResult; + + use super::{QoSFsm, State, StateAccept, StateOpen}; + use crate::unicast::establishment::{AcceptFsm, OpenFsm}; + + macro_rules! priority_range { + ($start:literal, $end:literal) => { + PriorityRange::new($start.try_into().unwrap()..=$end.try_into().unwrap()) + }; + } + + async fn test_negotiation( + state_open: &mut StateOpen, + state_accept: &mut StateAccept, + ) -> ZResult<()> { + let fsm = QoSFsm::new(); + + let ext = fsm.send_init_syn(&*state_open).await?; + fsm.recv_init_syn((state_accept, ext)).await?; + + let ext = fsm.send_init_ack(&*state_accept).await?; + fsm.recv_init_ack((state_open, ext)).await?; + + Ok(()) + } + + async fn test_negotiation_ok(state_open: State, state_accept: State, state_expected: State) { + let mut state_open = state_open.into(); + let mut state_accept = state_accept.into(); + + match test_negotiation(&mut state_open, &mut state_accept).await { + Err(err) => panic!("expected `Ok(())`, got: {err}"), + Ok(()) => { + assert_eq!(state_open.0, state_accept.0); + assert_eq!(state_open.0, state_expected); + } + }; + } + + async fn test_negotiation_err(state_open: State, state_accept: State) { + let mut state_open = state_open.into(); + let mut state_accept = state_accept.into(); + + if let Ok(()) = test_negotiation(&mut state_open, &mut state_accept).await { + panic!("expected `Err(_)`, got `Ok(())`") + } + } + + #[tokio::test] + async fn test_priority_range_negotiation_scenario_1() { + test_negotiation_ok( + State::QoS { + priorities: None, + reliability: None, + }, + State::QoS { + priorities: None, + reliability: None, + }, + State::QoS { + priorities: None, + reliability: None, + }, + ) + .await + } + + #[tokio::test] + async fn test_priority_range_negotiation_scenario_2() { + test_negotiation_ok( + State::QoS { + priorities: None, + reliability: None, + }, + State::QoS { + priorities: Some(priority_range!(1, 3)), + reliability: None, + }, + State::QoS { + priorities: Some(priority_range!(1, 3)), + reliability: None, + }, + ) + .await + } + + #[tokio::test] + async fn test_priority_range_negotiation_scenario_3() { + test_negotiation_ok( + State::QoS { + priorities: Some(priority_range!(1, 3)), + reliability: None, + }, + State::QoS { + priorities: None, + reliability: None, + }, + State::QoS { + priorities: Some(priority_range!(1, 3)), + reliability: None, + }, + ) + .await + } + + #[tokio::test] + async fn test_priority_range_negotiation_scenario_4() { + test_negotiation_ok( + State::QoS { + priorities: Some(priority_range!(1, 3)), + reliability: None, + }, + State::QoS { + priorities: Some(priority_range!(1, 3)), + reliability: None, + }, + State::QoS { + priorities: Some(priority_range!(1, 3)), + reliability: None, + }, + ) + .await + } + + #[tokio::test] + async fn test_priority_range_negotiation_scenario_5() { + test_negotiation_ok( + State::QoS { + priorities: Some(priority_range!(1, 3)), + reliability: None, + }, + State::QoS { + priorities: Some(priority_range!(0, 4)), + reliability: None, + }, + State::QoS { + priorities: Some(priority_range!(1, 3)), + reliability: None, + }, + ) + .await + } + + #[tokio::test] + async fn test_priority_range_negotiation_scenario_6() { + test_negotiation_err( + State::QoS { + priorities: Some(priority_range!(1, 3)), + reliability: None, + }, + State::QoS { + priorities: Some(priority_range!(2, 3)), + reliability: None, + }, + ) + .await + } + + #[tokio::test] + async fn test_reliability_negotiation_scenario_2() { + test_negotiation_ok( + State::QoS { + reliability: None, + priorities: None, + }, + State::QoS { + reliability: Some(Reliability::BestEffort), + priorities: None, + }, + State::QoS { + reliability: Some(Reliability::BestEffort), + priorities: None, + }, + ) + .await + } + + #[tokio::test] + async fn test_reliability_negotiation_scenario_3() { + test_negotiation_err( + State::QoS { + reliability: Some(Reliability::Reliable), + priorities: None, + }, + State::QoS { + reliability: Some(Reliability::BestEffort), + priorities: None, + }, + ) + .await + } + + #[tokio::test] + async fn test_reliability_negotiation_scenario_4() { + test_negotiation_ok( + State::QoS { + reliability: Some(Reliability::Reliable), + priorities: None, + }, + State::QoS { + reliability: Some(Reliability::Reliable), + priorities: None, + }, + State::QoS { + reliability: Some(Reliability::Reliable), + priorities: None, + }, + ) + .await + } + + #[tokio::test] + async fn test_reliability_negotiation_scenario_5() { + test_negotiation_err( + State::QoS { + reliability: Some(Reliability::BestEffort), + priorities: None, + }, + State::QoS { + reliability: Some(Reliability::Reliable), + priorities: None, + }, + ) + .await + } + + #[tokio::test] + async fn test_priority_range_and_reliability_negotiation_scenario_1() { + test_negotiation_ok( + State::QoS { + reliability: Some(Reliability::BestEffort), + priorities: Some(priority_range!(1, 3)), + }, + State::QoS { + reliability: Some(Reliability::BestEffort), + priorities: Some(priority_range!(1, 4)), + }, + State::QoS { + reliability: Some(Reliability::BestEffort), + priorities: Some(priority_range!(1, 3)), + }, + ) + .await + } +} diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index ff73e213c2..3e6d49d85f 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -18,7 +18,7 @@ use zenoh_buffers::ZSlice; #[cfg(feature = "transport_auth")] use zenoh_core::zasynclock; use zenoh_core::{zcondfeat, zerror}; -use zenoh_link::LinkUnicast; +use zenoh_link::{EndPoint, LinkUnicast}; use zenoh_protocol::{ core::{Field, Resolution, WhatAmI, ZenohIdProto}, transport::{ @@ -139,7 +139,7 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { let (link, state, input) = input; // Extension QoS - let ext_qos = self + let (ext_qos, ext_qos_optimized) = self .ext_qos .send_init_syn(&state.transport.ext_qos) .await @@ -199,6 +199,7 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { batch_size: state.transport.batch_size, resolution: state.transport.resolution, ext_qos, + ext_qos_optimized, #[cfg(feature = "shared-memory")] ext_shm, ext_auth, @@ -302,7 +303,10 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { // Extension QoS self.ext_qos - .recv_init_ack((&mut state.transport.ext_qos, init_ack.ext_qos)) + .recv_init_ack(( + &mut state.transport.ext_qos, + (init_ack.ext_qos, init_ack.ext_qos_optimized), + )) .await .map_err(|e| (e, Some(close::reason::GENERIC)))?; @@ -537,18 +541,22 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { } pub(crate) async fn open_link( + endpoint: EndPoint, link: LinkUnicast, manager: &TransportManager, ) -> ZResult { + let direction = TransportLinkUnicastDirection::Outbound; let is_streamed = link.is_streamed(); let config = TransportLinkUnicastConfig { - direction: TransportLinkUnicastDirection::Outbound, + direction, batch: BatchConfig { mtu: link.get_mtu(), is_streamed, #[cfg(feature = "transport_compression")] is_compression: false, // Perform the exchange Init/Open exchange with no compression }, + priorities: None, + reliability: None, }; let mut link = TransportLinkUnicast::new(link, config); let mut fsm = OpenLink { @@ -582,7 +590,7 @@ pub(crate) async fn open_link( transport: StateTransport { batch_size, resolution: manager.config.resolution, - ext_qos: ext::qos::StateOpen::new(manager.config.unicast.is_qos), + ext_qos: ext::qos::StateOpen::new(manager.config.unicast.is_qos, &endpoint)?, #[cfg(feature = "transport_multilink")] ext_mlink: manager .state @@ -664,13 +672,15 @@ pub(crate) async fn open_link( }; let o_config = TransportLinkUnicastConfig { - direction: TransportLinkUnicastDirection::Outbound, + direction, batch: BatchConfig { mtu: state.transport.batch_size, is_streamed, #[cfg(feature = "transport_compression")] is_compression: state.link.ext_compression.is_compression(), }, + priorities: state.transport.ext_qos.priorities(), + reliability: state.transport.ext_qos.reliability(), }; let o_link = link.reconfigure(o_config); let s_link = format!("{:?}", o_link); diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs index 736360db63..8b1b3004a9 100644 --- a/io/zenoh-transport/src/unicast/link.rs +++ b/io/zenoh-transport/src/unicast/link.rs @@ -16,7 +16,10 @@ use std::{fmt, sync::Arc}; use zenoh_buffers::{BBuf, ZSlice, ZSliceBuffer}; use zenoh_core::zcondfeat; use zenoh_link::{Link, LinkUnicast}; -use zenoh_protocol::transport::{BatchSize, Close, OpenAck, TransportMessage}; +use zenoh_protocol::{ + core::{PriorityRange, Reliability}, + transport::{BatchSize, Close, OpenAck, TransportMessage}, +}; use zenoh_result::{zerror, ZResult}; use crate::common::batch::{BatchConfig, Decode, Encode, Finalize, RBatch, WBatch}; @@ -27,11 +30,13 @@ pub(crate) enum TransportLinkUnicastDirection { Outbound, } -#[derive(Clone, Copy, PartialEq, Eq, Debug)] +#[derive(Clone, PartialEq, Eq, Debug)] pub(crate) struct TransportLinkUnicastConfig { // Inbound / outbound pub(crate) direction: TransportLinkUnicastDirection, pub(crate) batch: BatchConfig, + pub(crate) priorities: Option, + pub(crate) reliability: Option, } #[derive(Clone, PartialEq, Eq)] @@ -55,7 +60,11 @@ impl TransportLinkUnicast { } pub(crate) fn link(&self) -> Link { - (&self.link).into() + Link::new_unicast( + &self.link, + self.config.priorities.clone(), + self.config.reliability, + ) } pub(crate) fn tx(&self) -> TransportLinkUnicastTx { @@ -77,7 +86,7 @@ impl TransportLinkUnicast { pub(crate) fn rx(&self) -> TransportLinkUnicastRx { TransportLinkUnicastRx { link: self.link.clone(), - batch: self.config.batch, + config: self.config.clone(), } } @@ -121,18 +130,6 @@ impl fmt::Debug for TransportLinkUnicast { } } -impl From<&TransportLinkUnicast> for Link { - fn from(link: &TransportLinkUnicast) -> Self { - Link::from(&link.link) - } -} - -impl From for Link { - fn from(link: TransportLinkUnicast) -> Self { - Link::from(&link.link) - } -} - impl PartialEq for TransportLinkUnicast { fn eq(&self, other: &Link) -> bool { &other.src == self.link.get_src() && &other.dst == self.link.get_dst() @@ -201,7 +198,7 @@ impl fmt::Debug for TransportLinkUnicastTx { pub(crate) struct TransportLinkUnicastRx { pub(crate) link: LinkUnicast, - pub(crate) batch: BatchConfig, + pub(crate) config: TransportLinkUnicastConfig, } impl TransportLinkUnicastRx { @@ -235,7 +232,7 @@ impl TransportLinkUnicastRx { let buffer = ZSlice::new(Arc::new(into), 0, end) .map_err(|_| zerror!("{ERR}{self}. ZSlice index(es) out of bounds"))?; - let mut batch = RBatch::new(self.batch, buffer); + let mut batch = RBatch::new(self.config.batch, buffer); batch .initialize(buff) .map_err(|e| zerror!("{ERR}{self}. {e}."))?; @@ -246,7 +243,7 @@ impl TransportLinkUnicastRx { } pub async fn recv(&mut self) -> ZResult { - let mtu = self.batch.mtu as usize; + let mtu = self.config.batch.mtu as usize; let mut batch = self .recv_batch(|| zenoh_buffers::vec::uninit(mtu).into_boxed_slice()) .await?; @@ -259,7 +256,7 @@ impl TransportLinkUnicastRx { impl fmt::Display for TransportLinkUnicastRx { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}:{:?}", self.link, self.batch) + write!(f, "{}:{:?}", self.link, self.config) } } @@ -267,7 +264,7 @@ impl fmt::Debug for TransportLinkUnicastRx { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("TransportLinkUnicastRx") .field("link", &self.link) - .field("config", &self.batch) + .field("config", &self.config) .finish() } } diff --git a/io/zenoh-transport/src/unicast/lowlatency/link.rs b/io/zenoh-transport/src/unicast/lowlatency/link.rs index 3ba1cd724f..950d11f3c2 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/link.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/link.rs @@ -152,7 +152,7 @@ impl TransportUnicastLowlatency { // The pool of buffers let pool = { - let mtu = link_rx.batch.mtu as usize; + let mtu = link_rx.config.batch.mtu as usize; let mut n = rx_buffer_size / mtu; if rx_buffer_size % mtu != 0 { n += 1; diff --git a/io/zenoh-transport/src/unicast/manager.rs b/io/zenoh-transport/src/unicast/manager.rs index bff221323e..b9c79b681d 100644 --- a/io/zenoh-transport/src/unicast/manager.rs +++ b/io/zenoh-transport/src/unicast/manager.rs @@ -615,7 +615,7 @@ impl TransportManager { }, { tracing::debug!( - "New transport opened between {} and {} - whatami: {}, sn resolution: {:?}, initial sn: {:?}, qos: {}, multilink: {}, lowlatency: {}", + "New transport opened between {} and {} - whatami: {}, sn resolution: {:?}, initial sn: {:?}, qos: {:?}, multilink: {}, lowlatency: {}", self.config.zid, config.zid, config.whatami, @@ -707,9 +707,9 @@ impl TransportManager { }; // Create a new link associated by calling the Link Manager - let link = manager.new_link(endpoint).await?; + let link = manager.new_link(endpoint.clone()).await?; // Open the link - super::establishment::open::open_link(link, self).await + super::establishment::open::open_link(endpoint, link, self).await } pub async fn get_transport_unicast(&self, peer: &ZenohIdProto) -> Option { diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index fff842c255..bb88a912d4 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -15,6 +15,7 @@ use std::time::Duration; use tokio_util::{sync::CancellationToken, task::TaskTracker}; use zenoh_buffers::ZSliceBuffer; +use zenoh_link::Link; use zenoh_protocol::transport::{KeepAlive, TransportMessage}; use zenoh_result::{zerror, ZResult}; use zenoh_sync::{RecyclingObject, RecyclingObjectPool}; @@ -113,6 +114,8 @@ impl TransportLinkUnicastUniversal { } pub(super) fn start_rx(&mut self, transport: TransportUnicastUniversal, lease: Duration) { + let priorities = self.link.config.priorities.clone(); + let reliability = self.link.config.reliability; let mut rx = self.link.rx(); let token = self.token.clone(); let task = async move { @@ -133,8 +136,12 @@ impl TransportLinkUnicastUniversal { // Spawn a task to avoid a deadlock waiting for this same task // to finish in the close() joining its handle // WARN: Must be spawned on RX - zenoh_runtime::ZRuntime::RX - .spawn(async move { transport.del_link((&rx.link).into()).await }); + + zenoh_runtime::ZRuntime::RX.spawn(async move { + transport + .del_link(Link::new_unicast(&rx.link, priorities, reliability)) + .await + }); // // WARN: This ZRuntime blocks // zenoh_runtime::ZRuntime::Net @@ -248,14 +255,18 @@ async fn rx_task( } // The pool of buffers - let mtu = link.batch.mtu as usize; + let mtu = link.config.batch.mtu as usize; let mut n = rx_buffer_size / mtu; if rx_buffer_size % mtu != 0 { n += 1; } let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice()); - let l = (&link.link).into(); + let l = Link::new_unicast( + &link.link, + link.config.priorities.clone(), + link.config.reliability, + ); loop { tokio::select! { diff --git a/io/zenoh-transport/src/unicast/universal/transport.rs b/io/zenoh-transport/src/unicast/universal/transport.rs index 47f2ff344c..f01a4a8f18 100644 --- a/io/zenoh-transport/src/unicast/universal/transport.rs +++ b/io/zenoh-transport/src/unicast/universal/transport.rs @@ -42,13 +42,6 @@ use crate::{ TransportManager, TransportPeerEventHandler, }; -macro_rules! zlinkindex { - ($guard:expr, $link:expr) => { - // Compare LinkUnicast link to not compare TransportLinkUnicast direction - $guard.iter().position(|tl| tl.link == $link) - }; -} - /*************************************/ /* UNIVERSAL TRANSPORT */ /*************************************/ @@ -175,7 +168,15 @@ impl TransportUnicastUniversal { let target = { let mut guard = zwrite!(self.links); - if let Some(index) = zlinkindex!(guard, link) { + if let Some(index) = guard.iter().position(|tl| { + // Compare LinkUnicast link to not compare TransportLinkUnicast direction + Link::new_unicast( + &tl.link.link, + tl.link.config.priorities.clone(), + tl.link.config.reliability, + ) + .eq(&link) + }) { let is_last = guard.len() == 1; if is_last { // Close the whole transport diff --git a/io/zenoh-transport/src/unicast/universal/tx.rs b/io/zenoh-transport/src/unicast/universal/tx.rs index f7754489ef..cfad30771d 100644 --- a/io/zenoh-transport/src/unicast/universal/tx.rs +++ b/io/zenoh-transport/src/unicast/universal/tx.rs @@ -11,51 +11,106 @@ // Contributors: // ZettaScale Zenoh Team, // -use zenoh_core::zread; -use zenoh_protocol::network::NetworkMessage; +use zenoh_protocol::{ + core::{Priority, PriorityRange, Reliability}, + network::NetworkMessage, +}; use super::transport::TransportUnicastUniversal; #[cfg(feature = "shared-memory")] use crate::shm::map_zmsg_to_partner; +use crate::unicast::transport_unicast_inner::TransportUnicastTrait; impl TransportUnicastUniversal { - fn schedule_on_link(&self, msg: NetworkMessage) -> bool { - macro_rules! zpush { - ($guard:expr, $pipeline:expr, $msg:expr) => { - // Drop the guard before the push_zenoh_message since - // the link could be congested and this operation could - // block for fairly long time - let pl = $pipeline.clone(); - drop($guard); - tracing::trace!("Scheduled: {:?}", $msg); - return pl.push_network_message($msg); - }; + /// Returns the index of the best matching [`Reliability`]-[`PriorityRange`] pair. + /// + /// The result is either: + /// 1. A "full match" where the pair matches both `reliability` and `priority`. In case of + /// multiple candidates, the pair with the smaller range is selected. + /// 2. A "partial match" where the pair match `reliability` and **not** `priority`. + /// 3. An "any match" where any available pair is selected. + /// + /// If `elements` is empty then [`None`] is returned. + fn select( + elements: impl Iterator)>, + reliability: Reliability, + priority: Priority, + ) -> Option { + #[derive(Default)] + struct Match { + full: Option, + partial: Option, + any: Option, } - let guard = zread!(self.links); - // First try to find the best match between msg and link reliability - if let Some(pl) = guard.iter().find_map(|tl| { - if msg.is_reliable() == tl.link.link.is_reliable() { - Some(&tl.pipeline) - } else { - None - } - }) { - zpush!(guard, pl, msg); - } + let (match_, _) = elements.enumerate().fold( + (Match::default(), Option::::None), + |(mut match_, mut prev_priorities), (i, (r, ps))| { + match (r.eq(&reliability), ps.filter(|ps| ps.contains(&priority))) { + (true, Some(priorities)) + if prev_priorities + .as_ref() + .map_or(true, |ps| ps.len() > priorities.len()) => + { + match_.full = Some(i); + prev_priorities = Some(priorities); + } + (true, None) if match_.partial.is_none() => match_.partial = Some(i), + _ if match_.any.is_none() => match_.any = Some(i), + _ => {} + }; - // No best match found, take the first available link - if let Some(pl) = guard.iter().map(|tl| &tl.pipeline).next() { - zpush!(guard, pl, msg); - } + (match_, prev_priorities) + }, + ); + + match_.full.or(match_.partial).or(match_.any) + } - // No Link found + fn schedule_on_link(&self, msg: NetworkMessage) -> bool { + let transport_links = self + .links + .read() + .expect("reading `TransportUnicastUniversal::links` should not fail"); + + let Some(transport_link_index) = Self::select( + transport_links.iter().map(|tl| { + ( + tl.link + .config + .reliability + .unwrap_or(Reliability::from(tl.link.link.is_reliable())), + tl.link.config.priorities.clone(), + ) + }), + Reliability::from(msg.is_reliable()), + msg.priority(), + ) else { + tracing::trace!( + "Message dropped because the transport has no links: {}", + msg + ); + + // No Link found + return false; + }; + + let transport_link = transport_links + .get(transport_link_index) + .expect("transport link index should be valid"); + + let pipeline = transport_link.pipeline.clone(); tracing::trace!( - "Message dropped because the transport has no links: {}", - msg + "Scheduled {:?} for transmission to {} ({})", + msg, + transport_link.link.link.get_dst(), + self.get_zid() ); - - false + // Drop the guard before the push_zenoh_message since + // the link could be congested and this operation could + // block for fairly long time + drop(transport_links); + pipeline.push_network_message(msg) } #[allow(unused_mut)] // When feature "shared-memory" is not enabled @@ -82,3 +137,73 @@ impl TransportUnicastUniversal { res } } + +#[cfg(test)] +mod tests { + use zenoh_protocol::core::{Priority, PriorityRange, Reliability}; + + use crate::unicast::universal::transport::TransportUnicastUniversal; + + macro_rules! priority_range { + ($start:literal, $end:literal) => { + PriorityRange::new($start.try_into().unwrap()..=$end.try_into().unwrap()) + }; + } + + #[test] + /// Tests the "full match" scenario with exactly one candidate. + fn test_link_selection_scenario_1() { + let selection = TransportUnicastUniversal::select( + [ + (Reliability::Reliable, Some(priority_range!(0, 1))), + (Reliability::Reliable, Some(priority_range!(1, 2))), + (Reliability::BestEffort, Some(priority_range!(0, 1))), + ] + .into_iter(), + Reliability::Reliable, + Priority::try_from(0).unwrap(), + ); + assert_eq!(selection, Some(0)); + } + + #[test] + /// Tests the "full match" scenario with multiple candidates. + fn test_link_selection_scenario_2() { + let selection = TransportUnicastUniversal::select( + [ + (Reliability::Reliable, Some(priority_range!(0, 2))), + (Reliability::Reliable, Some(priority_range!(0, 1))), + ] + .into_iter(), + Reliability::Reliable, + Priority::try_from(0).unwrap(), + ); + assert_eq!(selection, Some(1)); + } + + #[test] + /// Tests the "partial match" scenario. + fn test_link_selection_scenario_3() { + let selection = TransportUnicastUniversal::select( + [ + (Reliability::BestEffort, Some(priority_range!(0, 1))), + (Reliability::Reliable, None), + ] + .into_iter(), + Reliability::Reliable, + Priority::try_from(0).unwrap(), + ); + assert_eq!(selection, Some(1)); + } + + #[test] + /// Tests the "any match" scenario. + fn test_link_selection_scenario_4() { + let selection = TransportUnicastUniversal::select( + [(Reliability::BestEffort, None)].into_iter(), + Reliability::Reliable, + Priority::try_from(0).unwrap(), + ); + assert_eq!(selection, Some(0)); + } +}