diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index e7672c6057..a302a5b8e1 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -331,6 +331,11 @@ /// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream", "vsock"] /// For example, to only enable "tls" and "quic": // protocols: ["tls", "quic"], + /// + /// Endpoints accept a "priorities" metadata value in the form of a Rust-style half-open range + /// (e.g. `2..4` signifies priorities 2 and 3). This value is used to select the link used + /// for transmission based on the priority of the message in question. + /// /// Configure the zenoh TX parameters of a link tx: { /// The resolution in bits to be used for the message sequence numbers. diff --git a/commons/zenoh-protocol/src/core/mod.rs b/commons/zenoh-protocol/src/core/mod.rs index ebf1bb7f85..9f9bd13797 100644 --- a/commons/zenoh-protocol/src/core/mod.rs +++ b/commons/zenoh-protocol/src/core/mod.rs @@ -24,9 +24,10 @@ use core::{ str::FromStr, }; +use serde::Serialize; pub use uhlc::{Timestamp, NTP64}; use zenoh_keyexpr::OwnedKeyExpr; -use zenoh_result::{bail, zerror}; +use zenoh_result::{bail, zerror, ZResult}; /// The unique Id of the [`HLC`](uhlc::HLC) that generated the concerned [`Timestamp`]. pub type TimestampId = uhlc::ID; @@ -308,6 +309,44 @@ pub enum Priority { Background = 7, } +#[derive(Debug, Default, Copy, Clone, Eq, Hash, PartialEq, Serialize)] +pub struct PriorityRange { + pub start: u8, + pub end: u8, +} + +impl PriorityRange { + pub fn new(start: u8, end: u8) -> ZResult { + if start >= end || start < Priority::MIN as u8 || end > Priority::MAX as u8 + 1 { + bail!("Invalid priority range: {start}..{end}") + }; + + Ok(Self { start, end }) + } + + pub fn includes(&self, priority: Priority) -> bool { + self.start <= (priority as u8) && (priority as u8) < self.end + } + + pub fn start(&self) -> u8 { + self.start + } + + pub fn end(&self) -> u8 { + self.end + } + + #[cfg(feature = "test")] + pub fn rand() -> Self { + use rand::Rng; + let mut rng = rand::thread_rng(); + let start = rng.gen_range(Priority::MIN as u8..Priority::MAX as u8); + let end = rng.gen_range(start..Priority::MAX as u8); + + Self { start, end } + } +} + impl Priority { /// Default pub const DEFAULT: Self = Self::Data; @@ -367,6 +406,16 @@ impl Reliability { } } +impl From for Reliability { + fn from(value: bool) -> Self { + if value { + Reliability::Reliable + } else { + Reliability::BestEffort + } + } +} + #[derive(Debug, Copy, Clone, PartialEq, Eq, Default)] pub struct Channel { pub priority: Priority, diff --git a/commons/zenoh-protocol/src/transport/init.rs b/commons/zenoh-protocol/src/transport/init.rs index 7e56bfd770..0a4e97f95e 100644 --- a/commons/zenoh-protocol/src/transport/init.rs +++ b/commons/zenoh-protocol/src/transport/init.rs @@ -126,13 +126,13 @@ pub struct InitSyn { // Extensions pub mod ext { use crate::{ - common::{ZExtUnit, ZExtZBuf}, - zextunit, zextzbuf, + common::{ZExtUnit, ZExtZ64, ZExtZBuf}, + zextunit, zextz64, zextzbuf, }; /// # QoS extension /// Used to negotiate the use of QoS - pub type QoS = zextunit!(0x1, false); + pub type QoS = zextz64!(0x1, false); /// # Shm extension /// Used as challenge for probing shared memory capabilities @@ -161,7 +161,7 @@ impl InitSyn { pub fn rand() -> Self { use rand::Rng; - use crate::common::{ZExtUnit, ZExtZBuf}; + use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf}; let mut rng = rand::thread_rng(); @@ -170,7 +170,7 @@ impl InitSyn { let zid = ZenohIdProto::default(); let resolution = Resolution::rand(); let batch_size: BatchSize = rng.gen(); - let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand()); #[cfg(feature = "shared-memory")] let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); @@ -217,7 +217,7 @@ impl InitAck { pub fn rand() -> Self { use rand::Rng; - use crate::common::{ZExtUnit, ZExtZBuf}; + use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf}; let mut rng = rand::thread_rng(); @@ -231,7 +231,7 @@ impl InitAck { }; let batch_size: BatchSize = rng.gen(); let cookie = ZSlice::rand(64); - let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand()); #[cfg(feature = "shared-memory")] let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index 62f39bf86c..3466b3d8fc 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -36,7 +36,13 @@ pub trait LinkManagerUnicastTrait: Send + Sync { async fn get_listeners(&self) -> Vec; async fn get_locators(&self) -> Vec; } -pub type NewLinkChannelSender = flume::Sender; +pub type NewLinkChannelSender = flume::Sender; + +pub struct NewLinkUnicast { + pub link: LinkUnicast, + pub endpoint: EndPoint, +} + pub trait ConstructibleLinkManagerUnicast: Sized { fn new(new_link_sender: NewLinkChannelSender, config: T) -> ZResult; } diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs index 2e0d9e0a19..3fdcfdc62d 100644 --- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs @@ -27,7 +27,7 @@ use x509_parser::prelude::*; use zenoh_core::zasynclock; use zenoh_link_commons::{ get_ip_interface_names, LinkAuthId, LinkAuthType, LinkManagerUnicastTrait, LinkUnicast, - LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, + LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, NewLinkUnicast, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -332,11 +332,14 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic { // Spawn the accept loop for the listener let token = self.listeners.token.child_token(); - let c_token = token.clone(); - let c_manager = self.manager.clone(); + let task = { + let token = token.clone(); + let manager = self.manager.clone(); + let endpoint = endpoint.clone(); - let task = async move { accept_task(quic_endpoint, c_token, c_manager).await }; + async move { accept_task(endpoint, quic_endpoint, token, manager).await } + }; // Initialize the QuicAcceptor let locator = endpoint.to_locator(); @@ -364,7 +367,8 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastQuic { } async fn accept_task( - endpoint: quinn::Endpoint, + endpoint: EndPoint, + quic_endpoint: quinn::Endpoint, token: CancellationToken, manager: NewLinkChannelSender, ) -> ZResult<()> { @@ -382,7 +386,7 @@ async fn accept_task( Ok(conn) } - let src_addr = endpoint + let src_addr = quic_endpoint .local_addr() .map_err(|e| zerror!("Can not accept QUIC connections: {}", e))?; @@ -393,7 +397,7 @@ async fn accept_task( tokio::select! { _ = token.cancelled() => break, - res = accept(endpoint.accept()) => { + res = accept(quic_endpoint.accept()) => { match res { Ok(quic_conn) => { // Get the bideractional streams. Note that we don't allow unidirectional streams. @@ -429,7 +433,7 @@ async fn accept_task( )); // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(LinkUnicast(link)).await { + if let Err(e) = manager.send_async(NewLinkUnicast { link: LinkUnicast(link), endpoint: endpoint.clone() }).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 5711e5fe5c..b048121a9a 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -33,7 +33,7 @@ use z_serial::ZSerial; use zenoh_core::{zasynclock, zasyncread, zasyncwrite}; use zenoh_link_commons::{ ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, - LinkUnicastTrait, NewLinkChannelSender, + LinkUnicastTrait, NewLinkChannelSender, NewLinkUnicast, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -332,19 +332,23 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { // Spawn the accept loop for the listener let token = CancellationToken::new(); - let c_token = token.clone(); let mut listeners = zasyncwrite!(self.listeners); - let c_path = path.clone(); - let c_manager = self.manager.clone(); - let c_listeners = self.listeners.clone(); - - let task = async move { - // Wait for the accept loop to terminate - let res = - accept_read_task(link, c_token, c_manager, c_path.clone(), is_connected).await; - zasyncwrite!(c_listeners).remove(&c_path); - res + let task = { + let token = token.clone(); + let path = path.clone(); + let manager = self.manager.clone(); + let listeners = self.listeners.clone(); + let endpoint = endpoint.clone(); + + async move { + // Wait for the accept loop to terminate + let res = + accept_read_task(endpoint, link, token, manager, path.clone(), is_connected) + .await; + zasyncwrite!(listeners).remove(&path); + res + } }; let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task); @@ -390,6 +394,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastSerial { } async fn accept_read_task( + endpoint: EndPoint, link: Arc, token: CancellationToken, manager: NewLinkChannelSender, @@ -423,7 +428,7 @@ async fn accept_read_task( match res { Ok(link) => { // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(LinkUnicast(link.clone())).await { + if let Err(e) = manager.send_async(NewLinkUnicast{ link: LinkUnicast(link.clone()), endpoint: endpoint.clone() }).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 7532055f8e..f5c1e51e1d 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -21,7 +21,7 @@ use tokio::{ use tokio_util::sync::CancellationToken; use zenoh_link_commons::{ get_ip_interface_names, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, - ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, + ListenersUnicastIP, NewLinkChannelSender, NewLinkUnicast, BIND_INTERFACE, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -354,10 +354,14 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { )?; let token = self.listeners.token.child_token(); - let c_token = token.clone(); - let c_manager = self.manager.clone(); - let task = async move { accept_task(socket, c_token, c_manager).await }; + let task = { + let token = token.clone(); + let manager = self.manager.clone(); + let endpoint = endpoint.clone(); + + async move { accept_task(endpoint, socket, token, manager).await } + }; let locator = endpoint.to_locator(); self.listeners @@ -421,6 +425,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTcp { } async fn accept_task( + endpoint: EndPoint, socket: TcpListener, token: CancellationToken, manager: NewLinkChannelSender, @@ -457,7 +462,7 @@ async fn accept_task( let link = Arc::new(LinkUnicastTcp::new(stream, src_addr, dst_addr)); // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(LinkUnicast(link)).await { + if let Err(e) = manager.send_async(NewLinkUnicast{ link: LinkUnicast(link), endpoint: endpoint.clone() }).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } }, diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index 716eac2121..d75835e653 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -25,7 +25,7 @@ use x509_parser::prelude::*; use zenoh_core::zasynclock; use zenoh_link_commons::{ get_ip_interface_names, LinkAuthId, LinkAuthType, LinkManagerUnicastTrait, LinkUnicast, - LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, + LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, NewLinkUnicast, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -362,12 +362,16 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls { let local_port = local_addr.port(); // Initialize the TlsAcceptor - let acceptor = TlsAcceptor::from(Arc::new(tls_server_config.server_config)); let token = self.listeners.token.child_token(); - let c_token = token.clone(); - let c_manager = self.manager.clone(); - let task = async move { accept_task(socket, acceptor, c_token, c_manager).await }; + let task = { + let acceptor = TlsAcceptor::from(Arc::new(tls_server_config.server_config)); + let token = token.clone(); + let manager = self.manager.clone(); + let endpoint = endpoint.clone(); + + async move { accept_task(endpoint, socket, acceptor, token, manager).await } + }; // Update the endpoint locator address let locator = Locator::new( @@ -399,6 +403,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastTls { } async fn accept_task( + endpoint: EndPoint, socket: TcpListener, acceptor: TlsAcceptor, token: CancellationToken, @@ -456,7 +461,7 @@ async fn accept_task( )); // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(LinkUnicast(link)).await { + if let Err(e) = manager.send_async(NewLinkUnicast {link: LinkUnicast(link), endpoint: endpoint.clone()}).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } } diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index e67e821363..0c3c6a5039 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -25,7 +25,8 @@ use tokio_util::sync::CancellationToken; use zenoh_core::{zasynclock, zlock}; use zenoh_link_commons::{ get_ip_interface_names, ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, - LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, BIND_INTERFACE, + LinkUnicast, LinkUnicastTrait, ListenersUnicastIP, NewLinkChannelSender, NewLinkUnicast, + BIND_INTERFACE, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -402,10 +403,14 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp { )?; let token = self.listeners.token.child_token(); - let c_token = token.clone(); - let c_manager = self.manager.clone(); - let task = async move { accept_read_task(socket, c_token, c_manager).await }; + let task = { + let token = token.clone(); + let manager = self.manager.clone(); + let endpoint = endpoint.clone(); + + async move { accept_read_task(endpoint, socket, token, manager).await } + }; let locator = endpoint.to_locator(); self.listeners @@ -471,6 +476,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUdp { } async fn accept_read_task( + endpoint: EndPoint, socket: UdpSocket, token: CancellationToken, manager: NewLinkChannelSender, @@ -544,7 +550,7 @@ async fn accept_read_task( LinkUnicastUdpVariant::Unconnected(unconnected), )); // Add the new link to the set of connected peers - if let Err(e) = manager.send_async(LinkUnicast(link)).await { + if let Err(e) = manager.send_async(NewLinkUnicast { link: LinkUnicast(link), endpoint: endpoint.clone() }).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } } diff --git a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs index df93b9cc61..b93bb32398 100644 --- a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs @@ -37,7 +37,7 @@ use unix_named_pipe::{create, open_write}; use zenoh_core::{zasyncread, zasyncwrite, ResolveFuture, Wait}; use zenoh_link_commons::{ ConstructibleLinkManagerUnicast, LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, - LinkUnicastTrait, NewLinkChannelSender, + LinkUnicastTrait, NewLinkChannelSender, NewLinkUnicast, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -273,14 +273,19 @@ async fn handle_incoming_connections( endpoint.metadata(), )?; + let link = Arc::new(UnicastPipe { + r: UnsafeCell::new(dedicated_uplink), + w: UnsafeCell::new(dedicated_downlink), + local, + remote, + }); + // send newly established link to manager manager - .send_async(LinkUnicast(Arc::new(UnicastPipe { - r: UnsafeCell::new(dedicated_uplink), - w: UnsafeCell::new(dedicated_downlink), - local, - remote, - }))) + .send_async(NewLinkUnicast { + link: LinkUnicast(link), + endpoint: endpoint.clone(), + }) .await?; ZResult::Ok(()) diff --git a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs index a07267416d..5864847e8e 100644 --- a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs @@ -28,6 +28,7 @@ use uuid::Uuid; use zenoh_core::{zasyncread, zasyncwrite}; use zenoh_link_commons::{ LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + NewLinkUnicast, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -392,15 +393,18 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUnixSocketStream { let c_token = token.clone(); let mut listeners = zasyncwrite!(self.listeners); - let c_manager = self.manager.clone(); - let c_listeners = self.listeners.clone(); - let c_path = local_path_str.to_owned(); - - let task = async move { - // Wait for the accept loop to terminate - let res = accept_task(socket, c_token, c_manager).await; - zasyncwrite!(c_listeners).remove(&c_path); - res + let task = { + let manager = self.manager.clone(); + let listeners = self.listeners.clone(); + let path = local_path_str.to_owned(); + let endpoint = endpoint.clone(); + + async move { + // Wait for the accept loop to terminate + let res = accept_task(endpoint, socket, c_token, manager).await; + zasyncwrite!(listeners).remove(&path); + res + } }; let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task); @@ -459,6 +463,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUnixSocketStream { } async fn accept_task( + endpoint: EndPoint, socket: UnixListener, token: CancellationToken, manager: NewLinkChannelSender, @@ -515,7 +520,7 @@ async fn accept_task( )); // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(LinkUnicast(link)).await { + if let Err(e) = manager.send_async(NewLinkUnicast { link: LinkUnicast(link), endpoint: endpoint.clone() }).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } diff --git a/io/zenoh-links/zenoh-link-vsock/src/unicast.rs b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs index e7b261f292..cbdde4b043 100644 --- a/io/zenoh-links/zenoh-link-vsock/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-vsock/src/unicast.rs @@ -29,6 +29,7 @@ use tokio_vsock::{ use zenoh_core::{zasyncread, zasyncwrite}; use zenoh_link_commons::{ LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + NewLinkUnicast, }; use zenoh_protocol::{ core::{endpoint::Address, EndPoint, Locator}, @@ -272,20 +273,23 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastVsock { endpoint.config(), )?; let token = CancellationToken::new(); - let c_token = token.clone(); - - let c_manager = self.manager.clone(); let locator = endpoint.to_locator(); let mut listeners = zasyncwrite!(self.listeners); - let c_listeners = self.listeners.clone(); - let c_addr = addr; - let task = async move { - // Wait for the accept loop to terminate - let res = accept_task(listener, c_token, c_manager).await; - zasyncwrite!(c_listeners).remove(&c_addr); - res + + let task = { + let token = token.clone(); + let manager = self.manager.clone(); + let listeners = self.listeners.clone(); + let endpoint = endpoint.clone(); + + async move { + // Wait for the accept loop to terminate + let res = accept_task(endpoint.clone(), listener, token, manager).await; + zasyncwrite!(listeners).remove(&addr); + res + } }; let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task); @@ -329,6 +333,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastVsock { } async fn accept_task( + endpoint: EndPoint, mut socket: VsockListener, token: CancellationToken, manager: NewLinkChannelSender, @@ -356,7 +361,7 @@ async fn accept_task( let link = Arc::new(LinkUnicastVsock::new(stream, src_addr, dst_addr)); // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(LinkUnicast(link)).await { + if let Err(e) = manager.send_async(NewLinkUnicast { link: LinkUnicast(link), endpoint: endpoint.clone() }).await { tracing::error!("{}-{}: {}", file!(), line!(), e) } }, diff --git a/io/zenoh-links/zenoh-link-ws/src/unicast.rs b/io/zenoh-links/zenoh-link-ws/src/unicast.rs index 193c9a1724..82dc9993b3 100644 --- a/io/zenoh-links/zenoh-link-ws/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-ws/src/unicast.rs @@ -35,6 +35,7 @@ use tokio_util::sync::CancellationToken; use zenoh_core::{zasynclock, zasyncread, zasyncwrite}; use zenoh_link_commons::{ LinkAuthId, LinkManagerUnicastTrait, LinkUnicast, LinkUnicastTrait, NewLinkChannelSender, + NewLinkUnicast, }; use zenoh_protocol::{ core::{EndPoint, Locator}, @@ -368,16 +369,20 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastWs { // Spawn the accept loop for the listener let token = CancellationToken::new(); - let c_token = token.clone(); - let c_manager = self.manager.clone(); - let c_listeners = self.listeners.clone(); - let c_addr = local_addr; - - let task = async move { - // Wait for the accept loop to terminate - let res = accept_task(socket, c_token, c_manager).await; - zasyncwrite!(c_listeners).remove(&c_addr); - res + + let task = { + let token = token.clone(); + let manager = self.manager.clone(); + let listeners = self.listeners.clone(); + let addr = local_addr; + let endpoint = endpoint.clone(); + + async move { + // Wait for the accept loop to terminate + let res = accept_task(endpoint, socket, token, manager).await; + zasyncwrite!(listeners).remove(&addr); + res + } }; let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task); @@ -467,6 +472,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastWs { } async fn accept_task( + endpoint: EndPoint, socket: TcpListener, token: CancellationToken, manager: NewLinkChannelSender, @@ -535,7 +541,13 @@ async fn accept_task( let link = Arc::new(LinkUnicastWs::new(stream, src_addr, dst_addr)); // Communicate the new link to the initial transport manager - if let Err(e) = manager.send_async(LinkUnicast(link)).await { + if let Err(e) = manager + .send_async(NewLinkUnicast { + link: LinkUnicast(link), + endpoint: endpoint.clone(), + }) + .await + { tracing::error!("{}-{}: {}", file!(), line!(), e) } } diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index 305ccab574..3746092d4a 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -419,8 +419,8 @@ impl TransportManager { loop { tokio::select! { res = new_unicast_link_receiver.recv_async() => { - if let Ok(link) = res { - this.handle_new_link_unicast(link).await; + if let Ok(new_link) = res { + this.handle_new_link_unicast(new_link).await; } } _ = cancellation_token.cancelled() => { break; } diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index 64949357c6..ae818716bc 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -20,9 +20,9 @@ use zenoh_buffers::{reader::HasReader, writer::HasWriter, ZSlice}; use zenoh_codec::{RCodec, WCodec, Zenoh080}; use zenoh_core::{zasynclock, zcondfeat, zerror}; use zenoh_crypto::{BlockCipher, PseudoRng}; -use zenoh_link::LinkUnicast; +use zenoh_link::{EndPoint, LinkUnicast}; use zenoh_protocol::{ - core::{Field, Resolution, WhatAmI, ZenohIdProto}, + core::{Field, Reliability, Resolution, WhatAmI, ZenohIdProto}, transport::{ batch_size, close::{self, Close}, @@ -55,7 +55,7 @@ pub(super) type AcceptError = (zenoh_result::Error, Option); struct StateTransport { batch_size: BatchSize, resolution: Resolution, - ext_qos: ext::qos::StateAccept, + ext_qos: ext::qos::QoS, #[cfg(feature = "transport_multilink")] ext_mlink: ext::multilink::StateAccept, #[cfg(feature = "shared-memory")] @@ -636,17 +636,24 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { } } -pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) -> ZResult<()> { +pub(crate) async fn accept_link( + endpoint: EndPoint, + link: LinkUnicast, + manager: &TransportManager, +) -> ZResult<()> { + let direction = TransportLinkUnicastDirection::Inbound; let mtu = link.get_mtu(); let is_streamed = link.is_streamed(); let config = TransportLinkUnicastConfig { - direction: TransportLinkUnicastDirection::Inbound, + direction, batch: BatchConfig { mtu, is_streamed, #[cfg(feature = "transport_compression")] is_compression: false, }, + priorities: None, + reliability: Reliability::from(link.is_reliable()), }; let mut link = TransportLinkUnicast::new(link, config); let mut fsm = AcceptLink { @@ -689,7 +696,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - transport: StateTransport { batch_size: manager.config.batch_size.min(batch_size::UNICAST).min(mtu), resolution: manager.config.resolution, - ext_qos: ext::qos::StateAccept::new(manager.config.unicast.is_qos), + ext_qos: ext::qos::QoS::new(manager.config.unicast.is_qos, &endpoint)?, #[cfg(feature = "transport_multilink")] ext_mlink: manager .state @@ -757,7 +764,7 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - whatami: osyn_out.other_whatami, sn_resolution: state.transport.resolution.get(Field::FrameSN), tx_initial_sn: oack_out.open_ack.initial_sn, - is_qos: state.transport.ext_qos.is_qos(), + is_qos: state.transport.ext_qos.is_enabled(), #[cfg(feature = "transport_multilink")] multilink: state.transport.ext_mlink.multilink(), #[cfg(feature = "shared-memory")] @@ -771,13 +778,15 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - }; let a_config = TransportLinkUnicastConfig { - direction: TransportLinkUnicastDirection::Inbound, + direction, batch: BatchConfig { mtu: state.transport.batch_size, is_streamed, #[cfg(feature = "transport_compression")] is_compression: state.link.ext_compression.is_compression(), }, + priorities: state.transport.ext_qos.priorities(), + reliability: Reliability::from(link.link.is_reliable()), }; let a_link = link.reconfigure(a_config); let s_link = format!("{:?}", a_link); diff --git a/io/zenoh-transport/src/unicast/establishment/cookie.rs b/io/zenoh-transport/src/unicast/establishment/cookie.rs index 4220f8e08b..7fb84f258e 100644 --- a/io/zenoh-transport/src/unicast/establishment/cookie.rs +++ b/io/zenoh-transport/src/unicast/establishment/cookie.rs @@ -34,7 +34,7 @@ pub(crate) struct Cookie { pub(crate) batch_size: BatchSize, pub(crate) nonce: u64, // Extensions - pub(crate) ext_qos: ext::qos::StateAccept, + pub(crate) ext_qos: ext::qos::QoS, #[cfg(feature = "transport_multilink")] pub(crate) ext_mlink: ext::multilink::StateAccept, #[cfg(feature = "shared-memory")] @@ -90,7 +90,7 @@ where let batch_size: BatchSize = self.read(&mut *reader)?; let nonce: u64 = self.read(&mut *reader)?; // Extensions - let ext_qos: ext::qos::StateAccept = self.read(&mut *reader)?; + let ext_qos: ext::qos::QoS = self.read(&mut *reader)?; #[cfg(feature = "transport_multilink")] let ext_mlink: ext::multilink::StateAccept = self.read(&mut *reader)?; #[cfg(feature = "shared-memory")] @@ -178,7 +178,7 @@ impl Cookie { resolution: Resolution::rand(), batch_size: rng.gen(), nonce: rng.gen(), - ext_qos: ext::qos::StateAccept::rand(), + ext_qos: ext::qos::QoS::rand(), #[cfg(feature = "transport_multilink")] ext_mlink: ext::multilink::StateAccept::rand(), #[cfg(feature = "shared-memory")] diff --git a/io/zenoh-transport/src/unicast/establishment/ext/qos.rs b/io/zenoh-transport/src/unicast/establishment/ext/qos.rs index f749073805..b2f3bea7c7 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/qos.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/qos.rs @@ -14,13 +14,19 @@ use core::marker::PhantomData; use async_trait::async_trait; +use serde::Serialize; use zenoh_buffers::{ reader::{DidntRead, Reader}, writer::{DidntWrite, Writer}, }; use zenoh_codec::{RCodec, WCodec, Zenoh080}; -use zenoh_protocol::transport::{init, open}; -use zenoh_result::Error as ZError; +use zenoh_core::{bail, zerror}; +use zenoh_link::EndPoint; +use zenoh_protocol::{ + core::PriorityRange, + transport::{init, open}, +}; +use zenoh_result::{Error as ZError, ZResult}; use crate::unicast::establishment::{AcceptFsm, OpenFsm}; @@ -35,21 +41,142 @@ impl<'a> QoSFsm<'a> { } } -/*************************************/ -/* OPEN */ -/*************************************/ -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub(crate) struct StateOpen { - is_qos: bool, +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +pub(crate) enum QoS { + Disabled, + Enabled { priorities: Option }, +} + +impl QoS { + pub(crate) fn new(is_qos: bool, endpoint: &EndPoint) -> ZResult { + if !is_qos { + Ok(Self::Disabled) + } else { + const PRIORITY_METADATA_KEY: &str = "priorities"; + + let endpoint_metadata = endpoint.metadata(); + + let Some(mut priorities) = endpoint_metadata + .get(PRIORITY_METADATA_KEY) + .map(|metadata| metadata.split("..")) + else { + return Ok(Self::Enabled { priorities: None }); + }; + let start = priorities + .next() + .ok_or(zerror!("Invalid priority range syntax"))? + .parse::()?; + + let end = priorities + .next() + .ok_or(zerror!("Invalid priority range syntax"))? + .parse::()?; + + if priorities.next().is_some() { + bail!("Invalid priority range syntax") + }; + + Ok(Self::Enabled { + priorities: Some(PriorityRange::new(start, end)?), + }) + } + } + + fn try_from_u64(value: u64) -> ZResult { + match value { + 0_u64 => Ok(QoS::Disabled), + 1_u64 => Ok(QoS::Enabled { priorities: None }), + mut value if value & 2_u64 == 0 => { + value >>= 1; + let start = (value & 0xff) as u8; + let end = ((value & 0xff00) >> 8) as u8; + + Ok(QoS::Enabled { + priorities: Some(PriorityRange::new(start, end)?), + }) + } + _ => unreachable!(), + } + } + + fn to_u64(&self) -> u64 { + match self { + QoS::Disabled => 0_u64, + QoS::Enabled { priorities: None } => 1_u64, + QoS::Enabled { + priorities: Some(range), + } => ((range.end() as u64) << 9) | ((range.start() as u64) << 1) | 2_u64, + } + } + + fn unify(&self, other: &Self) -> Self { + match (self, other) { + (QoS::Disabled, QoS::Disabled) => QoS::Disabled, + (QoS::Disabled, QoS::Enabled { priorities }) + | (QoS::Enabled { priorities }, QoS::Disabled) => QoS::Enabled { + priorities: priorities.clone(), + }, + (QoS::Enabled { priorities: lhs }, QoS::Enabled { priorities: rhs }) => { + if lhs == rhs { + QoS::Enabled { + priorities: rhs.clone(), + } + } else { + QoS::Enabled { priorities: None } + } + } + } + } + + pub(crate) fn is_enabled(&self) -> bool { + matches!(self, QoS::Enabled { .. }) + } + + pub(crate) fn priorities(&self) -> Option { + match self { + QoS::Disabled | QoS::Enabled { priorities: None } => None, + QoS::Enabled { + priorities: Some(priorities), + } => Some(*priorities), + } + } + + #[cfg(test)] + pub(crate) fn rand() -> Self { + use rand::Rng; + let mut rng = rand::thread_rng(); + if rng.gen_bool(0.5) { + QoS::Disabled + } else if rng.gen_bool(0.5) { + QoS::Enabled { priorities: None } + } else { + QoS::Enabled { + priorities: Some(PriorityRange::rand()), + } + } + } } -impl StateOpen { - pub(crate) const fn new(is_qos: bool) -> Self { - Self { is_qos } +// Codec +impl WCodec<&QoS, &mut W> for Zenoh080 +where + W: Writer, +{ + type Output = Result<(), DidntWrite>; + + fn write(self, writer: &mut W, x: &QoS) -> Self::Output { + self.write(writer, &x.to_u64()) } +} + +impl RCodec for Zenoh080 +where + R: Reader, +{ + type Error = DidntRead; - pub(crate) const fn is_qos(&self) -> bool { - self.is_qos + fn read(self, reader: &mut R) -> Result { + QoS::try_from_u64(self.read(reader)?).map_err(|_| DidntRead) } } @@ -57,28 +184,39 @@ impl StateOpen { impl<'a> OpenFsm for &'a QoSFsm<'a> { type Error = ZError; - type SendInitSynIn = &'a StateOpen; + type SendInitSynIn = &'a QoS; type SendInitSynOut = Option; async fn send_init_syn( self, state: Self::SendInitSynIn, ) -> Result { - let output = state.is_qos.then_some(init::ext::QoS::new()); - Ok(output) + if state.is_enabled() { + Ok(Some(init::ext::QoS::new(state.to_u64()))) + } else { + Ok(None) + } } - type RecvInitAckIn = (&'a mut StateOpen, Option); + type RecvInitAckIn = (&'a mut QoS, Option); type RecvInitAckOut = (); async fn recv_init_ack( self, input: Self::RecvInitAckIn, ) -> Result { - let (state, other_ext) = input; - state.is_qos &= other_ext.is_some(); + let (state_self, other_ext) = input; + + let state_other = if let Some(other_ext) = other_ext { + QoS::try_from_u64(other_ext.value)? + } else { + QoS::Disabled + }; + + *state_self = state_self.unify(&state_other); + Ok(()) } - type SendOpenSynIn = &'a StateOpen; + type SendOpenSynIn = &'a QoS; type SendOpenSynOut = Option; async fn send_open_syn( self, @@ -87,7 +225,7 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { Ok(None) } - type RecvOpenAckIn = (&'a mut StateOpen, Option); + type RecvOpenAckIn = (&'a mut QoS, Option); type RecvOpenAckOut = (); async fn recv_open_ack( self, @@ -97,84 +235,43 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { } } -/*************************************/ -/* ACCEPT */ -/*************************************/ -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub(crate) struct StateAccept { - is_qos: bool, -} - -impl StateAccept { - pub(crate) const fn new(is_qos: bool) -> Self { - Self { is_qos } - } - - pub(crate) const fn is_qos(&self) -> bool { - self.is_qos - } - - #[cfg(test)] - pub(crate) fn rand() -> Self { - use rand::Rng; - let mut rng = rand::thread_rng(); - Self::new(rng.gen_bool(0.5)) - } -} - -// Codec -impl WCodec<&StateAccept, &mut W> for Zenoh080 -where - W: Writer, -{ - type Output = Result<(), DidntWrite>; - - fn write(self, writer: &mut W, x: &StateAccept) -> Self::Output { - let is_qos = u8::from(x.is_qos); - self.write(&mut *writer, is_qos)?; - Ok(()) - } -} - -impl RCodec for Zenoh080 -where - R: Reader, -{ - type Error = DidntRead; - - fn read(self, reader: &mut R) -> Result { - let is_qos: u8 = self.read(&mut *reader)?; - let is_qos = is_qos == 1; - Ok(StateAccept { is_qos }) - } -} - #[async_trait] impl<'a> AcceptFsm for &'a QoSFsm<'a> { type Error = ZError; - type RecvInitSynIn = (&'a mut StateAccept, Option); + type RecvInitSynIn = (&'a mut QoS, Option); type RecvInitSynOut = (); async fn recv_init_syn( self, input: Self::RecvInitSynIn, ) -> Result { - let (state, other_ext) = input; - state.is_qos &= other_ext.is_some(); + let (state_self, other_ext) = input; + + let state_other = if let Some(other_ext) = other_ext { + QoS::try_from_u64(other_ext.value)? + } else { + QoS::Disabled + }; + + *state_self = state_self.unify(&state_other); + Ok(()) } - type SendInitAckIn = &'a StateAccept; + type SendInitAckIn = &'a QoS; type SendInitAckOut = Option; async fn send_init_ack( self, state: Self::SendInitAckIn, ) -> Result { - let output = state.is_qos.then_some(init::ext::QoS::new()); - Ok(output) + if state.is_enabled() { + Ok(Some(init::ext::QoS::new(state.to_u64()))) + } else { + Ok(None) + } } - type RecvOpenSynIn = (&'a mut StateAccept, Option); + type RecvOpenSynIn = (&'a mut QoS, Option); type RecvOpenSynOut = (); async fn recv_open_syn( self, @@ -183,7 +280,7 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> { Ok(()) } - type SendOpenAckIn = &'a StateAccept; + type SendOpenAckIn = &'a QoS; type SendOpenAckOut = Option; async fn send_open_ack( self, diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index a9e797228e..5ec16b4ed8 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -18,9 +18,9 @@ use zenoh_buffers::ZSlice; #[cfg(feature = "transport_auth")] use zenoh_core::zasynclock; use zenoh_core::{zcondfeat, zerror}; -use zenoh_link::LinkUnicast; +use zenoh_link::{EndPoint, LinkUnicast}; use zenoh_protocol::{ - core::{Field, Resolution, WhatAmI, ZenohIdProto}, + core::{Field, Reliability, Resolution, WhatAmI, ZenohIdProto}, transport::{ batch_size, close, BatchSize, Close, InitSyn, OpenSyn, TransportBody, TransportMessage, TransportSn, @@ -52,7 +52,7 @@ type OpenError = (zenoh_result::Error, Option); struct StateTransport { batch_size: BatchSize, resolution: Resolution, - ext_qos: ext::qos::StateOpen, + ext_qos: ext::qos::QoS, #[cfg(feature = "transport_multilink")] ext_mlink: ext::multilink::StateOpen, #[cfg(feature = "shared-memory")] @@ -537,18 +537,22 @@ impl<'a, 'b: 'a> OpenFsm for &'a mut OpenLink<'b> { } pub(crate) async fn open_link( + endpoint: EndPoint, link: LinkUnicast, manager: &TransportManager, ) -> ZResult { + let direction = TransportLinkUnicastDirection::Outbound; let is_streamed = link.is_streamed(); let config = TransportLinkUnicastConfig { - direction: TransportLinkUnicastDirection::Outbound, + direction, batch: BatchConfig { mtu: link.get_mtu(), is_streamed, #[cfg(feature = "transport_compression")] is_compression: false, // Perform the exchange Init/Open exchange with no compression }, + priorities: None, + reliability: Reliability::from(link.is_reliable()), }; let mut link = TransportLinkUnicast::new(link, config); let mut fsm = OpenLink { @@ -577,7 +581,7 @@ pub(crate) async fn open_link( .min(batch_size::UNICAST) .min(link.config.batch.mtu), resolution: manager.config.resolution, - ext_qos: ext::qos::StateOpen::new(manager.config.unicast.is_qos), + ext_qos: ext::qos::QoS::new(manager.config.unicast.is_qos, &endpoint)?, #[cfg(feature = "transport_multilink")] ext_mlink: manager .state @@ -645,7 +649,7 @@ pub(crate) async fn open_link( whatami: iack_out.other_whatami, sn_resolution: state.transport.resolution.get(Field::FrameSN), tx_initial_sn: osyn_out.mine_initial_sn, - is_qos: state.transport.ext_qos.is_qos(), + is_qos: state.transport.ext_qos.is_enabled(), #[cfg(feature = "transport_multilink")] multilink: state.transport.ext_mlink.multilink(), #[cfg(feature = "shared-memory")] @@ -659,13 +663,15 @@ pub(crate) async fn open_link( }; let o_config = TransportLinkUnicastConfig { - direction: TransportLinkUnicastDirection::Outbound, + direction, batch: BatchConfig { mtu: state.transport.batch_size, is_streamed, #[cfg(feature = "transport_compression")] is_compression: state.link.ext_compression.is_compression(), }, + priorities: state.transport.ext_qos.priorities(), + reliability: Reliability::from(link.link.is_reliable()), }; let o_link = link.reconfigure(o_config); let s_link = format!("{:?}", o_link); diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs index 736360db63..35f4950088 100644 --- a/io/zenoh-transport/src/unicast/link.rs +++ b/io/zenoh-transport/src/unicast/link.rs @@ -16,7 +16,10 @@ use std::{fmt, sync::Arc}; use zenoh_buffers::{BBuf, ZSlice, ZSliceBuffer}; use zenoh_core::zcondfeat; use zenoh_link::{Link, LinkUnicast}; -use zenoh_protocol::transport::{BatchSize, Close, OpenAck, TransportMessage}; +use zenoh_protocol::{ + core::{PriorityRange, Reliability}, + transport::{BatchSize, Close, OpenAck, TransportMessage}, +}; use zenoh_result::{zerror, ZResult}; use crate::common::batch::{BatchConfig, Decode, Encode, Finalize, RBatch, WBatch}; @@ -32,6 +35,8 @@ pub(crate) struct TransportLinkUnicastConfig { // Inbound / outbound pub(crate) direction: TransportLinkUnicastDirection, pub(crate) batch: BatchConfig, + pub(crate) priorities: Option, + pub(crate) reliability: Reliability, } #[derive(Clone, PartialEq, Eq)] diff --git a/io/zenoh-transport/src/unicast/manager.rs b/io/zenoh-transport/src/unicast/manager.rs index bff221323e..d0fbecad5f 100644 --- a/io/zenoh-transport/src/unicast/manager.rs +++ b/io/zenoh-transport/src/unicast/manager.rs @@ -615,7 +615,7 @@ impl TransportManager { }, { tracing::debug!( - "New transport opened between {} and {} - whatami: {}, sn resolution: {:?}, initial sn: {:?}, qos: {}, multilink: {}, lowlatency: {}", + "New transport opened between {} and {} - whatami: {}, sn resolution: {:?}, initial sn: {:?}, qos: {:?}, multilink: {}, lowlatency: {}", self.config.zid, config.zid, config.whatami, @@ -707,9 +707,9 @@ impl TransportManager { }; // Create a new link associated by calling the Link Manager - let link = manager.new_link(endpoint).await?; + let link = manager.new_link(endpoint.clone()).await?; // Open the link - super::establishment::open::open_link(link, self).await + super::establishment::open::open_link(endpoint, link, self).await } pub async fn get_transport_unicast(&self, peer: &ZenohIdProto) -> Option { @@ -736,7 +736,8 @@ impl TransportManager { Ok(()) } - pub(crate) async fn handle_new_link_unicast(&self, link: LinkUnicast) { + pub(crate) async fn handle_new_link_unicast(&self, new_link: NewLinkUnicast) { + let NewLinkUnicast { link, endpoint } = new_link; let incoming_counter = self.state.unicast.incoming.clone(); if incoming_counter.load(SeqCst) >= self.config.unicast.accept_pending { // We reached the limit of concurrent incoming transport, this means two things: @@ -759,7 +760,7 @@ impl TransportManager { .spawn_with_rt(zenoh_runtime::ZRuntime::Acceptor, async move { if tokio::time::timeout( c_manager.config.unicast.accept_timeout, - super::establishment::accept::accept_link(link, &c_manager), + super::establishment::accept::accept_link(endpoint, link, &c_manager), ) .await .is_err() diff --git a/io/zenoh-transport/src/unicast/universal/tx.rs b/io/zenoh-transport/src/unicast/universal/tx.rs index f7754489ef..11493e75b7 100644 --- a/io/zenoh-transport/src/unicast/universal/tx.rs +++ b/io/zenoh-transport/src/unicast/universal/tx.rs @@ -11,51 +11,60 @@ // Contributors: // ZettaScale Zenoh Team, // -use zenoh_core::zread; -use zenoh_protocol::network::NetworkMessage; +use zenoh_protocol::{core::Reliability, network::NetworkMessage}; use super::transport::TransportUnicastUniversal; #[cfg(feature = "shared-memory")] use crate::shm::map_zmsg_to_partner; +use crate::unicast::transport_unicast_inner::TransportUnicastTrait; impl TransportUnicastUniversal { fn schedule_on_link(&self, msg: NetworkMessage) -> bool { - macro_rules! zpush { - ($guard:expr, $pipeline:expr, $msg:expr) => { - // Drop the guard before the push_zenoh_message since - // the link could be congested and this operation could - // block for fairly long time - let pl = $pipeline.clone(); - drop($guard); - tracing::trace!("Scheduled: {:?}", $msg); - return pl.push_network_message($msg); - }; - } + let transport_links = self + .links + .read() + .expect("reading `TransportUnicastUniversal::links` should not fail"); - let guard = zread!(self.links); - // First try to find the best match between msg and link reliability - if let Some(pl) = guard.iter().find_map(|tl| { - if msg.is_reliable() == tl.link.link.is_reliable() { - Some(&tl.pipeline) - } else { - None - } - }) { - zpush!(guard, pl, msg); - } + let msg_reliability = Reliability::from(msg.is_reliable()); - // No best match found, take the first available link - if let Some(pl) = guard.iter().map(|tl| &tl.pipeline).next() { - zpush!(guard, pl, msg); - } + let Some(transport_link) = transport_links + .iter() + .find(|transport_link| { + transport_link.link.config.reliability == msg_reliability + && transport_link + .link + .config + .priorities + .is_some_and(|p| p.includes(msg.priority())) + }) + .or_else(|| { + transport_links.iter().find(|transport_link| { + transport_link.link.config.reliability == msg_reliability + }) + }) + .or_else(|| transport_links.first()) + else { + tracing::trace!( + "Message dropped because the transport has no links: {}", + msg + ); + + // No Link found + return false; + }; - // No Link found + let pipeline = transport_link.pipeline.clone(); tracing::trace!( - "Message dropped because the transport has no links: {}", - msg + "Scheduled {:?} for transmission to {} ({})", + msg, + transport_link.link.link.get_dst(), + self.get_zid() ); - - false + // Drop the guard before the push_zenoh_message since + // the link could be congested and this operation could + // block for fairly long time + drop(transport_links); + pipeline.push_network_message(msg) } #[allow(unused_mut)] // When feature "shared-memory" is not enabled