Skip to content

Commit

Permalink
Add PriorityRange negotiation tests
Browse files Browse the repository at this point in the history
  • Loading branch information
fuzzypixelz committed Sep 4, 2024
1 parent 9e24335 commit be2360e
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 32 deletions.
204 changes: 173 additions & 31 deletions io/zenoh-transport/src/unicast/establishment/ext/qos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl QoS {
/// 2. QoS is enabled but no priority range is available
/// 3. QoS is enabled and priority information is range, in which case the next 16 least
/// significant bits are used to encode the priority range.
fn to_u64(&self) -> u64 {
fn to_u64(self) -> u64 {
match self {
QoS::Disabled => 0b00_u64,
QoS::Enabled { priorities: None } => 0b01_u64,
Expand All @@ -117,6 +117,22 @@ impl QoS {
}
}

fn to_ext(self) -> Option<init::ext::QoS> {
if self.is_enabled() {
Some(init::ext::QoS::new(self.to_u64()))
} else {
None
}
}

fn try_from_ext(ext: Option<init::ext::QoS>) -> ZResult<Self> {
if let Some(ext) = ext {
QoS::try_from_u64(ext.value)
} else {
Ok(QoS::Disabled)
}
}

pub(crate) fn is_enabled(&self) -> bool {
matches!(self, QoS::Enabled { .. })
}
Expand Down Expand Up @@ -179,11 +195,7 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> {
self,
state: Self::SendInitSynIn,
) -> Result<Self::SendInitSynOut, Self::Error> {
if state.is_enabled() {
Ok(Some(init::ext::QoS::new(state.to_u64())))
} else {
Ok(None)
}
Ok(state.to_ext())
}

type RecvInitAckIn = (&'a mut QoS, Option<init::ext::QoS>);
Expand All @@ -194,29 +206,25 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> {
) -> Result<Self::RecvInitAckOut, Self::Error> {
let (state_self, other_ext) = input;

let state_other = if let Some(other_ext) = other_ext {
QoS::try_from_u64(other_ext.value)?
} else {
QoS::Disabled
};
let state_other = QoS::try_from_ext(other_ext)?;

*state_self = match (*state_self, state_other) {
(QoS::Disabled, _) | (_, QoS::Disabled) => QoS::Disabled,
(QoS::Enabled { priorities: None }, QoS::Enabled { priorities: None }) => {
QoS::Enabled { priorities: None }
}
(
self_qos @ QoS::Enabled {
qos @ QoS::Enabled {
priorities: Some(_),
},
QoS::Enabled { priorities: None },
) => self_qos,
(
)
| (
QoS::Enabled { priorities: None },
other_qos @ QoS::Enabled {
qos @ QoS::Enabled {
priorities: Some(_),
},
) => other_qos,
) => qos,
(
self_qos @ QoS::Enabled {
priorities: Some(priorities_self),
Expand Down Expand Up @@ -270,29 +278,25 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> {
) -> Result<Self::RecvInitSynOut, Self::Error> {
let (state_self, other_ext) = input;

let state_other = if let Some(other_ext) = other_ext {
QoS::try_from_u64(other_ext.value)?
} else {
QoS::Disabled
};
let state_other = QoS::try_from_ext(other_ext)?;

*state_self = match (*state_self, state_other) {
(QoS::Disabled, _) | (_, QoS::Disabled) => QoS::Disabled,
(QoS::Enabled { priorities: None }, QoS::Enabled { priorities: None }) => {
QoS::Enabled { priorities: None }
}
(
self_qos @ QoS::Enabled {
qos @ QoS::Enabled {
priorities: Some(_),
},
QoS::Enabled { priorities: None },
) => self_qos,
(
)
| (
QoS::Enabled { priorities: None },
other_qos @ QoS::Enabled {
qos @ QoS::Enabled {
priorities: Some(_),
},
) => other_qos,
) => qos,
(
QoS::Enabled {
priorities: Some(priorities_self),
Expand Down Expand Up @@ -321,11 +325,7 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> {
self,
state: Self::SendInitAckIn,
) -> Result<Self::SendInitAckOut, Self::Error> {
if state.is_enabled() {
Ok(Some(init::ext::QoS::new(state.to_u64())))
} else {
Ok(None)
}
Ok(state.to_ext())
}

type RecvOpenSynIn = (&'a mut QoS, Option<open::ext::QoS>);
Expand All @@ -346,3 +346,145 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> {
Ok(None)
}
}

#[cfg(test)]
mod tests {
use zenoh_protocol::core::PriorityRange;
use zenoh_result::ZResult;

use crate::unicast::establishment::{AcceptFsm, OpenFsm};

use super::{QoS, QoSFsm};

async fn test_priority_range_negotiation(
qos_open: &mut QoS,
qos_accept: &mut QoS,
) -> ZResult<()> {
let fsm = QoSFsm::new();

let ext = fsm.send_init_syn(&*qos_open).await?;
fsm.recv_init_syn((qos_accept, ext)).await?;

let ext = fsm.send_init_ack(&*qos_accept).await?;
fsm.recv_init_ack((qos_open, ext)).await?;

Ok(())
}

#[tokio::test]
async fn test_priority_range_negotiation_scenario_1() {
let qos_open = &mut QoS::Enabled { priorities: None };
let qos_accept = &mut QoS::Enabled { priorities: None };

match test_priority_range_negotiation(qos_open, qos_accept).await {
Err(err) => panic!("expected `Ok(())`, got: {err}"),
Ok(()) => {
assert_eq!(*qos_open, *qos_accept);
assert_eq!(*qos_open, QoS::Enabled { priorities: None });
}
};
}

#[tokio::test]
async fn test_priority_range_negotiation_scenario_2() {
let qos_open = &mut QoS::Enabled { priorities: None };
let qos_accept = &mut QoS::Enabled {
priorities: Some(PriorityRange::new(1, 3).unwrap()),
};

match test_priority_range_negotiation(qos_open, qos_accept).await {
Err(err) => panic!("expected `Ok(())`, got: {err}"),
Ok(()) => {
assert_eq!(*qos_open, *qos_accept);
assert_eq!(
*qos_open,
QoS::Enabled {
priorities: Some(PriorityRange::new(1, 3).unwrap())
}
);
}
};
}

#[tokio::test]
async fn test_priority_range_negotiation_scenario_3() {
let qos_open = &mut QoS::Enabled {
priorities: Some(PriorityRange::new(1, 3).unwrap()),
};
let qos_accept = &mut QoS::Enabled { priorities: None };

match test_priority_range_negotiation(qos_open, qos_accept).await {
Err(err) => panic!("expected `Ok(())`, got: {err}"),
Ok(()) => {
assert_eq!(*qos_open, *qos_accept);
assert_eq!(
*qos_open,
QoS::Enabled {
priorities: Some(PriorityRange::new(1, 3).unwrap())
}
);
}
};
}

#[tokio::test]
async fn test_priority_range_negotiation_scenario_4() {
let qos_open = &mut QoS::Enabled {
priorities: Some(PriorityRange::new(1, 3).unwrap()),
};
let qos_accept = &mut QoS::Enabled {
priorities: Some(PriorityRange::new(1, 3).unwrap()),
};

match test_priority_range_negotiation(qos_open, qos_accept).await {
Err(err) => panic!("expected `Ok(())`, got: {err}"),
Ok(()) => {
assert_eq!(*qos_open, *qos_accept);
assert_eq!(
*qos_open,
QoS::Enabled {
priorities: Some(PriorityRange::new(1, 3).unwrap())
}
);
}
};
}

#[tokio::test]
async fn test_priority_range_negotiation_scenario_5() {
let qos_open = &mut QoS::Enabled {
priorities: Some(PriorityRange::new(1, 3).unwrap()),
};
let qos_accept = &mut QoS::Enabled {
priorities: Some(PriorityRange::new(0, 4).unwrap()),
};

match test_priority_range_negotiation(qos_open, qos_accept).await {
Err(err) => panic!("expected `Ok(())`, got: {err}"),
Ok(()) => {
assert_eq!(*qos_open, *qos_accept);
assert_eq!(
*qos_open,
QoS::Enabled {
priorities: Some(PriorityRange::new(1, 3).unwrap())
}
);
}
};
}

#[tokio::test]
async fn test_priority_range_negotiation_scenario_6() {
let qos_open = &mut QoS::Enabled {
priorities: Some(PriorityRange::new(1, 3).unwrap()),
};
let qos_accept = &mut QoS::Enabled {
priorities: Some(PriorityRange::new(2, 3).unwrap()),
};

match test_priority_range_negotiation(qos_open, qos_accept).await {
Ok(()) => panic!("expected `Err(_)`, got `Ok(())`"),
Err(_) => {}
};
}
}
2 changes: 1 addition & 1 deletion io/zenoh-transport/src/unicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl TransportLinkUnicast {
pub(crate) fn rx(&self) -> TransportLinkUnicastRx {
TransportLinkUnicastRx {
link: self.link.clone(),
config: self.config.clone(),
config: self.config,
}
}

Expand Down

0 comments on commit be2360e

Please sign in to comment.