From be2360e84bd928e14a5ad18b23f09c01616c2dd7 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Wed, 4 Sep 2024 09:17:49 +0000 Subject: [PATCH] Add `PriorityRange` negotiation tests --- .../src/unicast/establishment/ext/qos.rs | 204 +++++++++++++++--- io/zenoh-transport/src/unicast/link.rs | 2 +- 2 files changed, 174 insertions(+), 32 deletions(-) diff --git a/io/zenoh-transport/src/unicast/establishment/ext/qos.rs b/io/zenoh-transport/src/unicast/establishment/ext/qos.rs index fd3fa269b1..b094ad3999 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/qos.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/qos.rs @@ -107,7 +107,7 @@ impl QoS { /// 2. QoS is enabled but no priority range is available /// 3. QoS is enabled and priority information is range, in which case the next 16 least /// significant bits are used to encode the priority range. - fn to_u64(&self) -> u64 { + fn to_u64(self) -> u64 { match self { QoS::Disabled => 0b00_u64, QoS::Enabled { priorities: None } => 0b01_u64, @@ -117,6 +117,22 @@ impl QoS { } } + fn to_ext(self) -> Option { + if self.is_enabled() { + Some(init::ext::QoS::new(self.to_u64())) + } else { + None + } + } + + fn try_from_ext(ext: Option) -> ZResult { + if let Some(ext) = ext { + QoS::try_from_u64(ext.value) + } else { + Ok(QoS::Disabled) + } + } + pub(crate) fn is_enabled(&self) -> bool { matches!(self, QoS::Enabled { .. }) } @@ -179,11 +195,7 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { self, state: Self::SendInitSynIn, ) -> Result { - if state.is_enabled() { - Ok(Some(init::ext::QoS::new(state.to_u64()))) - } else { - Ok(None) - } + Ok(state.to_ext()) } type RecvInitAckIn = (&'a mut QoS, Option); @@ -194,11 +206,7 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { ) -> Result { let (state_self, other_ext) = input; - let state_other = if let Some(other_ext) = other_ext { - QoS::try_from_u64(other_ext.value)? - } else { - QoS::Disabled - }; + let state_other = QoS::try_from_ext(other_ext)?; *state_self = match (*state_self, state_other) { (QoS::Disabled, _) | (_, QoS::Disabled) => QoS::Disabled, @@ -206,17 +214,17 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { QoS::Enabled { priorities: None } } ( - self_qos @ QoS::Enabled { + qos @ QoS::Enabled { priorities: Some(_), }, QoS::Enabled { priorities: None }, - ) => self_qos, - ( + ) + | ( QoS::Enabled { priorities: None }, - other_qos @ QoS::Enabled { + qos @ QoS::Enabled { priorities: Some(_), }, - ) => other_qos, + ) => qos, ( self_qos @ QoS::Enabled { priorities: Some(priorities_self), @@ -270,11 +278,7 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> { ) -> Result { let (state_self, other_ext) = input; - let state_other = if let Some(other_ext) = other_ext { - QoS::try_from_u64(other_ext.value)? - } else { - QoS::Disabled - }; + let state_other = QoS::try_from_ext(other_ext)?; *state_self = match (*state_self, state_other) { (QoS::Disabled, _) | (_, QoS::Disabled) => QoS::Disabled, @@ -282,17 +286,17 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> { QoS::Enabled { priorities: None } } ( - self_qos @ QoS::Enabled { + qos @ QoS::Enabled { priorities: Some(_), }, QoS::Enabled { priorities: None }, - ) => self_qos, - ( + ) + | ( QoS::Enabled { priorities: None }, - other_qos @ QoS::Enabled { + qos @ QoS::Enabled { priorities: Some(_), }, - ) => other_qos, + ) => qos, ( QoS::Enabled { priorities: Some(priorities_self), @@ -321,11 +325,7 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> { self, state: Self::SendInitAckIn, ) -> Result { - if state.is_enabled() { - Ok(Some(init::ext::QoS::new(state.to_u64()))) - } else { - Ok(None) - } + Ok(state.to_ext()) } type RecvOpenSynIn = (&'a mut QoS, Option); @@ -346,3 +346,145 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> { Ok(None) } } + +#[cfg(test)] +mod tests { + use zenoh_protocol::core::PriorityRange; + use zenoh_result::ZResult; + + use crate::unicast::establishment::{AcceptFsm, OpenFsm}; + + use super::{QoS, QoSFsm}; + + async fn test_priority_range_negotiation( + qos_open: &mut QoS, + qos_accept: &mut QoS, + ) -> ZResult<()> { + let fsm = QoSFsm::new(); + + let ext = fsm.send_init_syn(&*qos_open).await?; + fsm.recv_init_syn((qos_accept, ext)).await?; + + let ext = fsm.send_init_ack(&*qos_accept).await?; + fsm.recv_init_ack((qos_open, ext)).await?; + + Ok(()) + } + + #[tokio::test] + async fn test_priority_range_negotiation_scenario_1() { + let qos_open = &mut QoS::Enabled { priorities: None }; + let qos_accept = &mut QoS::Enabled { priorities: None }; + + match test_priority_range_negotiation(qos_open, qos_accept).await { + Err(err) => panic!("expected `Ok(())`, got: {err}"), + Ok(()) => { + assert_eq!(*qos_open, *qos_accept); + assert_eq!(*qos_open, QoS::Enabled { priorities: None }); + } + }; + } + + #[tokio::test] + async fn test_priority_range_negotiation_scenario_2() { + let qos_open = &mut QoS::Enabled { priorities: None }; + let qos_accept = &mut QoS::Enabled { + priorities: Some(PriorityRange::new(1, 3).unwrap()), + }; + + match test_priority_range_negotiation(qos_open, qos_accept).await { + Err(err) => panic!("expected `Ok(())`, got: {err}"), + Ok(()) => { + assert_eq!(*qos_open, *qos_accept); + assert_eq!( + *qos_open, + QoS::Enabled { + priorities: Some(PriorityRange::new(1, 3).unwrap()) + } + ); + } + }; + } + + #[tokio::test] + async fn test_priority_range_negotiation_scenario_3() { + let qos_open = &mut QoS::Enabled { + priorities: Some(PriorityRange::new(1, 3).unwrap()), + }; + let qos_accept = &mut QoS::Enabled { priorities: None }; + + match test_priority_range_negotiation(qos_open, qos_accept).await { + Err(err) => panic!("expected `Ok(())`, got: {err}"), + Ok(()) => { + assert_eq!(*qos_open, *qos_accept); + assert_eq!( + *qos_open, + QoS::Enabled { + priorities: Some(PriorityRange::new(1, 3).unwrap()) + } + ); + } + }; + } + + #[tokio::test] + async fn test_priority_range_negotiation_scenario_4() { + let qos_open = &mut QoS::Enabled { + priorities: Some(PriorityRange::new(1, 3).unwrap()), + }; + let qos_accept = &mut QoS::Enabled { + priorities: Some(PriorityRange::new(1, 3).unwrap()), + }; + + match test_priority_range_negotiation(qos_open, qos_accept).await { + Err(err) => panic!("expected `Ok(())`, got: {err}"), + Ok(()) => { + assert_eq!(*qos_open, *qos_accept); + assert_eq!( + *qos_open, + QoS::Enabled { + priorities: Some(PriorityRange::new(1, 3).unwrap()) + } + ); + } + }; + } + + #[tokio::test] + async fn test_priority_range_negotiation_scenario_5() { + let qos_open = &mut QoS::Enabled { + priorities: Some(PriorityRange::new(1, 3).unwrap()), + }; + let qos_accept = &mut QoS::Enabled { + priorities: Some(PriorityRange::new(0, 4).unwrap()), + }; + + match test_priority_range_negotiation(qos_open, qos_accept).await { + Err(err) => panic!("expected `Ok(())`, got: {err}"), + Ok(()) => { + assert_eq!(*qos_open, *qos_accept); + assert_eq!( + *qos_open, + QoS::Enabled { + priorities: Some(PriorityRange::new(1, 3).unwrap()) + } + ); + } + }; + } + + #[tokio::test] + async fn test_priority_range_negotiation_scenario_6() { + let qos_open = &mut QoS::Enabled { + priorities: Some(PriorityRange::new(1, 3).unwrap()), + }; + let qos_accept = &mut QoS::Enabled { + priorities: Some(PriorityRange::new(2, 3).unwrap()), + }; + + match test_priority_range_negotiation(qos_open, qos_accept).await { + Ok(()) => panic!("expected `Err(_)`, got `Ok(())`"), + Err(_) => {} + }; + } +} diff --git a/io/zenoh-transport/src/unicast/link.rs b/io/zenoh-transport/src/unicast/link.rs index 6e8425f056..bfd69ff7dc 100644 --- a/io/zenoh-transport/src/unicast/link.rs +++ b/io/zenoh-transport/src/unicast/link.rs @@ -82,7 +82,7 @@ impl TransportLinkUnicast { pub(crate) fn rx(&self) -> TransportLinkUnicastRx { TransportLinkUnicastRx { link: self.link.clone(), - config: self.config.clone(), + config: self.config, } }