From f809ac258008f9529a09d6eb642a4e505bfc25b1 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Fri, 6 Sep 2024 07:23:41 +0000 Subject: [PATCH] Split `State` into `StateOpen` and `StateAccept` --- commons/zenoh-protocol/src/core/mod.rs | 2 +- io/zenoh-link-commons/src/lib.rs | 1 + io/zenoh-link-commons/src/unicast.rs | 1 + .../src/unicast/establishment/accept.rs | 6 +- .../src/unicast/establishment/cookie.rs | 6 +- .../src/unicast/establishment/ext/qos.rs | 298 +++++++++++------- .../src/unicast/establishment/open.rs | 6 +- 7 files changed, 199 insertions(+), 121 deletions(-) diff --git a/commons/zenoh-protocol/src/core/mod.rs b/commons/zenoh-protocol/src/core/mod.rs index efb294d415..57844d5a4b 100644 --- a/commons/zenoh-protocol/src/core/mod.rs +++ b/commons/zenoh-protocol/src/core/mod.rs @@ -311,7 +311,7 @@ pub enum Priority { } #[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize)] -/// A `u8` range bounded inclusively below and above. +/// A [`Priority`] range bounded inclusively below and above. pub struct PriorityRange(RangeInclusive); impl Deref for PriorityRange { diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index 505a0c986d..12b76385b8 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -45,6 +45,7 @@ use zenoh_result::ZResult; pub const BIND_INTERFACE: &str = "iface"; +// TODO(fuzzypixelz): Patch the Locators to contain negotiated priority and reliability #[derive(Clone, Debug, Serialize, Hash, PartialEq, Eq)] pub struct Link { pub src: Locator, diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index c29f4c7c14..15d7ce8b17 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -38,6 +38,7 @@ pub trait LinkManagerUnicastTrait: Send + Sync { } pub type NewLinkChannelSender = flume::Sender; +// TODO(fuzzypixelz): Delete this /// Notification of a new inbound connection. /// /// Link implementations should preserve the metadata sections of [`NewLinkUnicast::endpoint`]. diff --git a/io/zenoh-transport/src/unicast/establishment/accept.rs b/io/zenoh-transport/src/unicast/establishment/accept.rs index b9de27c076..efa7acb5ab 100644 --- a/io/zenoh-transport/src/unicast/establishment/accept.rs +++ b/io/zenoh-transport/src/unicast/establishment/accept.rs @@ -55,7 +55,7 @@ pub(super) type AcceptError = (zenoh_result::Error, Option); struct StateTransport { batch_size: BatchSize, resolution: Resolution, - ext_qos: ext::qos::QoS, + ext_qos: ext::qos::StateAccept, #[cfg(feature = "transport_multilink")] ext_mlink: ext::multilink::StateAccept, #[cfg(feature = "shared-memory")] @@ -696,7 +696,7 @@ pub(crate) async fn accept_link( transport: StateTransport { batch_size: manager.config.batch_size.min(batch_size::UNICAST).min(mtu), resolution: manager.config.resolution, - ext_qos: ext::qos::QoS::new(manager.config.unicast.is_qos, &endpoint)?, + ext_qos: ext::qos::StateAccept::new(manager.config.unicast.is_qos, &endpoint)?, #[cfg(feature = "transport_multilink")] ext_mlink: manager .state @@ -764,7 +764,7 @@ pub(crate) async fn accept_link( whatami: osyn_out.other_whatami, sn_resolution: state.transport.resolution.get(Field::FrameSN), tx_initial_sn: oack_out.open_ack.initial_sn, - is_qos: state.transport.ext_qos.is_enabled(), + is_qos: state.transport.ext_qos.is_qos(), #[cfg(feature = "transport_multilink")] multilink: state.transport.ext_mlink.multilink(), #[cfg(feature = "shared-memory")] diff --git a/io/zenoh-transport/src/unicast/establishment/cookie.rs b/io/zenoh-transport/src/unicast/establishment/cookie.rs index 7fb84f258e..4220f8e08b 100644 --- a/io/zenoh-transport/src/unicast/establishment/cookie.rs +++ b/io/zenoh-transport/src/unicast/establishment/cookie.rs @@ -34,7 +34,7 @@ pub(crate) struct Cookie { pub(crate) batch_size: BatchSize, pub(crate) nonce: u64, // Extensions - pub(crate) ext_qos: ext::qos::QoS, + pub(crate) ext_qos: ext::qos::StateAccept, #[cfg(feature = "transport_multilink")] pub(crate) ext_mlink: ext::multilink::StateAccept, #[cfg(feature = "shared-memory")] @@ -90,7 +90,7 @@ where let batch_size: BatchSize = self.read(&mut *reader)?; let nonce: u64 = self.read(&mut *reader)?; // Extensions - let ext_qos: ext::qos::QoS = self.read(&mut *reader)?; + let ext_qos: ext::qos::StateAccept = self.read(&mut *reader)?; #[cfg(feature = "transport_multilink")] let ext_mlink: ext::multilink::StateAccept = self.read(&mut *reader)?; #[cfg(feature = "shared-memory")] @@ -178,7 +178,7 @@ impl Cookie { resolution: Resolution::rand(), batch_size: rng.gen(), nonce: rng.gen(), - ext_qos: ext::qos::QoS::rand(), + ext_qos: ext::qos::StateAccept::rand(), #[cfg(feature = "transport_multilink")] ext_mlink: ext::multilink::StateAccept::rand(), #[cfg(feature = "shared-memory")] diff --git a/io/zenoh-transport/src/unicast/establishment/ext/qos.rs b/io/zenoh-transport/src/unicast/establishment/ext/qos.rs index 101835c1b7..36ab15b077 100644 --- a/io/zenoh-transport/src/unicast/establishment/ext/qos.rs +++ b/io/zenoh-transport/src/unicast/establishment/ext/qos.rs @@ -15,7 +15,6 @@ use core::marker::PhantomData; use std::str::FromStr; use async_trait::async_trait; -use serde::Serialize; use zenoh_buffers::{ reader::{DidntRead, Reader}, writer::{DidntWrite, Writer}, @@ -42,19 +41,20 @@ impl<'a> QoSFsm<'a> { } } -#[derive(Clone, Debug, PartialEq, Eq, Serialize)] -pub(crate) enum QoS { - Disabled, - Enabled { +// TODO(fuzzypixelz): Fallback to ZExtUnit QoS matching QoS::Disabled or QoS::Enabled { reliability: None, priorities: None } +#[derive(Clone, Debug, PartialEq, Eq)] +enum State { + NoQoS, + QoS { reliability: Option, priorities: Option, }, } -impl QoS { - pub(crate) fn new(is_qos: bool, endpoint: &EndPoint) -> ZResult { +impl State { + fn new(is_qos: bool, endpoint: &EndPoint) -> ZResult { if !is_qos { - Ok(QoS::Disabled) + Ok(State::NoQoS) } else { const RELIABILITY_METADATA_KEY: &str = "reliability"; const PRIORITY_METADATA_KEY: &str = "priorities"; @@ -71,7 +71,7 @@ impl QoS { .map(PriorityRange::from_str) .transpose()?; - Ok(QoS::Enabled { + Ok(State::QoS { priorities, reliability, }) @@ -80,8 +80,8 @@ impl QoS { fn try_from_u64(value: u64) -> ZResult { match value { - 0b000_u64 => Ok(QoS::Disabled), - 0b001_u64 => Ok(QoS::Enabled { + 0b000_u64 => Ok(State::NoQoS), + 0b001_u64 => Ok(State::QoS { priorities: None, reliability: None, }), @@ -105,7 +105,7 @@ impl QoS { None }; - Ok(QoS::Enabled { + Ok(State::QoS { priorities, reliability, }) @@ -116,6 +116,10 @@ impl QoS { /// Encodes [`QoS`] as a [`u64`]. /// + /// This function is used for encoding both of [`StateAccept`] in + /// [`crate::unicast::establishment::cookie::Cookie::ext_qos`] and + /// [`zenoh_protocol::transport::init::ext::QoS`]. + /// /// The three least significant bits are used to discrimnate five states: /// /// 1. QoS is disabled @@ -125,8 +129,8 @@ impl QoS { /// 5. QoS is enabled and both priority range and reliability are available fn to_u64(&self) -> u64 { match self { - QoS::Disabled => 0b000_u64, - QoS::Enabled { + State::NoQoS => 0b000_u64, + State::QoS { priorities, reliability, } => { @@ -153,7 +157,7 @@ impl QoS { } fn to_ext(&self) -> Option { - if self.is_enabled() { + if self.is_qos() { Some(init::ext::QoS::new(self.to_u64())) } else { None @@ -162,36 +166,36 @@ impl QoS { fn try_from_ext(ext: Option) -> ZResult { if let Some(ext) = ext { - QoS::try_from_u64(ext.value) + State::try_from_u64(ext.value) } else { - Ok(QoS::Disabled) + Ok(State::NoQoS) } } - pub(crate) fn is_enabled(&self) -> bool { - matches!(self, QoS::Enabled { .. }) + fn is_qos(&self) -> bool { + matches!(self, State::QoS { .. }) } - pub(crate) fn priorities(&self) -> Option { + fn priorities(&self) -> Option { match self { - QoS::Disabled - | QoS::Enabled { + State::NoQoS + | State::QoS { priorities: None, .. } => None, - QoS::Enabled { + State::QoS { priorities: Some(priorities), .. } => Some(priorities.clone()), } } - pub(crate) fn reliability(&self) -> Option { + fn reliability(&self) -> Option { match self { - QoS::Disabled - | QoS::Enabled { + State::NoQoS + | State::QoS { reliability: None, .. } => None, - QoS::Enabled { + State::QoS { reliability: Some(reliability), .. } => Some(*reliability), @@ -199,18 +203,18 @@ impl QoS { } #[cfg(test)] - pub(crate) fn rand() -> Self { + fn rand() -> Self { use rand::Rng; let mut rng = rand::thread_rng(); if rng.gen_bool(0.5) { - QoS::Disabled + State::NoQoS } else { let priorities = rng.gen_bool(0.5).then(PriorityRange::rand); let reliability = rng .gen_bool(0.5) .then(|| Reliability::from(rng.gen_bool(0.5))); - QoS::Enabled { + State::QoS { priorities, reliability, } @@ -218,26 +222,30 @@ impl QoS { } } -// Codec -impl WCodec<&QoS, &mut W> for Zenoh080 -where - W: Writer, -{ - type Output = Result<(), DidntWrite>; +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct StateOpen(State); - fn write(self, writer: &mut W, x: &QoS) -> Self::Output { - self.write(writer, &x.to_u64()) +impl From for StateOpen { + fn from(value: State) -> Self { + StateOpen(value) } } -impl RCodec for Zenoh080 -where - R: Reader, -{ - type Error = DidntRead; +impl StateOpen { + pub(crate) fn new(is_qos: bool, endpoint: &EndPoint) -> ZResult { + State::new(is_qos, endpoint).map(StateOpen) + } - fn read(self, reader: &mut R) -> Result { - QoS::try_from_u64(self.read(reader)?).map_err(|_| DidntRead) + pub(crate) fn is_qos(&self) -> bool { + self.0.is_qos() + } + + pub(crate) fn priorities(&self) -> Option { + self.0.priorities() + } + + pub(crate) fn reliability(&self) -> Option { + self.0.reliability() } } @@ -245,16 +253,16 @@ where impl<'a> OpenFsm for &'a QoSFsm<'a> { type Error = ZError; - type SendInitSynIn = &'a QoS; + type SendInitSynIn = &'a StateOpen; type SendInitSynOut = Option; async fn send_init_syn( self, state: Self::SendInitSynIn, ) -> Result { - Ok(state.to_ext()) + Ok(state.0.to_ext()) } - type RecvInitAckIn = (&'a mut QoS, Option); + type RecvInitAckIn = (&'a mut StateOpen, Option); type RecvInitAckOut = (); async fn recv_init_ack( self, @@ -262,20 +270,20 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { ) -> Result { let (state_self, other_ext) = input; - let state_other = QoS::try_from_ext(other_ext)?; + let state_other = State::try_from_ext(other_ext)?; let ( - QoS::Enabled { + State::QoS { reliability: self_reliability, priorities: self_priorities, }, - QoS::Enabled { + State::QoS { reliability: other_reliability, priorities: other_priorities, }, - ) = (state_self.clone(), state_other) + ) = (state_self.0.clone(), state_other) else { - *state_self = QoS::Disabled; + *state_self = State::NoQoS.into(); return Ok(()); }; @@ -307,15 +315,16 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { } }; - *state_self = QoS::Enabled { + *state_self = State::QoS { reliability, priorities, - }; + } + .into(); Ok(()) } - type SendOpenSynIn = &'a QoS; + type SendOpenSynIn = &'a StateOpen; type SendOpenSynOut = Option; async fn send_open_syn( self, @@ -324,7 +333,7 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { Ok(None) } - type RecvOpenAckIn = (&'a mut QoS, Option); + type RecvOpenAckIn = (&'a mut StateOpen, Option); type RecvOpenAckOut = (); async fn recv_open_ack( self, @@ -334,11 +343,68 @@ impl<'a> OpenFsm for &'a QoSFsm<'a> { } } +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct StateAccept(State); + +impl From for StateAccept { + fn from(value: State) -> Self { + StateAccept(value) + } +} + +impl StateAccept { + pub(crate) fn new(is_qos: bool, endpoint: &EndPoint) -> ZResult { + State::new(is_qos, endpoint).map(StateAccept::from) + } + + pub(crate) fn is_qos(&self) -> bool { + self.0.is_qos() + } + + pub(crate) fn priorities(&self) -> Option { + self.0.priorities() + } + + pub(crate) fn reliability(&self) -> Option { + self.0.reliability() + } + + #[cfg(test)] + pub(crate) fn rand() -> Self { + State::rand().into() + } +} + +// Codec +impl WCodec<&StateAccept, &mut W> for Zenoh080 +where + W: Writer, +{ + type Output = Result<(), DidntWrite>; + + fn write(self, writer: &mut W, x: &StateAccept) -> Self::Output { + self.write(writer, &x.0.to_u64()) + } +} + +impl RCodec for Zenoh080 +where + R: Reader, +{ + type Error = DidntRead; + + fn read(self, reader: &mut R) -> Result { + Ok(State::try_from_u64(self.read(reader)?) + .map_err(|_| DidntRead)? + .into()) + } +} + #[async_trait] impl<'a> AcceptFsm for &'a QoSFsm<'a> { type Error = ZError; - type RecvInitSynIn = (&'a mut QoS, Option); + type RecvInitSynIn = (&'a mut StateAccept, Option); type RecvInitSynOut = (); async fn recv_init_syn( self, @@ -346,20 +412,20 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> { ) -> Result { let (state_self, other_ext) = input; - let state_other = QoS::try_from_ext(other_ext)?; + let state_other = State::try_from_ext(other_ext)?; let ( - QoS::Enabled { + State::QoS { reliability: self_reliability, priorities: self_priorities, }, - QoS::Enabled { + State::QoS { reliability: other_reliability, priorities: other_priorities, }, - ) = (state_self.clone(), state_other) + ) = (state_self.0.clone(), state_other) else { - *state_self = QoS::Disabled; + *state_self = State::NoQoS.into(); return Ok(()); }; @@ -391,24 +457,25 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> { } }; - *state_self = QoS::Enabled { + *state_self = State::QoS { reliability, priorities, - }; + } + .into(); Ok(()) } - type SendInitAckIn = &'a QoS; + type SendInitAckIn = &'a StateAccept; type SendInitAckOut = Option; async fn send_init_ack( self, state: Self::SendInitAckIn, ) -> Result { - Ok(state.to_ext()) + Ok(state.0.to_ext()) } - type RecvOpenSynIn = (&'a mut QoS, Option); + type RecvOpenSynIn = (&'a mut StateAccept, Option); type RecvOpenSynOut = (); async fn recv_open_syn( self, @@ -417,7 +484,7 @@ impl<'a> AcceptFsm for &'a QoSFsm<'a> { Ok(()) } - type SendOpenAckIn = &'a QoS; + type SendOpenAckIn = &'a StateAccept; type SendOpenAckOut = Option; async fn send_open_ack( self, @@ -432,7 +499,7 @@ mod tests { use zenoh_protocol::core::{PriorityRange, Reliability}; use zenoh_result::ZResult; - use super::{QoS, QoSFsm}; + use super::{QoSFsm, State, StateAccept, StateOpen}; use crate::unicast::establishment::{AcceptFsm, OpenFsm}; macro_rules! priority_range { @@ -441,30 +508,39 @@ mod tests { }; } - async fn test_negotiation(qos_open: &mut QoS, qos_accept: &mut QoS) -> ZResult<()> { + async fn test_negotiation( + state_open: &mut StateOpen, + state_accept: &mut StateAccept, + ) -> ZResult<()> { let fsm = QoSFsm::new(); - let ext = fsm.send_init_syn(&*qos_open).await?; - fsm.recv_init_syn((qos_accept, ext)).await?; + let ext = fsm.send_init_syn(&*state_open).await?; + fsm.recv_init_syn((state_accept, ext)).await?; - let ext = fsm.send_init_ack(&*qos_accept).await?; - fsm.recv_init_ack((qos_open, ext)).await?; + let ext = fsm.send_init_ack(&*state_accept).await?; + fsm.recv_init_ack((state_open, ext)).await?; Ok(()) } - async fn test_negotiation_ok(mut open_qos: QoS, mut accept_qos: QoS, expected_qos: QoS) { - match test_negotiation(&mut open_qos, &mut accept_qos).await { + async fn test_negotiation_ok(state_open: State, state_accept: State, state_expected: State) { + let mut state_open = state_open.into(); + let mut state_accept = state_accept.into(); + + match test_negotiation(&mut state_open, &mut state_accept).await { Err(err) => panic!("expected `Ok(())`, got: {err}"), Ok(()) => { - assert_eq!(open_qos, accept_qos); - assert_eq!(open_qos, expected_qos); + assert_eq!(state_open.0, state_accept.0); + assert_eq!(state_open.0, state_expected); } }; } - async fn test_negotiation_err(mut open_qos: QoS, mut accept_qos: QoS) { - if let Ok(()) = test_negotiation(&mut open_qos, &mut accept_qos).await { + async fn test_negotiation_err(state_open: State, state_accept: State) { + let mut state_open = state_open.into(); + let mut state_accept = state_accept.into(); + + if let Ok(()) = test_negotiation(&mut state_open, &mut state_accept).await { panic!("expected `Err(_)`, got `Ok(())`") } } @@ -472,15 +548,15 @@ mod tests { #[tokio::test] async fn test_priority_range_negotiation_scenario_1() { test_negotiation_ok( - QoS::Enabled { + State::QoS { priorities: None, reliability: None, }, - QoS::Enabled { + State::QoS { priorities: None, reliability: None, }, - QoS::Enabled { + State::QoS { priorities: None, reliability: None, }, @@ -491,15 +567,15 @@ mod tests { #[tokio::test] async fn test_priority_range_negotiation_scenario_2() { test_negotiation_ok( - QoS::Enabled { + State::QoS { priorities: None, reliability: None, }, - QoS::Enabled { + State::QoS { priorities: Some(priority_range!(1, 3)), reliability: None, }, - QoS::Enabled { + State::QoS { priorities: Some(priority_range!(1, 3)), reliability: None, }, @@ -510,15 +586,15 @@ mod tests { #[tokio::test] async fn test_priority_range_negotiation_scenario_3() { test_negotiation_ok( - QoS::Enabled { + State::QoS { priorities: Some(priority_range!(1, 3)), reliability: None, }, - QoS::Enabled { + State::QoS { priorities: None, reliability: None, }, - QoS::Enabled { + State::QoS { priorities: Some(priority_range!(1, 3)), reliability: None, }, @@ -529,15 +605,15 @@ mod tests { #[tokio::test] async fn test_priority_range_negotiation_scenario_4() { test_negotiation_ok( - QoS::Enabled { + State::QoS { priorities: Some(priority_range!(1, 3)), reliability: None, }, - QoS::Enabled { + State::QoS { priorities: Some(priority_range!(1, 3)), reliability: None, }, - QoS::Enabled { + State::QoS { priorities: Some(priority_range!(1, 3)), reliability: None, }, @@ -548,15 +624,15 @@ mod tests { #[tokio::test] async fn test_priority_range_negotiation_scenario_5() { test_negotiation_ok( - QoS::Enabled { + State::QoS { priorities: Some(priority_range!(1, 3)), reliability: None, }, - QoS::Enabled { + State::QoS { priorities: Some(priority_range!(1, 3)), reliability: None, }, - QoS::Enabled { + State::QoS { priorities: Some(priority_range!(1, 3)), reliability: None, }, @@ -567,11 +643,11 @@ mod tests { #[tokio::test] async fn test_priority_range_negotiation_scenario_6() { test_negotiation_err( - QoS::Enabled { + State::QoS { priorities: Some(priority_range!(1, 3)), reliability: None, }, - QoS::Enabled { + State::QoS { priorities: Some(priority_range!(1, 3)), reliability: None, }, @@ -582,15 +658,15 @@ mod tests { #[tokio::test] async fn test_reliability_negotiation_scenario_2() { test_negotiation_ok( - QoS::Enabled { + State::QoS { reliability: None, priorities: None, }, - QoS::Enabled { + State::QoS { reliability: Some(Reliability::BestEffort), priorities: None, }, - QoS::Enabled { + State::QoS { reliability: Some(Reliability::BestEffort), priorities: None, }, @@ -601,11 +677,11 @@ mod tests { #[tokio::test] async fn test_reliability_negotiation_scenario_3() { test_negotiation_err( - QoS::Enabled { + State::QoS { reliability: Some(Reliability::Reliable), priorities: None, }, - QoS::Enabled { + State::QoS { reliability: Some(Reliability::BestEffort), priorities: None, }, @@ -616,15 +692,15 @@ mod tests { #[tokio::test] async fn test_reliability_negotiation_scenario_4() { test_negotiation_ok( - QoS::Enabled { + State::QoS { reliability: Some(Reliability::Reliable), priorities: None, }, - QoS::Enabled { + State::QoS { reliability: Some(Reliability::Reliable), priorities: None, }, - QoS::Enabled { + State::QoS { reliability: Some(Reliability::Reliable), priorities: None, }, @@ -635,11 +711,11 @@ mod tests { #[tokio::test] async fn test_reliability_negotiation_scenario_5() { test_negotiation_err( - QoS::Enabled { + State::QoS { reliability: Some(Reliability::BestEffort), priorities: None, }, - QoS::Enabled { + State::QoS { reliability: Some(Reliability::Reliable), priorities: None, }, @@ -650,15 +726,15 @@ mod tests { #[tokio::test] async fn test_priority_range_and_reliability_negotiation_scenario_1() { test_negotiation_ok( - QoS::Enabled { + State::QoS { reliability: Some(Reliability::BestEffort), priorities: Some(priority_range!(1, 3)), }, - QoS::Enabled { + State::QoS { reliability: Some(Reliability::BestEffort), priorities: Some(priority_range!(1, 3)), }, - QoS::Enabled { + State::QoS { reliability: Some(Reliability::BestEffort), priorities: Some(priority_range!(1, 3)), }, diff --git a/io/zenoh-transport/src/unicast/establishment/open.rs b/io/zenoh-transport/src/unicast/establishment/open.rs index b231210c0a..da180b2b72 100644 --- a/io/zenoh-transport/src/unicast/establishment/open.rs +++ b/io/zenoh-transport/src/unicast/establishment/open.rs @@ -52,7 +52,7 @@ type OpenError = (zenoh_result::Error, Option); struct StateTransport { batch_size: BatchSize, resolution: Resolution, - ext_qos: ext::qos::QoS, + ext_qos: ext::qos::StateOpen, #[cfg(feature = "transport_multilink")] ext_mlink: ext::multilink::StateOpen, #[cfg(feature = "shared-memory")] @@ -581,7 +581,7 @@ pub(crate) async fn open_link( .min(batch_size::UNICAST) .min(link.config.batch.mtu), resolution: manager.config.resolution, - ext_qos: ext::qos::QoS::new(manager.config.unicast.is_qos, &endpoint)?, + ext_qos: ext::qos::StateOpen::new(manager.config.unicast.is_qos, &endpoint)?, #[cfg(feature = "transport_multilink")] ext_mlink: manager .state @@ -649,7 +649,7 @@ pub(crate) async fn open_link( whatami: iack_out.other_whatami, sn_resolution: state.transport.resolution.get(Field::FrameSN), tx_initial_sn: osyn_out.mine_initial_sn, - is_qos: state.transport.ext_qos.is_enabled(), + is_qos: state.transport.ext_qos.is_qos(), #[cfg(feature = "transport_multilink")] multilink: state.transport.ext_mlink.multilink(), #[cfg(feature = "shared-memory")]