Skip to content

Commit

Permalink
Add priority into in QoS
Browse files Browse the repository at this point in the history
  • Loading branch information
fuzzypixelz committed Aug 30, 2024
1 parent c01ad63 commit 389308e
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 28 deletions.
14 changes: 7 additions & 7 deletions commons/zenoh-protocol/src/transport/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,13 @@ pub struct InitSyn {
// Extensions
pub mod ext {
use crate::{
common::{ZExtUnit, ZExtZBuf},
zextunit, zextzbuf,
common::{ZExtUnit, ZExtZ64, ZExtZBuf},
zextunit, zextz64, zextzbuf,
};

/// # QoS extension
/// Used to negotiate the use of QoS
pub type QoS = zextunit!(0x1, false);
pub type QoS = zextz64!(0x1, false);

/// # Shm extension
/// Used as challenge for probing shared memory capabilities
Expand Down Expand Up @@ -161,7 +161,7 @@ impl InitSyn {
pub fn rand() -> Self {
use rand::Rng;

use crate::common::{ZExtUnit, ZExtZBuf};
use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf};

let mut rng = rand::thread_rng();

Expand All @@ -170,7 +170,7 @@ impl InitSyn {
let zid = ZenohIdProto::default();
let resolution = Resolution::rand();
let batch_size: BatchSize = rng.gen();
let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand());
#[cfg(feature = "shared-memory")]
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
Expand Down Expand Up @@ -217,7 +217,7 @@ impl InitAck {
pub fn rand() -> Self {
use rand::Rng;

use crate::common::{ZExtUnit, ZExtZBuf};
use crate::common::{ZExtUnit, ZExtZ64, ZExtZBuf};

let mut rng = rand::thread_rng();

Expand All @@ -231,7 +231,7 @@ impl InitAck {
};
let batch_size: BatchSize = rng.gen();
let cookie = ZSlice::rand(64);
let ext_qos = rng.gen_bool(0.5).then_some(ZExtUnit::rand());
let ext_qos = rng.gen_bool(0.5).then_some(ZExtZ64::rand());
#[cfg(feature = "shared-memory")]
let ext_shm = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
let ext_auth = rng.gen_bool(0.5).then_some(ZExtZBuf::rand());
Expand Down
85 changes: 65 additions & 20 deletions io/zenoh-transport/src/unicast/establishment/ext/qos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,26 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use core::marker::PhantomData;
use std::ops::Range;

use async_trait::async_trait;
use zenoh_buffers::{
reader::{DidntRead, Reader},
writer::{DidntWrite, Writer},
};
use zenoh_codec::{RCodec, WCodec, Zenoh080};
use zenoh_protocol::transport::{init, open};
use zenoh_result::Error as ZError;
use zenoh_core::{bail, zerror};
use zenoh_link::LinkUnicast;
use zenoh_protocol::{
core::Priority,
transport::{init, open},
};
use zenoh_result::{Error as ZError, ZResult};

use crate::unicast::establishment::{AcceptFsm, OpenFsm};
use crate::unicast::{
establishment::{AcceptFsm, OpenFsm},
link::TransportLinkUnicastDirection,
};

// Extension Fsm
pub(crate) struct QoSFsm<'a> {
Expand All @@ -38,18 +47,58 @@ impl<'a> QoSFsm<'a> {
/*************************************/
/* OPEN */
/*************************************/
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) struct StateOpen {
is_qos: bool,
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum StateOpen {
Disabled,
Enabled { priorities: Option<Range<u8>> },
}

impl StateOpen {
pub(crate) const fn new(is_qos: bool) -> Self {
Self { is_qos }
}

pub(crate) const fn is_qos(&self) -> bool {
self.is_qos
pub(crate) fn new(
is_qos: bool,
link: &LinkUnicast,
direction: TransportLinkUnicastDirection,
) -> ZResult<Self> {
if is_qos {
Ok(Self::Disabled)
} else {
const PRIORITY_METADATA_KEY: &str = "priorities";

let link_locator = match direction {
TransportLinkUnicastDirection::Inbound => link.get_src(),
TransportLinkUnicastDirection::Outbound => link.get_dst(),
};

let link_locator_metadata = link_locator.metadata();

let Some(priorities_raw) = link_locator_metadata.get(PRIORITY_METADATA_KEY) else {
return Ok(Self::Enabled { priorities: None });
};
let mut chunks = priorities_raw.split("..");
let start = chunks
.next()
.ok_or(zerror!("Invalid priority range syntax"))?
.parse::<u8>()?;
if start < Priority::MIN as u8 {
bail!("Invalid priority range start: {start}")
}

let end = chunks
.next()
.ok_or(zerror!("Invalid priority range syntax"))?
.parse::<u8>()?;
if end > Priority::MAX as u8 {
bail!("Invalid priority range end: {end}")
}

Ok(Self::Enabled {
priorities: Some(start..end),
})
}
}

pub(crate) fn is_qos(&self) -> bool {
self != &Self::Disabled
}
}

Expand All @@ -63,8 +112,7 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> {
self,
state: Self::SendInitSynIn,
) -> Result<Self::SendInitSynOut, Self::Error> {
let output = state.is_qos.then_some(init::ext::QoS::new());
Ok(output)
todo!()
}

type RecvInitAckIn = (&'a mut StateOpen, Option<init::ext::QoS>);
Expand All @@ -74,8 +122,7 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> {
input: Self::RecvInitAckIn,
) -> Result<Self::RecvInitAckOut, Self::Error> {
let (state, other_ext) = input;
state.is_qos &= other_ext.is_some();
Ok(())
todo!()
}

type SendOpenSynIn = &'a StateOpen;
Expand Down Expand Up @@ -160,8 +207,7 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> {
input: Self::RecvInitSynIn,
) -> Result<Self::RecvInitSynOut, Self::Error> {
let (state, other_ext) = input;
state.is_qos &= other_ext.is_some();
Ok(())
todo!()
}

type SendInitAckIn = &'a StateAccept;
Expand All @@ -170,8 +216,7 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> {
self,
state: Self::SendInitAckIn,
) -> Result<Self::SendInitAckOut, Self::Error> {
let output = state.is_qos.then_some(init::ext::QoS::new());
Ok(output)
todo!()
}

type RecvOpenSynIn = (&'a mut StateAccept, Option<open::ext::QoS>);
Expand Down
6 changes: 5 additions & 1 deletion io/zenoh-transport/src/unicast/establishment/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,11 @@ pub(crate) async fn open_link(
.min(batch_size::UNICAST)
.min(link.config.batch.mtu),
resolution: manager.config.resolution,
ext_qos: ext::qos::StateOpen::new(manager.config.unicast.is_qos),
ext_qos: ext::qos::StateOpen::new(
manager.config.unicast.is_qos,
&link.link,
direction,
)?,
#[cfg(feature = "transport_multilink")]
ext_mlink: manager
.state
Expand Down

0 comments on commit 389308e

Please sign in to comment.