Skip to content

Commit

Permalink
Merge pull request #730 from DenisBiryukov91/feature/priority-in-sample
Browse files Browse the repository at this point in the history
Expose Quality of Service settings (Priority, Congestion Control, Express) the Sample was published with
  • Loading branch information
milyin authored Feb 23, 2024
2 parents 7ebdb3c + 90d1b76 commit 0440148
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 3 deletions.
7 changes: 7 additions & 0 deletions commons/zenoh-protocol/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,13 @@ pub mod ext {
}
}

pub fn set_is_express(&mut self, is_express: bool) {
match is_express {
true => self.inner = imsg::set_flag(self.inner, Self::E_FLAG),
false => self.inner = imsg::unset_flag(self.inner, Self::E_FLAG),
}
}

pub const fn is_express(&self) -> bool {
imsg::has_flag(self.inner, Self::E_FLAG)
}
Expand Down
28 changes: 26 additions & 2 deletions zenoh/src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::prelude::*;
#[zenoh_macros::unstable]
use crate::sample::Attachment;
use crate::sample::DataInfo;
use crate::sample::QoS;
use crate::Encoding;
use crate::SessionRef;
use crate::Undeclarable;
Expand Down Expand Up @@ -857,7 +858,13 @@ fn resolve_put(
kind,
encoding: Some(value.encoding),
timestamp,
..Default::default()
source_id: None,
source_sn: None,
qos: QoS::from(ext::QoSType::new(
publisher.priority.into(),
publisher.congestion_control,
false,
)),
};

publisher.session.handle_data(
Expand Down Expand Up @@ -925,7 +932,8 @@ impl TryFrom<u8> for Priority {
}
}

impl From<Priority> for zenoh_protocol::core::Priority {
type ProtocolPriority = zenoh_protocol::core::Priority;
impl From<Priority> for ProtocolPriority {
fn from(prio: Priority) -> Self {
// The Priority in the prelude differs from the Priority in the core protocol only from
// the missing Control priority. The Control priority is reserved for zenoh internal use
Expand All @@ -939,6 +947,22 @@ impl From<Priority> for zenoh_protocol::core::Priority {
}
}

impl TryFrom<ProtocolPriority> for Priority {
type Error = zenoh_result::Error;
fn try_from(priority: ProtocolPriority) -> Result<Self, Self::Error> {
match priority {
ProtocolPriority::Control => bail!("'Control' is not a valid priority value."),
ProtocolPriority::RealTime => Ok(Priority::RealTime),
ProtocolPriority::InteractiveHigh => Ok(Priority::InteractiveHigh),
ProtocolPriority::InteractiveLow => Ok(Priority::InteractiveLow),
ProtocolPriority::DataHigh => Ok(Priority::DataHigh),
ProtocolPriority::Data => Ok(Priority::Data),
ProtocolPriority::DataLow => Ok(Priority::DataLow),
ProtocolPriority::Background => Ok(Priority::Background),
}
}
}

/// A struct that indicates if there exist Subscribers matching the Publisher's key expression.
///
/// # Examples
Expand Down
2 changes: 2 additions & 0 deletions zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ impl SyncResolve for ReplyBuilder<'_> {
value: Value { payload, encoding },
kind,
timestamp,
qos,
#[cfg(feature = "unstable")]
source_info,
#[cfg(feature = "unstable")]
Expand All @@ -203,6 +204,7 @@ impl SyncResolve for ReplyBuilder<'_> {
kind,
encoding: Some(encoding),
timestamp,
qos,
source_id: None,
source_sn: None,
};
Expand Down
67 changes: 66 additions & 1 deletion zenoh/src/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ use crate::prelude::ZenohId;
use crate::prelude::{KeyExpr, SampleKind, Value};
use crate::query::Reply;
use crate::time::{new_reception_timestamp, Timestamp};
use crate::Priority;
#[zenoh_macros::unstable]
use serde::Serialize;
use std::convert::{TryFrom, TryInto};
use zenoh_protocol::core::Encoding;
use zenoh_protocol::core::{CongestionControl, Encoding};
use zenoh_protocol::network::push::ext::QoSType;

pub type SourceSn = u64;

Expand Down Expand Up @@ -50,6 +52,7 @@ pub(crate) struct DataInfo {
pub timestamp: Option<Timestamp>,
pub source_id: Option<ZenohId>,
pub source_sn: Option<SourceSn>,
pub qos: QoS,
}

/// Informations on the source of a zenoh [`Sample`].
Expand Down Expand Up @@ -326,6 +329,8 @@ pub struct Sample {
pub kind: SampleKind,
/// The [`Timestamp`] of this Sample.
pub timestamp: Option<Timestamp>,
/// Quality of service settings this sample was sent with.
pub qos: QoS,

#[cfg(feature = "unstable")]
/// <div class="stab unstable">
Expand Down Expand Up @@ -361,6 +366,7 @@ impl Sample {
value: value.into(),
kind: SampleKind::default(),
timestamp: None,
qos: QoS::default(),
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
#[cfg(feature = "unstable")]
Expand All @@ -383,6 +389,7 @@ impl Sample {
value: value.into(),
kind: SampleKind::default(),
timestamp: None,
qos: QoS::default(),
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
#[cfg(feature = "unstable")]
Expand All @@ -407,6 +414,7 @@ impl Sample {
value,
kind: data_info.kind,
timestamp: data_info.timestamp,
qos: data_info.qos,
#[cfg(feature = "unstable")]
source_info: data_info.into(),
#[cfg(feature = "unstable")]
Expand All @@ -418,6 +426,7 @@ impl Sample {
value,
kind: SampleKind::default(),
timestamp: None,
qos: QoS::default(),
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -509,3 +518,59 @@ impl TryFrom<Reply> for Sample {
value.sample
}
}

/// Structure containing quality of service data
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)]
pub struct QoS {
inner: QoSType,
}

impl QoS {
/// Gets priority of the message.
pub fn priority(&self) -> Priority {
match Priority::try_from(self.inner.get_priority()) {
Ok(p) => p,
Err(e) => {
log::trace!(
"Failed to convert priority: {}; replacing with default value",
e.to_string()
);
Priority::default()
}
}
}

/// Gets congestion control of the message.
pub fn congestion_control(&self) -> CongestionControl {
self.inner.get_congestion_control()
}

/// Gets express flag value. If true, the message is not batched during transmission, in order to reduce latency.
pub fn express(&self) -> bool {
self.inner.is_express()
}

/// Sets priority value.
pub fn with_priority(mut self, priority: Priority) -> Self {
self.inner.set_priority(priority.into());
self
}

/// Sets congestion control value.
pub fn with_congestion_control(mut self, congestion_control: CongestionControl) -> Self {
self.inner.set_congestion_control(congestion_control);
self
}

/// Sets express flag vlaue.
pub fn with_express(mut self, is_express: bool) -> Self {
self.inner.set_is_express(is_express);
self
}
}

impl From<QoSType> for QoS {
fn from(qos: QoSType) -> Self {
QoS { inner: qos }
}
}
4 changes: 4 additions & 0 deletions zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::queryable::*;
#[cfg(feature = "unstable")]
use crate::sample::Attachment;
use crate::sample::DataInfo;
use crate::sample::QoS;
use crate::selector::TIME_RANGE_KEY;
use crate::subscriber::*;
use crate::Id;
Expand Down Expand Up @@ -2192,6 +2193,7 @@ impl Primitives for Session {
kind: SampleKind::Put,
encoding: Some(m.encoding),
timestamp: m.timestamp,
qos: QoS::from(msg.ext_qos),
source_id: m.ext_sinfo.as_ref().map(|i| i.zid),
source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64),
};
Expand All @@ -2209,6 +2211,7 @@ impl Primitives for Session {
kind: SampleKind::Delete,
encoding: None,
timestamp: m.timestamp,
qos: QoS::from(msg.ext_qos),
source_id: m.ext_sinfo.as_ref().map(|i| i.zid),
source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64),
};
Expand Down Expand Up @@ -2345,6 +2348,7 @@ impl Primitives for Session {
kind: SampleKind::Put,
encoding: Some(m.encoding),
timestamp: m.timestamp,
qos: QoS::from(msg.ext_qos),
source_id: m.ext_sinfo.as_ref().map(|i| i.zid),
source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64),
};
Expand Down
66 changes: 66 additions & 0 deletions zenoh/tests/qos.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use async_std::prelude::FutureExt;
use async_std::task;
use std::time::Duration;
use zenoh::prelude::r#async::*;
use zenoh::{publication::Priority, SessionDeclarations};
use zenoh_core::zasync_executor_init;

const TIMEOUT: Duration = Duration::from_secs(60);
const SLEEP: Duration = Duration::from_secs(1);

macro_rules! ztimeout {
($f:expr) => {
$f.timeout(TIMEOUT).await.unwrap()
};
}

#[test]
fn pubsub() {
task::block_on(async {
zasync_executor_init!();
let session1 = ztimeout!(zenoh::open(zenoh_config::peer()).res_async()).unwrap();
let session2 = ztimeout!(zenoh::open(zenoh_config::peer()).res_async()).unwrap();

let publisher1 = ztimeout!(session1
.declare_publisher("test/qos")
.priority(Priority::DataHigh)
.congestion_control(CongestionControl::Drop)
.res())
.unwrap();

let publisher2 = ztimeout!(session1
.declare_publisher("test/qos")
.priority(Priority::DataLow)
.congestion_control(CongestionControl::Block)
.res())
.unwrap();

let subscriber = ztimeout!(session2.declare_subscriber("test/qos").res()).unwrap();
task::sleep(SLEEP).await;

ztimeout!(publisher1.put("qos").res_async()).unwrap();
let qos = ztimeout!(subscriber.recv_async()).unwrap().qos;

assert_eq!(qos.priority(), Priority::DataHigh);
assert_eq!(qos.congestion_control(), CongestionControl::Drop);

ztimeout!(publisher2.put("qos").res_async()).unwrap();
let qos = ztimeout!(subscriber.recv_async()).unwrap().qos;

assert_eq!(qos.priority(), Priority::DataLow);
assert_eq!(qos.congestion_control(), CongestionControl::Block);
});
}

0 comments on commit 0440148

Please sign in to comment.