diff --git a/commons/zenoh-protocol/src/network/mod.rs b/commons/zenoh-protocol/src/network/mod.rs index 1be58db5cc..b2ae5deabe 100644 --- a/commons/zenoh-protocol/src/network/mod.rs +++ b/commons/zenoh-protocol/src/network/mod.rs @@ -263,6 +263,13 @@ pub mod ext { } } + pub fn set_is_express(&mut self, is_express: bool) { + match is_express { + true => self.inner = imsg::set_flag(self.inner, Self::E_FLAG), + false => self.inner = imsg::unset_flag(self.inner, Self::E_FLAG), + } + } + pub const fn is_express(&self) -> bool { imsg::has_flag(self.inner, Self::E_FLAG) } diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index 843190ad45..e0cd1d33c6 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -22,6 +22,7 @@ use crate::prelude::*; #[zenoh_macros::unstable] use crate::sample::Attachment; use crate::sample::DataInfo; +use crate::sample::QoS; use crate::Encoding; use crate::SessionRef; use crate::Undeclarable; @@ -857,7 +858,13 @@ fn resolve_put( kind, encoding: Some(value.encoding), timestamp, - ..Default::default() + source_id: None, + source_sn: None, + qos: QoS::from(ext::QoSType::new( + publisher.priority.into(), + publisher.congestion_control, + false, + )), }; publisher.session.handle_data( @@ -925,7 +932,8 @@ impl TryFrom for Priority { } } -impl From for zenoh_protocol::core::Priority { +type ProtocolPriority = zenoh_protocol::core::Priority; +impl From for ProtocolPriority { fn from(prio: Priority) -> Self { // The Priority in the prelude differs from the Priority in the core protocol only from // the missing Control priority. The Control priority is reserved for zenoh internal use @@ -939,6 +947,22 @@ impl From for zenoh_protocol::core::Priority { } } +impl TryFrom for Priority { + type Error = zenoh_result::Error; + fn try_from(priority: ProtocolPriority) -> Result { + match priority { + ProtocolPriority::Control => bail!("'Control' is not a valid priority value."), + ProtocolPriority::RealTime => Ok(Priority::RealTime), + ProtocolPriority::InteractiveHigh => Ok(Priority::InteractiveHigh), + ProtocolPriority::InteractiveLow => Ok(Priority::InteractiveLow), + ProtocolPriority::DataHigh => Ok(Priority::DataHigh), + ProtocolPriority::Data => Ok(Priority::Data), + ProtocolPriority::DataLow => Ok(Priority::DataLow), + ProtocolPriority::Background => Ok(Priority::Background), + } + } +} + /// A struct that indicates if there exist Subscribers matching the Publisher's key expression. /// /// # Examples diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index 9ee73d1641..5114ef6570 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -193,6 +193,7 @@ impl SyncResolve for ReplyBuilder<'_> { value: Value { payload, encoding }, kind, timestamp, + qos, #[cfg(feature = "unstable")] source_info, #[cfg(feature = "unstable")] @@ -203,6 +204,7 @@ impl SyncResolve for ReplyBuilder<'_> { kind, encoding: Some(encoding), timestamp, + qos, source_id: None, source_sn: None, }; diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index 5d707e5936..ff867cecee 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -18,10 +18,12 @@ use crate::prelude::ZenohId; use crate::prelude::{KeyExpr, SampleKind, Value}; use crate::query::Reply; use crate::time::{new_reception_timestamp, Timestamp}; +use crate::Priority; #[zenoh_macros::unstable] use serde::Serialize; use std::convert::{TryFrom, TryInto}; -use zenoh_protocol::core::Encoding; +use zenoh_protocol::core::{CongestionControl, Encoding}; +use zenoh_protocol::network::push::ext::QoSType; pub type SourceSn = u64; @@ -50,6 +52,7 @@ pub(crate) struct DataInfo { pub timestamp: Option, pub source_id: Option, pub source_sn: Option, + pub qos: QoS, } /// Informations on the source of a zenoh [`Sample`]. @@ -326,6 +329,8 @@ pub struct Sample { pub kind: SampleKind, /// The [`Timestamp`] of this Sample. pub timestamp: Option, + /// Quality of service settings this sample was sent with. + pub qos: QoS, #[cfg(feature = "unstable")] ///
@@ -361,6 +366,7 @@ impl Sample { value: value.into(), kind: SampleKind::default(), timestamp: None, + qos: QoS::default(), #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), #[cfg(feature = "unstable")] @@ -383,6 +389,7 @@ impl Sample { value: value.into(), kind: SampleKind::default(), timestamp: None, + qos: QoS::default(), #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), #[cfg(feature = "unstable")] @@ -407,6 +414,7 @@ impl Sample { value, kind: data_info.kind, timestamp: data_info.timestamp, + qos: data_info.qos, #[cfg(feature = "unstable")] source_info: data_info.into(), #[cfg(feature = "unstable")] @@ -418,6 +426,7 @@ impl Sample { value, kind: SampleKind::default(), timestamp: None, + qos: QoS::default(), #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), #[cfg(feature = "unstable")] @@ -509,3 +518,59 @@ impl TryFrom for Sample { value.sample } } + +/// Structure containing quality of service data +#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)] +pub struct QoS { + inner: QoSType, +} + +impl QoS { + /// Gets priority of the message. + pub fn priority(&self) -> Priority { + match Priority::try_from(self.inner.get_priority()) { + Ok(p) => p, + Err(e) => { + log::trace!( + "Failed to convert priority: {}; replacing with default value", + e.to_string() + ); + Priority::default() + } + } + } + + /// Gets congestion control of the message. + pub fn congestion_control(&self) -> CongestionControl { + self.inner.get_congestion_control() + } + + /// Gets express flag value. If true, the message is not batched during transmission, in order to reduce latency. + pub fn express(&self) -> bool { + self.inner.is_express() + } + + /// Sets priority value. + pub fn with_priority(mut self, priority: Priority) -> Self { + self.inner.set_priority(priority.into()); + self + } + + /// Sets congestion control value. + pub fn with_congestion_control(mut self, congestion_control: CongestionControl) -> Self { + self.inner.set_congestion_control(congestion_control); + self + } + + /// Sets express flag vlaue. + pub fn with_express(mut self, is_express: bool) -> Self { + self.inner.set_is_express(is_express); + self + } +} + +impl From for QoS { + fn from(qos: QoSType) -> Self { + QoS { inner: qos } + } +} diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index d52c446d3d..43ef74d58f 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -31,6 +31,7 @@ use crate::queryable::*; #[cfg(feature = "unstable")] use crate::sample::Attachment; use crate::sample::DataInfo; +use crate::sample::QoS; use crate::selector::TIME_RANGE_KEY; use crate::subscriber::*; use crate::Id; @@ -2192,6 +2193,7 @@ impl Primitives for Session { kind: SampleKind::Put, encoding: Some(m.encoding), timestamp: m.timestamp, + qos: QoS::from(msg.ext_qos), source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; @@ -2209,6 +2211,7 @@ impl Primitives for Session { kind: SampleKind::Delete, encoding: None, timestamp: m.timestamp, + qos: QoS::from(msg.ext_qos), source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; @@ -2345,6 +2348,7 @@ impl Primitives for Session { kind: SampleKind::Put, encoding: Some(m.encoding), timestamp: m.timestamp, + qos: QoS::from(msg.ext_qos), source_id: m.ext_sinfo.as_ref().map(|i| i.zid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs new file mode 100644 index 0000000000..475d8d7a1b --- /dev/null +++ b/zenoh/tests/qos.rs @@ -0,0 +1,66 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use async_std::prelude::FutureExt; +use async_std::task; +use std::time::Duration; +use zenoh::prelude::r#async::*; +use zenoh::{publication::Priority, SessionDeclarations}; +use zenoh_core::zasync_executor_init; + +const TIMEOUT: Duration = Duration::from_secs(60); +const SLEEP: Duration = Duration::from_secs(1); + +macro_rules! ztimeout { + ($f:expr) => { + $f.timeout(TIMEOUT).await.unwrap() + }; +} + +#[test] +fn pubsub() { + task::block_on(async { + zasync_executor_init!(); + let session1 = ztimeout!(zenoh::open(zenoh_config::peer()).res_async()).unwrap(); + let session2 = ztimeout!(zenoh::open(zenoh_config::peer()).res_async()).unwrap(); + + let publisher1 = ztimeout!(session1 + .declare_publisher("test/qos") + .priority(Priority::DataHigh) + .congestion_control(CongestionControl::Drop) + .res()) + .unwrap(); + + let publisher2 = ztimeout!(session1 + .declare_publisher("test/qos") + .priority(Priority::DataLow) + .congestion_control(CongestionControl::Block) + .res()) + .unwrap(); + + let subscriber = ztimeout!(session2.declare_subscriber("test/qos").res()).unwrap(); + task::sleep(SLEEP).await; + + ztimeout!(publisher1.put("qos").res_async()).unwrap(); + let qos = ztimeout!(subscriber.recv_async()).unwrap().qos; + + assert_eq!(qos.priority(), Priority::DataHigh); + assert_eq!(qos.congestion_control(), CongestionControl::Drop); + + ztimeout!(publisher2.put("qos").res_async()).unwrap(); + let qos = ztimeout!(subscriber.recv_async()).unwrap().qos; + + assert_eq!(qos.priority(), Priority::DataLow); + assert_eq!(qos.congestion_control(), CongestionControl::Block); + }); +}