From a2ebe189f7668be0d81c13cb27b562dbb619b907 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Mon, 26 Aug 2024 17:03:09 +0200 Subject: [PATCH] Add wip `QoS`-based priority-to-link dispatch impl --- DEFAULT_CONFIG.json5 | 4 + commons/zenoh-config/src/defaults.rs | 11 +++ commons/zenoh-protocol/src/core/mod.rs | 10 +++ commons/zenoh-protocol/src/transport/init.rs | 14 +-- .../src/unicast/establishment/accept.rs | 13 ++- .../src/unicast/establishment/ext/qos.rs | 85 ++++++++++++++----- .../src/unicast/establishment/mod.rs | 35 +++++++- .../src/unicast/establishment/open.rs | 17 +++- io/zenoh-transport/src/unicast/link.rs | 7 +- .../src/unicast/universal/tx.rs | 71 +++++++++------- 10 files changed, 198 insertions(+), 69 deletions(-) diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index e7672c6057..1e2b0c574d 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -331,6 +331,10 @@ /// The supported protocols are: ["tcp" , "udp", "tls", "quic", "ws", "unixsock-stream", "vsock"] /// For example, to only enable "tls" and "quic": // protocols: ["tls", "quic"], + /// + /// Endpoints accept a "priority" metadata value between 0 and 7 (inclusive). This value is + /// used to select the link used for transmission based on the priority of the message in question. + /// /// Configure the zenoh TX parameters of a link tx: { /// The resolution in bits to be used for the message sequence numbers. diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index cc6bf5854a..27c55cda8c 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -191,6 +191,17 @@ impl Default for LinkTxConf { batch_size: BatchSize::MAX, queue: QueueConf::default(), threads: num, + batching: true, + } + } +} + +impl Default for QueueConf { + fn default() -> Self { + Self { + size: QueueSizeConf::default(), + congestion_control: CongestionControlConf::default(), + backoff: 100, } } } diff --git a/commons/zenoh-protocol/src/core/mod.rs b/commons/zenoh-protocol/src/core/mod.rs index ebf1bb7f85..455b1926ad 100644 --- a/commons/zenoh-protocol/src/core/mod.rs +++ b/commons/zenoh-protocol/src/core/mod.rs @@ -367,6 +367,16 @@ impl Reliability { } } +impl From for Reliability { + fn from(value: bool) -> Self { + if value { + Reliability::Reliable + } else { + Reliability::BestEffort + } + } +} + #[derive(Debug, Copy, Clone, PartialEq, Eq, Default)] pub struct Channel { pub priority: Priority, diff --git a/commons/zenoh-protocol/src/transport/init.rs b/commons/zenoh-protocol/src/transport/init.rs index 7e56bfd770..0a4e97f95e 100644 --- a/commons/zenoh-protocol/src/transport/init.rs +++ b/commons/zenoh-protocol/src/transport/init.rs @@ -126,13 +126,13 @@ pub struct InitSyn { // Extensions pub mod ext { use crate::{ - common::{ZExtUnit, ZExtZBuf}, - zextunit, zextzbuf, + common::{ZExtUnit, ZExtZ64, ZExtZBuf}, + zextunit, zextz64, zextzbuf, }; /// # QoS extension /// Used to negotiate the use of QoS - pub type QoS = zextunit!(0x1, false); + pub type QoS = zextz64!(0x1, false); /// # Shm extension /// Used as challenge for probing shared memory capabilities @@ -161,7 +161,7 @@ impl InitSyn { pub fn rand() -> Self { use rand::Rng; - use crate::common::{ZExtUnit, ZExtZBuf}; + use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf}; let mut rng = rand::thread_rng(); @@ -170,7 +170,7 @@ impl InitSyn { let zid = ZenohIdProto::default(); let resolution = Resolution::rand(); let batch_size: BatchSize = rng.gen(); - let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand()); #[cfg(feature = "shared-memory")] let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); @@ -217,7 +217,7 @@ impl InitAck { pub fn rand() -> Self { use rand::Rng; - use crate::common::{ZExtUnit, ZExtZBuf}; + use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf}; let mut rng = rand::thread_rng(); @@ -231,7 +231,7 @@ impl InitAck { }; let batch_size: BatchSize = rng.gen(); let cookie = ZSlice::rand(64); - let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand()); #[cfg(feature = "shared-memory")] let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index 64949357c6..dd4782c0cd 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -40,7 +40,9 @@ use crate::shm::TransportShmConfig; use crate::{ common::batch::BatchConfig, unicast::{ - establishment::{compute_sn, ext, AcceptFsm, Cookie, Zenoh080Cookie}, + establishment::{ + compute_sn, ext, link_priority, link_reliability, AcceptFsm, Cookie, Zenoh080Cookie, + }, link::{ LinkUnicastWithOpenAck, TransportLinkUnicast, TransportLinkUnicastConfig, TransportLinkUnicastDirection, @@ -637,16 +639,19 @@ impl<'a, 'b: 'a> AcceptFsm for &'a mut AcceptLink<'b> { } pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) -> ZResult<()> { + let direction = TransportLinkUnicastDirection::Inbound; let mtu = link.get_mtu(); let is_streamed = link.is_streamed(); let config = TransportLinkUnicastConfig { - direction: TransportLinkUnicastDirection::Inbound, + direction, batch: BatchConfig { mtu, is_streamed, #[cfg(feature = "transport_compression")] is_compression: false, }, + reliability: link_reliability(&link), + priority: link_priority(&link, direction)?, }; let mut link = TransportLinkUnicast::new(link, config); let mut fsm = AcceptLink { @@ -771,13 +776,15 @@ pub(crate) async fn accept_link(link: LinkUnicast, manager: &TransportManager) - }; let a_config = TransportLinkUnicastConfig { - direction: TransportLinkUnicastDirection::Inbound, + direction, batch: BatchConfig { mtu: state.transport.batch_size, is_streamed, #[cfg(feature = "transport_compression")] is_compression: state.link.ext_compression.is_compression(), }, + reliability: link_reliability(&link.link), + priority: link_priority(&link.link, direction)?, }; let a_link = link.reconfigure(a_config); let s_link = format!("{:?}", a_link); diff --git a/io/zenoh-transport/src/unicast/establishment/ext/qos.rs b/io/zenoh-transport/src/unicast/establishment/ext/qos.rs index f749073805..da93b23d9f 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/qos.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/qos.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // use core::marker::PhantomData; +use std::ops::Range; use async_trait::async_trait; use zenoh_buffers::{ @@ -19,10 +20,18 @@ use zenoh_buffers::{ writer::{DidntWrite, Writer}, }; use zenoh_codec::{RCodec, WCodec, Zenoh080}; -use zenoh_protocol::transport::{init, open}; -use zenoh_result::Error as ZError; +use zenoh_core::{bail, zerror}; +use zenoh_link::LinkUnicast; +use zenoh_protocol::{ + core::Priority, + transport::{init, open}, +}; +use zenoh_result::{Error as ZError, ZResult}; -use crate::unicast::establishment::{AcceptFsm, OpenFsm}; +use crate::unicast::{ + establishment::{AcceptFsm, OpenFsm}, + link::TransportLinkUnicastDirection, +}; // Extension Fsm pub(crate) struct QoSFsm<'a> { @@ -38,18 +47,58 @@ impl<'a> QoSFsm<'a> { /*************************************/ /* OPEN */ /*************************************/ -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub(crate) struct StateOpen { - is_qos: bool, +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum StateOpen { + Disabled, + Enabled { priorities: Option> }, } impl StateOpen { - pub(crate) const fn new(is_qos: bool) -> Self { - Self { is_qos } - } - - pub(crate) const fn is_qos(&self) -> bool { - self.is_qos + pub(crate) fn new( + is_qos: bool, + link: &LinkUnicast, + direction: TransportLinkUnicastDirection, + ) -> ZResult { + if is_qos { + Ok(Self::Disabled) + } else { + const PRIORITY_METADATA_KEY: &str = "priorities"; + + let link_locator = match direction { + TransportLinkUnicastDirection::Inbound => link.get_src(), + TransportLinkUnicastDirection::Outbound => link.get_dst(), + }; + + let link_locator_metadata = link_locator.metadata(); + + let Some(priorities_raw) = link_locator_metadata.get(PRIORITY_METADATA_KEY) else { + return Ok(Self::Enabled { priorities: None }); + }; + let mut chunks = priorities_raw.split(".."); + let start = chunks + .next() + .ok_or(zerror!("Invalid priority range syntax"))? + .parse::()?; + if start < Priority::MIN as u8 { + bail!("Invalid priority range start: {start}") + } + + let end = chunks + .next() + .ok_or(zerror!("Invalid priority range syntax"))? + .parse::()?; + if end > Priority::MAX as u8 { + bail!("Invalid priority range end: {end}") + } + + Ok(Self::Enabled { + priorities: Some(start..end), + }) + } + } + + pub(crate) fn is_qos(&self) -> bool { + self != &Self::Disabled } } @@ -63,8 +112,7 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { self, state: Self::SendInitSynIn, ) -> Result { - let output = state.is_qos.then_some(init::ext::QoS::new()); - Ok(output) + todo!() } type RecvInitAckIn = (&'a mut StateOpen, Option); @@ -74,8 +122,7 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { input: Self::RecvInitAckIn, ) -> Result { let (state, other_ext) = input; - state.is_qos &= other_ext.is_some(); - Ok(()) + todo!() } type SendOpenSynIn = &'a StateOpen; @@ -160,8 +207,7 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> { input: Self::RecvInitSynIn, ) -> Result { let (state, other_ext) = input; - state.is_qos &= other_ext.is_some(); - Ok(()) + todo!() } type SendInitAckIn = &'a StateAccept; @@ -170,8 +216,7 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> { self, state: Self::SendInitAckIn, ) -> Result { - let output = state.is_qos.then_some(init::ext::QoS::new()); - Ok(output) + todo!() } type RecvOpenSynIn = (&'a mut StateAccept, Option); diff --git a/io/zenoh-transport/src/unicast/establishment/mod.rs b/io/zenoh-transport/src/unicast/establishment/mod.rs index ca46b40ed1..82f37e0825 100644 --- a/io/zenoh-transport/src/unicast/establishment/mod.rs +++ b/io/zenoh-transport/src/unicast/establishment/mod.rs @@ -22,11 +22,14 @@ use sha3::{ digest::{ExtendableOutput, Update, XofReader}, Shake128, }; +use zenoh_link::LinkUnicast; use zenoh_protocol::{ - core::{Field, Resolution, ZenohIdProto}, + core::{Field, Priority, Reliability, Resolution, ZenohIdProto}, transport::TransportSn, }; +use zenoh_result::ZResult; +use super::link::TransportLinkUnicastDirection; use crate::common::seq_num; /*************************************/ @@ -116,3 +119,33 @@ pub(super) fn compute_sn( hasher.finalize_xof().read(&mut array); TransportSn::from_le_bytes(array) & seq_num::get_mask(resolution.get(Field::FrameSN)) } + +pub(super) fn link_reliability(link: &LinkUnicast) -> Reliability { + if link.is_reliable() { + Reliability::Reliable + } else { + Reliability::BestEffort + } +} + +pub(super) fn link_priority( + link: &LinkUnicast, + direction: TransportLinkUnicastDirection, +) -> ZResult> { + const PRIORITY_METADATA_KEY: &str = "priority"; + + let link_locator = match direction { + TransportLinkUnicastDirection::Inbound => link.get_src(), + TransportLinkUnicastDirection::Outbound => link.get_dst(), + }; + + let link_locator_metadata = link_locator.metadata(); + + let Some(priority_raw) = link_locator_metadata.get(PRIORITY_METADATA_KEY) else { + return Ok(None); + }; + + let priority = Priority::try_from(priority_raw.parse::()?)?; + + Ok(Some(priority)) +} diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index a9e797228e..fe6a1ee203 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -37,7 +37,7 @@ use crate::unicast::establishment::ext::auth::UsrPwdId; use crate::{ common::batch::BatchConfig, unicast::{ - establishment::{compute_sn, ext, OpenFsm}, + establishment::{compute_sn, ext, link_priority, link_reliability, OpenFsm}, link::{ LinkUnicastWithOpenAck, TransportLinkUnicast, TransportLinkUnicastConfig, TransportLinkUnicastDirection, @@ -540,15 +540,18 @@ pub(crate) async fn open_link( link: LinkUnicast, manager: &TransportManager, ) -> ZResult { + let direction = TransportLinkUnicastDirection::Outbound; let is_streamed = link.is_streamed(); let config = TransportLinkUnicastConfig { - direction: TransportLinkUnicastDirection::Outbound, + direction, batch: BatchConfig { mtu: link.get_mtu(), is_streamed, #[cfg(feature = "transport_compression")] is_compression: false, // Perform the exchange Init/Open exchange with no compression }, + reliability: link_reliability(&link), + priority: link_priority(&link, direction)?, }; let mut link = TransportLinkUnicast::new(link, config); let mut fsm = OpenLink { @@ -577,7 +580,11 @@ pub(crate) async fn open_link( .min(batch_size::UNICAST) .min(link.config.batch.mtu), resolution: manager.config.resolution, - ext_qos: ext::qos::StateOpen::new(manager.config.unicast.is_qos), + ext_qos: ext::qos::StateOpen::new( + manager.config.unicast.is_qos, + &link.link, + direction, + )?, #[cfg(feature = "transport_multilink")] ext_mlink: manager .state @@ -659,13 +666,15 @@ pub(crate) async fn open_link( }; let o_config = TransportLinkUnicastConfig { - direction: TransportLinkUnicastDirection::Outbound, + direction, batch: BatchConfig { mtu: state.transport.batch_size, is_streamed, #[cfg(feature = "transport_compression")] is_compression: state.link.ext_compression.is_compression(), }, + reliability: link_reliability(&link.link), + priority: link_priority(&link.link, direction)?, }; let o_link = link.reconfigure(o_config); let s_link = format!("{:?}", o_link); diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs index 736360db63..7088376269 100644 --- a/io/zenoh-transport/src/unicast/link.rs +++ b/io/zenoh-transport/src/unicast/link.rs @@ -16,7 +16,10 @@ use std::{fmt, sync::Arc}; use zenoh_buffers::{BBuf, ZSlice, ZSliceBuffer}; use zenoh_core::zcondfeat; use zenoh_link::{Link, LinkUnicast}; -use zenoh_protocol::transport::{BatchSize, Close, OpenAck, TransportMessage}; +use zenoh_protocol::{ + core::{Priority, Reliability}, + transport::{BatchSize, Close, OpenAck, TransportMessage}, +}; use zenoh_result::{zerror, ZResult}; use crate::common::batch::{BatchConfig, Decode, Encode, Finalize, RBatch, WBatch}; @@ -32,6 +35,8 @@ pub(crate) struct TransportLinkUnicastConfig { // Inbound / outbound pub(crate) direction: TransportLinkUnicastDirection, pub(crate) batch: BatchConfig, + pub(crate) reliability: Reliability, + pub(crate) priority: Option, } #[derive(Clone, PartialEq, Eq)] diff --git a/io/zenoh-transport/src/unicast/universal/tx.rs b/io/zenoh-transport/src/unicast/universal/tx.rs index f7754489ef..3a2ca26874 100644 --- a/io/zenoh-transport/src/unicast/universal/tx.rs +++ b/io/zenoh-transport/src/unicast/universal/tx.rs @@ -11,51 +11,56 @@ // Contributors: // ZettaScale Zenoh Team, // -use zenoh_core::zread; -use zenoh_protocol::network::NetworkMessage; +use zenoh_protocol::{core::Reliability, network::NetworkMessage}; use super::transport::TransportUnicastUniversal; #[cfg(feature = "shared-memory")] use crate::shm::map_zmsg_to_partner; +use crate::unicast::transport_unicast_inner::TransportUnicastTrait; impl TransportUnicastUniversal { fn schedule_on_link(&self, msg: NetworkMessage) -> bool { - macro_rules! zpush { - ($guard:expr, $pipeline:expr, $msg:expr) => { - // Drop the guard before the push_zenoh_message since - // the link could be congested and this operation could - // block for fairly long time - let pl = $pipeline.clone(); - drop($guard); - tracing::trace!("Scheduled: {:?}", $msg); - return pl.push_network_message($msg); - }; - } + let transport_links = self + .links + .read() + .expect("reading `TransportUnicastUniversal::links` should not fail"); - let guard = zread!(self.links); - // First try to find the best match between msg and link reliability - if let Some(pl) = guard.iter().find_map(|tl| { - if msg.is_reliable() == tl.link.link.is_reliable() { - Some(&tl.pipeline) - } else { - None - } - }) { - zpush!(guard, pl, msg); - } + let msg_reliability = Reliability::from(msg.is_reliable()); - // No best match found, take the first available link - if let Some(pl) = guard.iter().map(|tl| &tl.pipeline).next() { - zpush!(guard, pl, msg); - } + let Some(transport_link) = transport_links + .iter() + .find(|transport_link| { + transport_link.link.config.reliability == msg_reliability + && transport_link.link.config.priority == Some(msg.priority()) + }) + .or_else(|| { + transport_links.iter().find(|transport_link| { + transport_link.link.config.reliability == msg_reliability + }) + }) + .or_else(|| transport_links.first()) + else { + tracing::trace!( + "Message dropped because the transport has no links: {}", + msg + ); + + // No Link found + return false; + }; - // No Link found + let pipeline = transport_link.pipeline.clone(); tracing::trace!( - "Message dropped because the transport has no links: {}", - msg + "Scheduled {:?} for transmission to {} ({})", + msg, + transport_link.link.link.get_dst(), + self.get_zid() ); - - false + // Drop the guard before the push_zenoh_message since + // the link could be congested and this operation could + // block for fairly long time + drop(transport_links); + pipeline.push_network_message(msg) } #[allow(unused_mut)] // When feature "shared-memory" is not enabled