From 389308e5bf6ed2a98ce513759084455540d053c3 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Fri, 30 Aug 2024 11:53:33 +0200 Subject: [PATCH] Add priority into in `QoS` --- commons/zenoh-protocol/src/transport/init.rs | 14 +-- .../src/unicast/establishment/ext/qos.rs | 85 ++++++++++++++----- .../src/unicast/establishment/open.rs | 6 +- 3 files changed, 77 insertions(+), 28 deletions(-) diff --git a/commons/zenoh-protocol/src/transport/init.rs b/commons/zenoh-protocol/src/transport/init.rs index 7e56bfd770..0a4e97f95e 100644 --- a/commons/zenoh-protocol/src/transport/init.rs +++ b/commons/zenoh-protocol/src/transport/init.rs @@ -126,13 +126,13 @@ pub struct InitSyn { // Extensions pub mod ext { use crate::{ - common::{ZExtUnit, ZExtZBuf}, - zextunit, zextzbuf, + common::{ZExtUnit, ZExtZ64, ZExtZBuf}, + zextunit, zextz64, zextzbuf, }; /// # QoS extension /// Used to negotiate the use of QoS - pub type QoS = zextunit!(0x1, false); + pub type QoS = zextz64!(0x1, false); /// # Shm extension /// Used as challenge for probing shared memory capabilities @@ -161,7 +161,7 @@ impl InitSyn { pub fn rand() -> Self { use rand::Rng; - use crate::common::{ZExtUnit, ZExtZBuf}; + use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf}; let mut rng = rand::thread_rng(); @@ -170,7 +170,7 @@ impl InitSyn { let zid = ZenohIdProto::default(); let resolution = Resolution::rand(); let batch_size: BatchSize = rng.gen(); - let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand()); #[cfg(feature = "shared-memory")] let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); @@ -217,7 +217,7 @@ impl InitAck { pub fn rand() -> Self { use rand::Rng; - use crate::common::{ZExtUnit, ZExtZBuf}; + use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf}; let mut rng = rand::thread_rng(); @@ -231,7 +231,7 @@ impl InitAck { }; let batch_size: BatchSize = rng.gen(); let cookie = ZSlice::rand(64); - let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand()); + let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand()); #[cfg(feature = "shared-memory")] let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand()); diff --git a/io/zenoh-transport/src/unicast/establishment/ext/qos.rs b/io/zenoh-transport/src/unicast/establishment/ext/qos.rs index f749073805..da93b23d9f 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/qos.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/qos.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // use core::marker::PhantomData; +use std::ops::Range; use async_trait::async_trait; use zenoh_buffers::{ @@ -19,10 +20,18 @@ use zenoh_buffers::{ writer::{DidntWrite, Writer}, }; use zenoh_codec::{RCodec, WCodec, Zenoh080}; -use zenoh_protocol::transport::{init, open}; -use zenoh_result::Error as ZError; +use zenoh_core::{bail, zerror}; +use zenoh_link::LinkUnicast; +use zenoh_protocol::{ + core::Priority, + transport::{init, open}, +}; +use zenoh_result::{Error as ZError, ZResult}; -use crate::unicast::establishment::{AcceptFsm, OpenFsm}; +use crate::unicast::{ + establishment::{AcceptFsm, OpenFsm}, + link::TransportLinkUnicastDirection, +}; // Extension Fsm pub(crate) struct QoSFsm<'a> { @@ -38,18 +47,58 @@ impl<'a> QoSFsm<'a> { /*************************************/ /* OPEN */ /*************************************/ -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub(crate) struct StateOpen { - is_qos: bool, +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum StateOpen { + Disabled, + Enabled { priorities: Option> }, } impl StateOpen { - pub(crate) const fn new(is_qos: bool) -> Self { - Self { is_qos } - } - - pub(crate) const fn is_qos(&self) -> bool { - self.is_qos + pub(crate) fn new( + is_qos: bool, + link: &LinkUnicast, + direction: TransportLinkUnicastDirection, + ) -> ZResult { + if is_qos { + Ok(Self::Disabled) + } else { + const PRIORITY_METADATA_KEY: &str = "priorities"; + + let link_locator = match direction { + TransportLinkUnicastDirection::Inbound => link.get_src(), + TransportLinkUnicastDirection::Outbound => link.get_dst(), + }; + + let link_locator_metadata = link_locator.metadata(); + + let Some(priorities_raw) = link_locator_metadata.get(PRIORITY_METADATA_KEY) else { + return Ok(Self::Enabled { priorities: None }); + }; + let mut chunks = priorities_raw.split(".."); + let start = chunks + .next() + .ok_or(zerror!("Invalid priority range syntax"))? + .parse::()?; + if start < Priority::MIN as u8 { + bail!("Invalid priority range start: {start}") + } + + let end = chunks + .next() + .ok_or(zerror!("Invalid priority range syntax"))? + .parse::()?; + if end > Priority::MAX as u8 { + bail!("Invalid priority range end: {end}") + } + + Ok(Self::Enabled { + priorities: Some(start..end), + }) + } + } + + pub(crate) fn is_qos(&self) -> bool { + self != &Self::Disabled } } @@ -63,8 +112,7 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { self, state: Self::SendInitSynIn, ) -> Result { - let output = state.is_qos.then_some(init::ext::QoS::new()); - Ok(output) + todo!() } type RecvInitAckIn = (&'a mut StateOpen, Option); @@ -74,8 +122,7 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { input: Self::RecvInitAckIn, ) -> Result { let (state, other_ext) = input; - state.is_qos &= other_ext.is_some(); - Ok(()) + todo!() } type SendOpenSynIn = &'a StateOpen; @@ -160,8 +207,7 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> { input: Self::RecvInitSynIn, ) -> Result { let (state, other_ext) = input; - state.is_qos &= other_ext.is_some(); - Ok(()) + todo!() } type SendInitAckIn = &'a StateAccept; @@ -170,8 +216,7 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> { self, state: Self::SendInitAckIn, ) -> Result { - let output = state.is_qos.then_some(init::ext::QoS::new()); - Ok(output) + todo!() } type RecvOpenSynIn = (&'a mut StateAccept, Option); diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index d084e8116e..fe6a1ee203 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -580,7 +580,11 @@ pub(crate) async fn open_link( .min(batch_size::UNICAST) .min(link.config.batch.mtu), resolution: manager.config.resolution, - ext_qos: ext::qos::StateOpen::new(manager.config.unicast.is_qos), + ext_qos: ext::qos::StateOpen::new( + manager.config.unicast.is_qos, + &link.link, + direction, + )?, #[cfg(feature = "transport_multilink")] ext_mlink: manager .state