Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose Quality of Service settings (Priority, Congestion Control, Express) the Sample was published with #730

Merged
merged 14 commits into from
Feb 23, 2024
Merged
8 changes: 8 additions & 0 deletions commons/zenoh-protocol/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,3 +431,11 @@ pub enum QueryTarget {
#[cfg(feature = "complete_n")]
Complete(u64),
}

/// Structure containing quality of service data
Mallets marked this conversation as resolved.
Show resolved Hide resolved
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)]
pub struct QoS {
pub priority: Priority,
pub congestion_control: CongestionControl,
pub express: bool,
}
11 changes: 11 additions & 0 deletions commons/zenoh-protocol/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,17 @@ pub mod ext {
}
}

type QoSPublic = crate::core::QoS;
Mallets marked this conversation as resolved.
Show resolved Hide resolved
impl<const ID: u8> From<QoSType<{ ID }>> for QoSPublic {
fn from(ext: QoSType<{ ID }>) -> Self {
QoSPublic {
priority: ext.get_priority(),
congestion_control: ext.get_congestion_control(),
express: ext.is_express(),
}
}
}

/// ```text
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
Expand Down
9 changes: 8 additions & 1 deletion zenoh/src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,14 @@ fn resolve_put(
kind,
encoding: Some(value.encoding),
timestamp,
..Default::default()
source_id: None,
source_sn: None,
qos: ext::QoSType::new(
publisher.priority.into(),
publisher.congestion_control,
false,
)
.into(),
};

publisher.session.handle_data(
Expand Down
2 changes: 2 additions & 0 deletions zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ impl SyncResolve for ReplyBuilder<'_> {
value: Value { payload, encoding },
kind,
timestamp,
qos,
#[cfg(feature = "unstable")]
source_info,
#[cfg(feature = "unstable")]
Expand All @@ -203,6 +204,7 @@ impl SyncResolve for ReplyBuilder<'_> {
kind,
encoding: Some(encoding),
timestamp,
qos,
source_id: None,
source_sn: None,
};
Expand Down
9 changes: 8 additions & 1 deletion zenoh/src/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::time::{new_reception_timestamp, Timestamp};
#[zenoh_macros::unstable]
use serde::Serialize;
use std::convert::{TryFrom, TryInto};
use zenoh_protocol::core::Encoding;
use zenoh_protocol::core::{Encoding, QoS};

pub type SourceSn = u64;

Expand Down Expand Up @@ -50,6 +50,7 @@ pub(crate) struct DataInfo {
pub timestamp: Option<Timestamp>,
pub source_id: Option<ZenohId>,
pub source_sn: Option<SourceSn>,
pub qos: QoS,
}

/// Informations on the source of a zenoh [`Sample`].
Expand Down Expand Up @@ -326,6 +327,8 @@ pub struct Sample {
pub kind: SampleKind,
/// The [`Timestamp`] of this Sample.
pub timestamp: Option<Timestamp>,
/// Quality of service settings this sample was sent with.
pub qos: QoS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a huge fan of how Sample keeps on getting more public fields. Maybe we can use the API break to make it a bit more private?


#[cfg(feature = "unstable")]
/// <div class="stab unstable">
Expand Down Expand Up @@ -361,6 +364,7 @@ impl Sample {
value: value.into(),
kind: SampleKind::default(),
timestamp: None,
qos: QoS::default(),
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
#[cfg(feature = "unstable")]
Expand All @@ -383,6 +387,7 @@ impl Sample {
value: value.into(),
kind: SampleKind::default(),
timestamp: None,
qos: QoS::default(),
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
#[cfg(feature = "unstable")]
Expand All @@ -407,6 +412,7 @@ impl Sample {
value,
kind: data_info.kind,
timestamp: data_info.timestamp,
qos: data_info.qos,
#[cfg(feature = "unstable")]
source_info: data_info.into(),
#[cfg(feature = "unstable")]
Expand All @@ -418,6 +424,7 @@ impl Sample {
value,
kind: SampleKind::default(),
timestamp: None,
qos: QoS::default(),
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
#[cfg(feature = "unstable")]
Expand Down
3 changes: 3 additions & 0 deletions zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2192,6 +2192,7 @@ impl Primitives for Session {
kind: SampleKind::Put,
encoding: Some(m.encoding),
timestamp: m.timestamp,
qos: msg.ext_qos.into(),
source_id: m.ext_sinfo.as_ref().map(|i| i.zid),
source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64),
};
Expand All @@ -2209,6 +2210,7 @@ impl Primitives for Session {
kind: SampleKind::Delete,
encoding: None,
timestamp: m.timestamp,
qos: msg.ext_qos.into(),
source_id: m.ext_sinfo.as_ref().map(|i| i.zid),
source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64),
};
Expand Down Expand Up @@ -2345,6 +2347,7 @@ impl Primitives for Session {
kind: SampleKind::Put,
encoding: Some(m.encoding),
timestamp: m.timestamp,
qos: msg.ext_qos.into(),
source_id: m.ext_sinfo.as_ref().map(|i| i.zid),
source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64),
};
Expand Down
45 changes: 45 additions & 0 deletions zenoh/tests/qos.rs
Mallets marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use std::time::Duration;

use async_std::task;
use zenoh::{publication::Priority, SessionDeclarations};
use zenoh_core::{zasync_executor_init, AsyncResolve};
use zenoh_protocol::core::CongestionControl;
Mallets marked this conversation as resolved.
Show resolved Hide resolved

const SLEEP: Duration = Duration::from_secs(1);

#[test]
fn pubsub() {
task::block_on(async {
zasync_executor_init!();
let session1 = zenoh::open(zenoh_config::peer()).res_async().await.unwrap();
let session2 = zenoh::open(zenoh_config::peer()).res_async().await.unwrap();

let publisher = session1
.declare_publisher("test/qos")
.res()
.await
.unwrap()
.priority(Priority::DataHigh)
Mallets marked this conversation as resolved.
Show resolved Hide resolved
.congestion_control(CongestionControl::Drop);

task::sleep(SLEEP).await;

let sub = session2.declare_subscriber("test/qos").res().await.unwrap();
task::sleep(SLEEP).await;

publisher.put("qos").res_async().await.unwrap();
let qos = sub.recv_async().await.unwrap().qos;
Mallets marked this conversation as resolved.
Show resolved Hide resolved

assert_eq!(qos.priority, Priority::DataHigh.into());
Mallets marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(qos.congestion_control, CongestionControl::Drop);

let publisher = publisher
.priority(Priority::DataLow)
.congestion_control(CongestionControl::Block);
publisher.put("qos").res_async().await.unwrap();
let qos = sub.recv_async().await.unwrap().qos;

assert_eq!(qos.priority, Priority::DataLow.into());
Mallets marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(qos.congestion_control, CongestionControl::Block);
});
}