Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose Quality of Service settings (Priority, Congestion Control, Express) the Sample was published with #730

Merged
merged 14 commits into from
Feb 23, 2024
Merged
7 changes: 7 additions & 0 deletions commons/zenoh-protocol/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,13 @@ pub mod ext {
}
}

pub fn set_is_express(&mut self, is_express: bool) {
match is_express {
true => self.inner = imsg::set_flag(self.inner, Self::E_FLAG),
false => self.inner = imsg::unset_flag(self.inner, Self::E_FLAG),
}
}

pub const fn is_express(&self) -> bool {
imsg::has_flag(self.inner, Self::E_FLAG)
}
Expand Down
28 changes: 26 additions & 2 deletions zenoh/src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::prelude::*;
#[zenoh_macros::unstable]
use crate::sample::Attachment;
use crate::sample::DataInfo;
use crate::sample::QoS;
use crate::Encoding;
use crate::SessionRef;
use crate::Undeclarable;
Expand Down Expand Up @@ -857,7 +858,13 @@ fn resolve_put(
kind,
encoding: Some(value.encoding),
timestamp,
..Default::default()
source_id: None,
source_sn: None,
qos: QoS::from(ext::QoSType::new(
publisher.priority.into(),
publisher.congestion_control,
false,
)),
};

publisher.session.handle_data(
Expand Down Expand Up @@ -925,7 +932,8 @@ impl TryFrom<u8> for Priority {
}
}

impl From<Priority> for zenoh_protocol::core::Priority {
type ProtocolPriority = zenoh_protocol::core::Priority;
impl From<Priority> for ProtocolPriority {
fn from(prio: Priority) -> Self {
// The Priority in the prelude differs from the Priority in the core protocol only from
// the missing Control priority. The Control priority is reserved for zenoh internal use
Expand All @@ -939,6 +947,22 @@ impl From<Priority> for zenoh_protocol::core::Priority {
}
}

impl TryFrom<ProtocolPriority> for Priority {
type Error = zenoh_result::Error;
fn try_from(priority: ProtocolPriority) -> Result<Self, Self::Error> {
match priority {
ProtocolPriority::Control => bail!("'Control' is not a valid priority value."),
ProtocolPriority::RealTime => Ok(Priority::RealTime),
ProtocolPriority::InteractiveHigh => Ok(Priority::InteractiveHigh),
ProtocolPriority::InteractiveLow => Ok(Priority::InteractiveLow),
ProtocolPriority::DataHigh => Ok(Priority::DataHigh),
ProtocolPriority::Data => Ok(Priority::Data),
ProtocolPriority::DataLow => Ok(Priority::DataLow),
ProtocolPriority::Background => Ok(Priority::Background),
}
}
}

/// A struct that indicates if there exist Subscribers matching the Publisher's key expression.
///
/// # Examples
Expand Down
2 changes: 2 additions & 0 deletions zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ impl SyncResolve for ReplyBuilder<'_> {
value: Value { payload, encoding },
kind,
timestamp,
qos,
#[cfg(feature = "unstable")]
source_info,
#[cfg(feature = "unstable")]
Expand All @@ -203,6 +204,7 @@ impl SyncResolve for ReplyBuilder<'_> {
kind,
encoding: Some(encoding),
timestamp,
qos,
source_id: None,
source_sn: None,
};
Expand Down
67 changes: 66 additions & 1 deletion zenoh/src/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ use crate::prelude::ZenohId;
use crate::prelude::{KeyExpr, SampleKind, Value};
use crate::query::Reply;
use crate::time::{new_reception_timestamp, Timestamp};
use crate::Priority;
#[zenoh_macros::unstable]
use serde::Serialize;
use std::convert::{TryFrom, TryInto};
use zenoh_protocol::core::Encoding;
use zenoh_protocol::core::{CongestionControl, Encoding};
use zenoh_protocol::network::push::ext::QoSType;

pub type SourceSn = u64;

Expand Down Expand Up @@ -50,6 +52,7 @@ pub(crate) struct DataInfo {
pub timestamp: Option<Timestamp>,
pub source_id: Option<ZenohId>,
pub source_sn: Option<SourceSn>,
pub qos: QoS,
}

/// Informations on the source of a zenoh [`Sample`].
Expand Down Expand Up @@ -326,6 +329,8 @@ pub struct Sample {
pub kind: SampleKind,
/// The [`Timestamp`] of this Sample.
pub timestamp: Option<Timestamp>,
/// Quality of service settings this sample was sent with.
pub qos: QoS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a huge fan of how Sample keeps on getting more public fields. Maybe we can use the API break to make it a bit more private?


#[cfg(feature = "unstable")]
/// <div class="stab unstable">
Expand Down Expand Up @@ -361,6 +366,7 @@ impl Sample {
value: value.into(),
kind: SampleKind::default(),
timestamp: None,
qos: QoS::default(),
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
#[cfg(feature = "unstable")]
Expand All @@ -383,6 +389,7 @@ impl Sample {
value: value.into(),
kind: SampleKind::default(),
timestamp: None,
qos: QoS::default(),
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
#[cfg(feature = "unstable")]
Expand All @@ -407,6 +414,7 @@ impl Sample {
value,
kind: data_info.kind,
timestamp: data_info.timestamp,
qos: data_info.qos,
#[cfg(feature = "unstable")]
source_info: data_info.into(),
#[cfg(feature = "unstable")]
Expand All @@ -418,6 +426,7 @@ impl Sample {
value,
kind: SampleKind::default(),
timestamp: None,
qos: QoS::default(),
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -509,3 +518,59 @@ impl TryFrom<Reply> for Sample {
value.sample
}
}

/// Structure containing quality of service data
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)]
pub struct QoS {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using accessors, this could be a bitfield and occupy a single byte instead of 3

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the benefit of reducing 3 bytes to one on the system with word size >= 4 bytes ? As a separate struct it will still tend to occupy 4 bytes at least, no ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's all about breakpoints and, to be fair, I think this layout just barely fits in Sample's current padding, but that also means the next field is guaranteed to make Sample fatter, which we've shown in previous layout improvements to be rather performances sensitive for us.

API-wise, it's tempting to have only public fields, but that turns any future change to the struct's fields an API break. Builder patterns are just as convenient and give us much more flexibility in the future, both for layout optimization and for field additions.

inner: QoSType,
}

impl QoS {
/// Gets priority of the message.
pub fn priority(&self) -> Priority {
match Priority::try_from(self.inner.get_priority()) {
Ok(p) => p,
Err(e) => {
log::trace!(
"Failed to convert priority: {}; replacing with default value",
e.to_string()
);
Priority::default()
}
}
}

/// Gets congestion control of the message.
pub fn congestion_control(&self) -> CongestionControl {
self.inner.get_congestion_control()
}

/// Gets express flag value. If true, the message is not batched during transmission, in order to reduce latency.
pub fn express(&self) -> bool {
self.inner.is_express()
}

/// Sets priority value.
pub fn with_priority(mut self, priority: Priority) -> Self {
self.inner.set_priority(priority.into());
self
}

/// Sets congestion control value.
pub fn with_congestion_control(mut self, congestion_control: CongestionControl) -> Self {
self.inner.set_congestion_control(congestion_control);
self
}

/// Sets express flag vlaue.
pub fn with_express(mut self, is_express: bool) -> Self {
self.inner.set_is_express(is_express);
self
}
}

impl From<QoSType> for QoS {
fn from(qos: QoSType) -> Self {
QoS { inner: qos }
}
}
4 changes: 4 additions & 0 deletions zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::queryable::*;
#[cfg(feature = "unstable")]
use crate::sample::Attachment;
use crate::sample::DataInfo;
use crate::sample::QoS;
use crate::selector::TIME_RANGE_KEY;
use crate::subscriber::*;
use crate::Id;
Expand Down Expand Up @@ -2192,6 +2193,7 @@ impl Primitives for Session {
kind: SampleKind::Put,
encoding: Some(m.encoding),
timestamp: m.timestamp,
qos: QoS::from(msg.ext_qos),
source_id: m.ext_sinfo.as_ref().map(|i| i.zid),
source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64),
};
Expand All @@ -2209,6 +2211,7 @@ impl Primitives for Session {
kind: SampleKind::Delete,
encoding: None,
timestamp: m.timestamp,
qos: QoS::from(msg.ext_qos),
source_id: m.ext_sinfo.as_ref().map(|i| i.zid),
source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64),
};
Expand Down Expand Up @@ -2345,6 +2348,7 @@ impl Primitives for Session {
kind: SampleKind::Put,
encoding: Some(m.encoding),
timestamp: m.timestamp,
qos: QoS::from(msg.ext_qos),
source_id: m.ext_sinfo.as_ref().map(|i| i.zid),
source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64),
};
Expand Down
66 changes: 66 additions & 0 deletions zenoh/tests/qos.rs
Mallets marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use async_std::prelude::FutureExt;
use async_std::task;
use std::time::Duration;
use zenoh::prelude::r#async::*;
use zenoh::{publication::Priority, SessionDeclarations};
use zenoh_core::zasync_executor_init;

const TIMEOUT: Duration = Duration::from_secs(60);
const SLEEP: Duration = Duration::from_secs(1);

macro_rules! ztimeout {
($f:expr) => {
$f.timeout(TIMEOUT).await.unwrap()
};
}

#[test]
fn pubsub() {
task::block_on(async {
zasync_executor_init!();
let session1 = ztimeout!(zenoh::open(zenoh_config::peer()).res_async()).unwrap();
let session2 = ztimeout!(zenoh::open(zenoh_config::peer()).res_async()).unwrap();

let publisher1 = ztimeout!(session1
.declare_publisher("test/qos")
.priority(Priority::DataHigh)
Mallets marked this conversation as resolved.
Show resolved Hide resolved
.congestion_control(CongestionControl::Drop)
.res())
.unwrap();

let publisher2 = ztimeout!(session1
.declare_publisher("test/qos")
.priority(Priority::DataLow)
.congestion_control(CongestionControl::Block)
.res())
.unwrap();

let subscriber = ztimeout!(session2.declare_subscriber("test/qos").res()).unwrap();
task::sleep(SLEEP).await;

ztimeout!(publisher1.put("qos").res_async()).unwrap();
let qos = ztimeout!(subscriber.recv_async()).unwrap().qos;

assert_eq!(qos.priority(), Priority::DataHigh);
assert_eq!(qos.congestion_control(), CongestionControl::Drop);

ztimeout!(publisher2.put("qos").res_async()).unwrap();
let qos = ztimeout!(subscriber.recv_async()).unwrap().qos;

assert_eq!(qos.priority(), Priority::DataLow);
assert_eq!(qos.congestion_control(), CongestionControl::Block);
});
}
Loading