From 26c55a987342354f8f16642e1592ad1fe9839e9d Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Fri, 9 Feb 2024 18:13:00 +0100 Subject: [PATCH 01/13] -expose qos data in Sample --- commons/zenoh-protocol/src/core/mod.rs | 8 ++++ commons/zenoh-protocol/src/network/mod.rs | 11 ++++++ zenoh/src/publication.rs | 9 ++++- zenoh/src/queryable.rs | 2 + zenoh/src/sample.rs | 9 ++++- zenoh/src/session.rs | 3 ++ zenoh/tests/qos.rs | 48 +++++++++++++++++++++++ 7 files changed, 88 insertions(+), 2 deletions(-) create mode 100644 zenoh/tests/qos.rs diff --git a/commons/zenoh-protocol/src/core/mod.rs b/commons/zenoh-protocol/src/core/mod.rs index 2547034c44..b7f73ba85f 100644 --- a/commons/zenoh-protocol/src/core/mod.rs +++ b/commons/zenoh-protocol/src/core/mod.rs @@ -431,3 +431,11 @@ pub enum QueryTarget { #[cfg(feature = "complete_n")] Complete(u64), } + +/// Structure containing quality of service data +#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)] +pub struct QoS { + pub priority: Priority, + pub congestion_control: CongestionControl, + pub express: bool, +} diff --git a/commons/zenoh-protocol/src/network/mod.rs b/commons/zenoh-protocol/src/network/mod.rs index 1be58db5cc..9f7a1d6d58 100644 --- a/commons/zenoh-protocol/src/network/mod.rs +++ b/commons/zenoh-protocol/src/network/mod.rs @@ -331,6 +331,17 @@ pub mod ext { } } + type QoSPublic = crate::core::QoS; + impl From> for QoSPublic { + fn from(ext: QoSType<{ ID }>) -> Self { + QoSPublic { + priority: ext.get_priority(), + congestion_control: ext.get_congestion_control(), + express: ext.is_express(), + } + } + } + /// ```text /// 7 6 5 4 3 2 1 0 /// +-+-+-+-+-+-+-+-+ diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index 843190ad45..1a849ec5d0 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -857,7 +857,14 @@ fn resolve_put( kind, encoding: Some(value.encoding), timestamp, - ..Default::default() + source_id: None, + source_sn: None, + qos: ext::QoSType::new ( + publisher.priority.into(), + publisher.congestion_control, + false, + ) + .into(), }; publisher.session.handle_data( diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index 9ee73d1641..5114ef6570 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -193,6 +193,7 @@ impl SyncResolve for ReplyBuilder<'_> { value: Value { payload, encoding }, kind, timestamp, + qos, #[cfg(feature = "unstable")] source_info, #[cfg(feature = "unstable")] @@ -203,6 +204,7 @@ impl SyncResolve for ReplyBuilder<'_> { kind, encoding: Some(encoding), timestamp, + qos, source_id: None, source_sn: None, }; diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index 5d707e5936..1a579fd9b9 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -21,7 +21,7 @@ use crate::time::{new_reception_timestamp, Timestamp}; #[zenoh_macros::unstable] use serde::Serialize; use std::convert::{TryFrom, TryInto}; -use zenoh_protocol::core::Encoding; +use zenoh_protocol::core::{Encoding, QoS}; pub type SourceSn = u64; @@ -50,6 +50,7 @@ pub(crate) struct DataInfo { pub timestamp: Option, pub source_id: Option, pub source_sn: Option, + pub qos: QoS, } /// Informations on the source of a zenoh [`Sample`]. @@ -326,6 +327,8 @@ pub struct Sample { pub kind: SampleKind, /// The [`Timestamp`] of this Sample. pub timestamp: Option, + /// Quality of service settings the sample was send with + pub qos: QoS, #[cfg(feature = "unstable")] ///
@@ -361,6 +364,7 @@ impl Sample { value: value.into(), kind: SampleKind::default(), timestamp: None, + qos: QoS::default(), #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), #[cfg(feature = "unstable")] @@ -383,6 +387,7 @@ impl Sample { value: value.into(), kind: SampleKind::default(), timestamp: None, + qos: QoS::default(), #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), #[cfg(feature = "unstable")] @@ -407,6 +412,7 @@ impl Sample { value, kind: data_info.kind, timestamp: data_info.timestamp, + qos: data_info.qos, #[cfg(feature = "unstable")] source_info: data_info.into(), #[cfg(feature = "unstable")] @@ -418,6 +424,7 @@ impl Sample { value, kind: SampleKind::default(), timestamp: None, + qos: QoS::default(), #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), #[cfg(feature = "unstable")] diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index d52c446d3d..bc9ceb4001 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -2192,6 +2192,7 @@ impl Primitives for Session { kind: SampleKind::Put, encoding: Some(m.encoding), timestamp: m.timestamp, + qos: msg.ext_qos.into(), source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; @@ -2209,6 +2210,7 @@ impl Primitives for Session { kind: SampleKind::Delete, encoding: None, timestamp: m.timestamp, + qos: msg.ext_qos.into(), source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; @@ -2345,6 +2347,7 @@ impl Primitives for Session { kind: SampleKind::Put, encoding: Some(m.encoding), timestamp: m.timestamp, + qos: msg.ext_qos.into(), source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs new file mode 100644 index 0000000000..3503b6bff8 --- /dev/null +++ b/zenoh/tests/qos.rs @@ -0,0 +1,48 @@ +use std::time::Duration; + +use async_std::task; +use zenoh::{publication::Priority, subscriber, SessionDeclarations}; +use zenoh_core::{zasync_executor_init, AsyncResolve}; +use zenoh_protocol::core::CongestionControl; + + +const SLEEP: Duration = Duration::from_secs(1); + +#[test] +fn pubsub() { + task::block_on(async { + zasync_executor_init!(); + let session1 = zenoh::open(zenoh_config::peer()).res_async().await.unwrap(); + let session2 = zenoh::open(zenoh_config::peer()).res_async().await.unwrap(); + + let publisher = session1 + .declare_publisher("test/qos") + .res() + .await + .unwrap() + .priority(Priority::DataHigh) + .congestion_control(CongestionControl::Drop); + + task::sleep(SLEEP).await; + + let sub = session2 + .declare_subscriber("test/qos") + .res() + .await + .unwrap(); + task::sleep(SLEEP).await; + + publisher.put("qos").res_async().await.unwrap(); + let qos = sub.recv_async().await.unwrap().qos; + + assert_eq!(qos.priority, Priority::DataHigh.into()); + assert_eq!(qos.congestion_control, CongestionControl::Drop); + + let publisher = publisher.priority(Priority::DataLow).congestion_control(CongestionControl::Block); + publisher.put("qos").res_async().await.unwrap(); + let qos = sub.recv_async().await.unwrap().qos; + + assert_eq!(qos.priority, Priority::DataLow.into()); + assert_eq!(qos.congestion_control, CongestionControl::Block); + }); +} \ No newline at end of file From 239b4ac7b5bcc6153b5a0bf7406f74be2e3f449c Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Fri, 9 Feb 2024 18:17:20 +0100 Subject: [PATCH 02/13] format --- zenoh/src/publication.rs | 6 +++--- zenoh/tests/qos.rs | 15 ++++++--------- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index 1a849ec5d0..915c7c7cd2 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -859,12 +859,12 @@ fn resolve_put( timestamp, source_id: None, source_sn: None, - qos: ext::QoSType::new ( + qos: ext::QoSType::new( publisher.priority.into(), publisher.congestion_control, false, - ) - .into(), + ) + .into(), }; publisher.session.handle_data( diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index 3503b6bff8..25e1341330 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -1,11 +1,10 @@ use std::time::Duration; use async_std::task; -use zenoh::{publication::Priority, subscriber, SessionDeclarations}; +use zenoh::{publication::Priority, SessionDeclarations}; use zenoh_core::{zasync_executor_init, AsyncResolve}; use zenoh_protocol::core::CongestionControl; - const SLEEP: Duration = Duration::from_secs(1); #[test] @@ -25,11 +24,7 @@ fn pubsub() { task::sleep(SLEEP).await; - let sub = session2 - .declare_subscriber("test/qos") - .res() - .await - .unwrap(); + let sub = session2.declare_subscriber("test/qos").res().await.unwrap(); task::sleep(SLEEP).await; publisher.put("qos").res_async().await.unwrap(); @@ -38,11 +33,13 @@ fn pubsub() { assert_eq!(qos.priority, Priority::DataHigh.into()); assert_eq!(qos.congestion_control, CongestionControl::Drop); - let publisher = publisher.priority(Priority::DataLow).congestion_control(CongestionControl::Block); + let publisher = publisher + .priority(Priority::DataLow) + .congestion_control(CongestionControl::Block); publisher.put("qos").res_async().await.unwrap(); let qos = sub.recv_async().await.unwrap().qos; assert_eq!(qos.priority, Priority::DataLow.into()); assert_eq!(qos.congestion_control, CongestionControl::Block); }); -} \ No newline at end of file +} From c09a6a4f110a3d13a80d97450ef076d9a55bf477 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Fri, 9 Feb 2024 18:21:24 +0100 Subject: [PATCH 03/13] typo fix --- zenoh/src/sample.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index 1a579fd9b9..999a292098 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -327,7 +327,7 @@ pub struct Sample { pub kind: SampleKind, /// The [`Timestamp`] of this Sample. pub timestamp: Option, - /// Quality of service settings the sample was send with + /// Quality of service settings this sample was sent with. pub qos: QoS, #[cfg(feature = "unstable")] From 9c26273d201f270dc66c607b9cdd9d1e6cdc1e79 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Tue, 13 Feb 2024 12:07:20 +0100 Subject: [PATCH 04/13] cleanup tests/qos --- zenoh/tests/qos.rs | 61 +++++++++++++++++++++++++++++++--------------- 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index 25e1341330..cee5866ec1 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -1,43 +1,64 @@ -use std::time::Duration; - +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use async_std::prelude::FutureExt; use async_std::task; +use std::time::Duration; +use zenoh::prelude::r#async::*; use zenoh::{publication::Priority, SessionDeclarations}; -use zenoh_core::{zasync_executor_init, AsyncResolve}; -use zenoh_protocol::core::CongestionControl; +use zenoh_core::zasync_executor_init; +const TIMEOUT: Duration = Duration::from_secs(60); const SLEEP: Duration = Duration::from_secs(1); +macro_rules! ztimeout { + ($f:expr) => { + $f.timeout(TIMEOUT).await.unwrap() + }; +} + #[test] fn pubsub() { task::block_on(async { zasync_executor_init!(); - let session1 = zenoh::open(zenoh_config::peer()).res_async().await.unwrap(); - let session2 = zenoh::open(zenoh_config::peer()).res_async().await.unwrap(); + let session1 = ztimeout!(zenoh::open(zenoh_config::peer()).res_async()).unwrap(); + let session2 = ztimeout!(zenoh::open(zenoh_config::peer()).res_async()).unwrap(); - let publisher = session1 + let publisher1 = ztimeout!(session1 .declare_publisher("test/qos") - .res() - .await - .unwrap() .priority(Priority::DataHigh) - .congestion_control(CongestionControl::Drop); + .congestion_control(CongestionControl::Drop) + .res()) + .unwrap(); - task::sleep(SLEEP).await; + let publisher2 = ztimeout!(session1 + .declare_publisher("test/qos") + .priority(Priority::DataLow) + .congestion_control(CongestionControl::Block) + .res()) + .unwrap(); - let sub = session2.declare_subscriber("test/qos").res().await.unwrap(); + let subscriber = ztimeout!(session2.declare_subscriber("test/qos").res()).unwrap(); task::sleep(SLEEP).await; - publisher.put("qos").res_async().await.unwrap(); - let qos = sub.recv_async().await.unwrap().qos; + ztimeout!(publisher1.put("qos").res_async()).unwrap(); + let qos = ztimeout!(subscriber.recv_async()).unwrap().qos; assert_eq!(qos.priority, Priority::DataHigh.into()); assert_eq!(qos.congestion_control, CongestionControl::Drop); - let publisher = publisher - .priority(Priority::DataLow) - .congestion_control(CongestionControl::Block); - publisher.put("qos").res_async().await.unwrap(); - let qos = sub.recv_async().await.unwrap().qos; + ztimeout!(publisher2.put("qos").res_async()).unwrap(); + let qos = ztimeout!(subscriber.recv_async()).unwrap().qos; assert_eq!(qos.priority, Priority::DataLow.into()); assert_eq!(qos.congestion_control, CongestionControl::Block); From df05068bf5fc3f3d0142a8f342ddcd6a051f9fbe Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Tue, 13 Feb 2024 12:38:26 +0100 Subject: [PATCH 05/13] moved public QoS type to Sample.rs --- commons/zenoh-protocol/src/core/mod.rs | 8 -------- commons/zenoh-protocol/src/network/mod.rs | 11 ----------- zenoh/src/publication.rs | 6 ++++++ zenoh/src/sample.rs | 22 +++++++++++++++++++++- zenoh/tests/qos.rs | 4 ++-- 5 files changed, 29 insertions(+), 22 deletions(-) diff --git a/commons/zenoh-protocol/src/core/mod.rs b/commons/zenoh-protocol/src/core/mod.rs index b7f73ba85f..2547034c44 100644 --- a/commons/zenoh-protocol/src/core/mod.rs +++ b/commons/zenoh-protocol/src/core/mod.rs @@ -431,11 +431,3 @@ pub enum QueryTarget { #[cfg(feature = "complete_n")] Complete(u64), } - -/// Structure containing quality of service data -#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)] -pub struct QoS { - pub priority: Priority, - pub congestion_control: CongestionControl, - pub express: bool, -} diff --git a/commons/zenoh-protocol/src/network/mod.rs b/commons/zenoh-protocol/src/network/mod.rs index 9f7a1d6d58..1be58db5cc 100644 --- a/commons/zenoh-protocol/src/network/mod.rs +++ b/commons/zenoh-protocol/src/network/mod.rs @@ -331,17 +331,6 @@ pub mod ext { } } - type QoSPublic = crate::core::QoS; - impl From> for QoSPublic { - fn from(ext: QoSType<{ ID }>) -> Self { - QoSPublic { - priority: ext.get_priority(), - congestion_control: ext.get_congestion_control(), - express: ext.is_express(), - } - } - } - /// ```text /// 7 6 5 4 3 2 1 0 /// +-+-+-+-+-+-+-+-+ diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index 915c7c7cd2..63b96ce9fe 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -902,6 +902,12 @@ impl Priority { pub const NUM: usize = 1 + Self::MIN as usize - Self::MAX as usize; } +impl From for Priority { + fn from(priority: zenoh_protocol::core::Priority) -> Self { + Self::try_from(priority as u8).unwrap() + } +} + impl TryFrom for Priority { type Error = zenoh_result::Error; diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index 999a292098..9c5a1ea5ae 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -18,10 +18,12 @@ use crate::prelude::ZenohId; use crate::prelude::{KeyExpr, SampleKind, Value}; use crate::query::Reply; use crate::time::{new_reception_timestamp, Timestamp}; +use crate::Priority; #[zenoh_macros::unstable] use serde::Serialize; use std::convert::{TryFrom, TryInto}; -use zenoh_protocol::core::{Encoding, QoS}; +use zenoh_protocol::core::{CongestionControl, Encoding}; +use zenoh_protocol::network::push::ext::QoSType; pub type SourceSn = u64; @@ -516,3 +518,21 @@ impl TryFrom for Sample { value.sample } } + +/// Structure containing quality of service data +#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)] +pub struct QoS { + pub priority: Priority, + pub congestion_control: CongestionControl, + pub express: bool, +} + +impl From for QoS { + fn from(ext: QoSType) -> Self { + QoS { + priority: ext.get_priority().into(), + congestion_control: ext.get_congestion_control(), + express: ext.is_express(), + } + } +} diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index cee5866ec1..6e9add8d06 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -54,13 +54,13 @@ fn pubsub() { ztimeout!(publisher1.put("qos").res_async()).unwrap(); let qos = ztimeout!(subscriber.recv_async()).unwrap().qos; - assert_eq!(qos.priority, Priority::DataHigh.into()); + assert_eq!(qos.priority, Priority::DataHigh); assert_eq!(qos.congestion_control, CongestionControl::Drop); ztimeout!(publisher2.put("qos").res_async()).unwrap(); let qos = ztimeout!(subscriber.recv_async()).unwrap().qos; - assert_eq!(qos.priority, Priority::DataLow.into()); + assert_eq!(qos.priority, Priority::DataLow); assert_eq!(qos.congestion_control, CongestionControl::Block); }); } From b90fc73994c55a0bd2085bfcff33e699959a61fb Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Tue, 13 Feb 2024 15:01:30 +0100 Subject: [PATCH 06/13] account for the possibility of failure when converting from ext::QoSType to Sample::QoS --- zenoh/src/publication.rs | 31 +++++++++++++++++++++---------- zenoh/src/sample.rs | 24 +++++++++++++++++++----- zenoh/src/session.rs | 7 ++++--- 3 files changed, 44 insertions(+), 18 deletions(-) diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index 63b96ce9fe..248ee2cb8c 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -22,6 +22,7 @@ use crate::prelude::*; #[zenoh_macros::unstable] use crate::sample::Attachment; use crate::sample::DataInfo; +use crate::sample::QoS; use crate::Encoding; use crate::SessionRef; use crate::Undeclarable; @@ -859,12 +860,11 @@ fn resolve_put( timestamp, source_id: None, source_sn: None, - qos: ext::QoSType::new( + qos: QoS::from_or_default(ext::QoSType::new( publisher.priority.into(), publisher.congestion_control, false, - ) - .into(), + )), }; publisher.session.handle_data( @@ -902,12 +902,6 @@ impl Priority { pub const NUM: usize = 1 + Self::MIN as usize - Self::MAX as usize; } -impl From for Priority { - fn from(priority: zenoh_protocol::core::Priority) -> Self { - Self::try_from(priority as u8).unwrap() - } -} - impl TryFrom for Priority { type Error = zenoh_result::Error; @@ -938,7 +932,8 @@ impl TryFrom for Priority { } } -impl From for zenoh_protocol::core::Priority { +type ProtocolPriority = zenoh_protocol::core::Priority; +impl From for ProtocolPriority { fn from(prio: Priority) -> Self { // The Priority in the prelude differs from the Priority in the core protocol only from // the missing Control priority. The Control priority is reserved for zenoh internal use @@ -952,6 +947,22 @@ impl From for zenoh_protocol::core::Priority { } } +impl TryFrom for Priority { + type Error = zenoh_result::Error; + fn try_from(priority: ProtocolPriority) -> Result { + match priority { + ProtocolPriority::Control => bail!("'Control' is not a valid priority value."), + ProtocolPriority::RealTime => Ok(Priority::RealTime), + ProtocolPriority::InteractiveHigh => Ok(Priority::InteractiveHigh), + ProtocolPriority::InteractiveLow => Ok(Priority::InteractiveLow), + ProtocolPriority::DataHigh => Ok(Priority::DataHigh), + ProtocolPriority::Data => Ok(Priority::Data), + ProtocolPriority::DataLow => Ok(Priority::DataLow), + ProtocolPriority::Background => Ok(Priority::Background), + } + } +} + /// A struct that indicates if there exist Subscribers matching the Publisher's key expression. /// /// # Examples diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index 9c5a1ea5ae..9dc6db94eb 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -527,12 +527,26 @@ pub struct QoS { pub express: bool, } -impl From for QoS { - fn from(ext: QoSType) -> Self { - QoS { - priority: ext.get_priority().into(), +impl QoS { + /// Helper function to fallback to default QoS value and log a error in case of conversion failure + pub(crate) fn from_or_default(ext: QoSType) -> QoS { + match QoS::try_from(ext) { + Ok(qos) => return qos, + Err(e) => { + log::error!("Failed to convert: {}", e.to_string()); + QoS::default() + } + } + } +} + +impl TryFrom for QoS { + type Error = zenoh_result::Error; + fn try_from(ext: QoSType) -> Result { + Ok(QoS { + priority: ext.get_priority().try_into()?, congestion_control: ext.get_congestion_control(), express: ext.is_express(), - } + }) } } diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index bc9ceb4001..4208c8f96c 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -31,6 +31,7 @@ use crate::queryable::*; #[cfg(feature = "unstable")] use crate::sample::Attachment; use crate::sample::DataInfo; +use crate::sample::QoS; use crate::selector::TIME_RANGE_KEY; use crate::subscriber::*; use crate::Id; @@ -2192,7 +2193,7 @@ impl Primitives for Session { kind: SampleKind::Put, encoding: Some(m.encoding), timestamp: m.timestamp, - qos: msg.ext_qos.into(), + qos: QoS::from_or_default(msg.ext_qos), source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; @@ -2210,7 +2211,7 @@ impl Primitives for Session { kind: SampleKind::Delete, encoding: None, timestamp: m.timestamp, - qos: msg.ext_qos.into(), + qos: QoS::from_or_default(msg.ext_qos), source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; @@ -2347,7 +2348,7 @@ impl Primitives for Session { kind: SampleKind::Put, encoding: Some(m.encoding), timestamp: m.timestamp, - qos: msg.ext_qos.into(), + qos: QoS::from_or_default(msg.ext_qos), source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; From 3028518e727e13389fdf592e1f2a5b535f53f677 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Tue, 13 Feb 2024 19:16:34 +0100 Subject: [PATCH 07/13] QoS.from_or_default to replace only priority value by default one in case of failure --- zenoh/src/sample.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index 9dc6db94eb..03fefb3411 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -528,14 +528,19 @@ pub struct QoS { } impl QoS { - /// Helper function to fallback to default QoS value and log a error in case of conversion failure + /// Helper function to fallback to QoS with default priortiy value, in case we fail to extract it pub(crate) fn from_or_default(ext: QoSType) -> QoS { - match QoS::try_from(ext) { - Ok(qos) => return qos, + let priority = match Priority::try_from(ext.get_priority()) { + Ok(p) => p, Err(e) => { - log::error!("Failed to convert: {}", e.to_string()); - QoS::default() + log::error!("Failed to convert priority: {}", e.to_string()); + Priority::default() } + }; + QoS { + priority, + congestion_control: ext.get_congestion_control(), + express: ext.is_express(), } } } From 03831f43d35b8f613c49dcedda8bb6fdfb809efe Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Tue, 13 Feb 2024 19:21:10 +0100 Subject: [PATCH 08/13] typo fix --- zenoh/src/sample.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index 03fefb3411..5c24dac963 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -528,7 +528,7 @@ pub struct QoS { } impl QoS { - /// Helper function to fallback to QoS with default priortiy value, in case we fail to extract it + /// Helper function to fallback to QoS with default priority value, in case we fail to extract it pub(crate) fn from_or_default(ext: QoSType) -> QoS { let priority = match Priority::try_from(ext.get_priority()) { Ok(p) => p, From 2f21cfbf7a608915abedea8974953abfe92a47a9 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Wed, 14 Feb 2024 11:23:20 +0100 Subject: [PATCH 09/13] reduce priority of logging to trace, in case we fail to conver protocol priority to user one --- zenoh/src/sample.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index 5c24dac963..c658e15a28 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -533,7 +533,10 @@ impl QoS { let priority = match Priority::try_from(ext.get_priority()) { Ok(p) => p, Err(e) => { - log::error!("Failed to convert priority: {}", e.to_string()); + log::trace!( + "Failed to convert priority: {}; replacing with default value", + e.to_string() + ); Priority::default() } }; From 85d8769197aa9463931a6977877fab738910e1cc Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Mon, 19 Feb 2024 12:44:10 +0100 Subject: [PATCH 10/13] reduce size of QoS struct --- zenoh/src/publication.rs | 2 +- zenoh/src/sample.rs | 38 +++++++++++++++++++------------------- zenoh/src/session.rs | 6 +++--- zenoh/tests/qos.rs | 8 ++++---- 4 files changed, 27 insertions(+), 27 deletions(-) diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index 248ee2cb8c..e0cd1d33c6 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -860,7 +860,7 @@ fn resolve_put( timestamp, source_id: None, source_sn: None, - qos: QoS::from_or_default(ext::QoSType::new( + qos: QoS::from(ext::QoSType::new( publisher.priority.into(), publisher.congestion_control, false, diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index c658e15a28..f55e5175bf 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -522,15 +522,12 @@ impl TryFrom for Sample { /// Structure containing quality of service data #[derive(Debug, Default, Copy, Clone, Eq, PartialEq)] pub struct QoS { - pub priority: Priority, - pub congestion_control: CongestionControl, - pub express: bool, + inner: QoSType, } impl QoS { - /// Helper function to fallback to QoS with default priority value, in case we fail to extract it - pub(crate) fn from_or_default(ext: QoSType) -> QoS { - let priority = match Priority::try_from(ext.get_priority()) { + pub fn priority(&self) -> Priority { + let priority = match Priority::try_from(self.inner.get_priority()) { Ok(p) => p, Err(e) => { log::trace!( @@ -540,21 +537,24 @@ impl QoS { Priority::default() } }; - QoS { - priority, - congestion_control: ext.get_congestion_control(), - express: ext.is_express(), - } + priority + } + + pub fn congestion_control(&self) -> CongestionControl { + self.inner.get_congestion_control() + } + + pub fn express(&self) -> bool { + self.inner.is_express() + } + + pub(crate) fn inner(&self) -> QoSType { + self.inner } } -impl TryFrom for QoS { - type Error = zenoh_result::Error; - fn try_from(ext: QoSType) -> Result { - Ok(QoS { - priority: ext.get_priority().try_into()?, - congestion_control: ext.get_congestion_control(), - express: ext.is_express(), - }) +impl From for QoS { + fn from(qos: QoSType) -> Self { + QoS { inner: qos } } } diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 4208c8f96c..43ef74d58f 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -2193,7 +2193,7 @@ impl Primitives for Session { kind: SampleKind::Put, encoding: Some(m.encoding), timestamp: m.timestamp, - qos: QoS::from_or_default(msg.ext_qos), + qos: QoS::from(msg.ext_qos), source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; @@ -2211,7 +2211,7 @@ impl Primitives for Session { kind: SampleKind::Delete, encoding: None, timestamp: m.timestamp, - qos: QoS::from_or_default(msg.ext_qos), + qos: QoS::from(msg.ext_qos), source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; @@ -2348,7 +2348,7 @@ impl Primitives for Session { kind: SampleKind::Put, encoding: Some(m.encoding), timestamp: m.timestamp, - qos: QoS::from_or_default(msg.ext_qos), + qos: QoS::from(msg.ext_qos), source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index 6e9add8d06..475d8d7a1b 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -54,13 +54,13 @@ fn pubsub() { ztimeout!(publisher1.put("qos").res_async()).unwrap(); let qos = ztimeout!(subscriber.recv_async()).unwrap().qos; - assert_eq!(qos.priority, Priority::DataHigh); - assert_eq!(qos.congestion_control, CongestionControl::Drop); + assert_eq!(qos.priority(), Priority::DataHigh); + assert_eq!(qos.congestion_control(), CongestionControl::Drop); ztimeout!(publisher2.put("qos").res_async()).unwrap(); let qos = ztimeout!(subscriber.recv_async()).unwrap().qos; - assert_eq!(qos.priority, Priority::DataLow); - assert_eq!(qos.congestion_control, CongestionControl::Block); + assert_eq!(qos.priority(), Priority::DataLow); + assert_eq!(qos.congestion_control(), CongestionControl::Block); }); } From eb7987709d381ac57bb39545fff40f82d301ebf3 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Mon, 19 Feb 2024 16:09:34 +0100 Subject: [PATCH 11/13] - add docstrings - remove inner type accessor --- zenoh/src/sample.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index f55e5175bf..db55bce522 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -526,6 +526,7 @@ pub struct QoS { } impl QoS { + /// Get priority of the message. pub fn priority(&self) -> Priority { let priority = match Priority::try_from(self.inner.get_priority()) { Ok(p) => p, @@ -540,17 +541,15 @@ impl QoS { priority } + /// Get congestion control of the message. pub fn congestion_control(&self) -> CongestionControl { self.inner.get_congestion_control() } + /// Get express flag value. If true, the message is not batched during transmission, in order to reduce latency. pub fn express(&self) -> bool { self.inner.is_express() } - - pub(crate) fn inner(&self) -> QoSType { - self.inner - } } impl From for QoS { From 061008f21a3ae442d37f4db2ca9b2dd6cc95c0f8 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Tue, 20 Feb 2024 18:08:38 +0100 Subject: [PATCH 12/13] add qos setters --- commons/zenoh-protocol/src/network/mod.rs | 7 +++++++ zenoh/src/sample.rs | 24 ++++++++++++++++++++--- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/commons/zenoh-protocol/src/network/mod.rs b/commons/zenoh-protocol/src/network/mod.rs index 1be58db5cc..b2ae5deabe 100644 --- a/commons/zenoh-protocol/src/network/mod.rs +++ b/commons/zenoh-protocol/src/network/mod.rs @@ -263,6 +263,13 @@ pub mod ext { } } + pub fn set_is_express(&mut self, is_express: bool) { + match is_express { + true => self.inner = imsg::set_flag(self.inner, Self::E_FLAG), + false => self.inner = imsg::unset_flag(self.inner, Self::E_FLAG), + } + } + pub const fn is_express(&self) -> bool { imsg::has_flag(self.inner, Self::E_FLAG) } diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index db55bce522..6ae5f2405d 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -526,7 +526,7 @@ pub struct QoS { } impl QoS { - /// Get priority of the message. + /// Gets priority of the message. pub fn priority(&self) -> Priority { let priority = match Priority::try_from(self.inner.get_priority()) { Ok(p) => p, @@ -541,15 +541,33 @@ impl QoS { priority } - /// Get congestion control of the message. + /// Gets congestion control of the message. pub fn congestion_control(&self) -> CongestionControl { self.inner.get_congestion_control() } - /// Get express flag value. If true, the message is not batched during transmission, in order to reduce latency. + /// Gets express flag value. If true, the message is not batched during transmission, in order to reduce latency. pub fn express(&self) -> bool { self.inner.is_express() } + + /// Sets priority value. + pub fn with_priority(mut self, priority: Priority) -> Self { + self.inner.set_priority(priority.into()); + self + } + + /// Sets congestion control value. + pub fn with_congestion_control(mut self, congestion_control: CongestionControl) -> Self { + self.inner.set_congestion_control(congestion_control); + self + } + + /// Sets express flag vlaue. + pub fn with_express(mut self, is_express: bool) -> Self { + self.inner.set_is_express(is_express); + self + } } impl From for QoS { From 90d1b7629c04c493b95a66b2295916cbdabd2ea8 Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Wed, 21 Feb 2024 17:00:01 +0100 Subject: [PATCH 13/13] fix clippy warnings --- zenoh/src/sample.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index 6ae5f2405d..ff867cecee 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -528,7 +528,7 @@ pub struct QoS { impl QoS { /// Gets priority of the message. pub fn priority(&self) -> Priority { - let priority = match Priority::try_from(self.inner.get_priority()) { + match Priority::try_from(self.inner.get_priority()) { Ok(p) => p, Err(e) => { log::trace!( @@ -537,8 +537,7 @@ impl QoS { ); Priority::default() } - }; - priority + } } /// Gets congestion control of the message.