Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose Quality of Service settings (Priority, Congestion Control, Express) the Sample was published with #730

Merged
merged 14 commits into from
Feb 23, 2024
Merged
8 changes: 0 additions & 8 deletions commons/zenoh-protocol/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,11 +431,3 @@ pub enum QueryTarget {
#[cfg(feature = "complete_n")]
Complete(u64),
}

/// Structure containing quality of service data
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)]
pub struct QoS {
pub priority: Priority,
pub congestion_control: CongestionControl,
pub express: bool,
}
11 changes: 0 additions & 11 deletions commons/zenoh-protocol/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,17 +331,6 @@ pub mod ext {
}
}

type QoSPublic = crate::core::QoS;
impl<const ID: u8> From<QoSType<{ ID }>> for QoSPublic {
fn from(ext: QoSType<{ ID }>) -> Self {
QoSPublic {
priority: ext.get_priority(),
congestion_control: ext.get_congestion_control(),
express: ext.is_express(),
}
}
}

/// ```text
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
Expand Down
6 changes: 6 additions & 0 deletions zenoh/src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,12 @@ impl Priority {
pub const NUM: usize = 1 + Self::MIN as usize - Self::MAX as usize;
}

impl From<zenoh_protocol::core::Priority> for Priority {
Mallets marked this conversation as resolved.
Show resolved Hide resolved
fn from(priority: zenoh_protocol::core::Priority) -> Self {
Self::try_from(priority as u8).unwrap()
}
}

impl TryFrom<u8> for Priority {
type Error = zenoh_result::Error;

Expand Down
22 changes: 21 additions & 1 deletion zenoh/src/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ use crate::prelude::ZenohId;
use crate::prelude::{KeyExpr, SampleKind, Value};
use crate::query::Reply;
use crate::time::{new_reception_timestamp, Timestamp};
use crate::Priority;
#[zenoh_macros::unstable]
use serde::Serialize;
use std::convert::{TryFrom, TryInto};
use zenoh_protocol::core::{Encoding, QoS};
use zenoh_protocol::core::{CongestionControl, Encoding};
use zenoh_protocol::network::push::ext::QoSType;

pub type SourceSn = u64;

Expand Down Expand Up @@ -516,3 +518,21 @@ impl TryFrom<Reply> for Sample {
value.sample
}
}

/// Structure containing quality of service data
#[derive(Debug, Default, Copy, Clone, Eq, PartialEq)]
pub struct QoS {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using accessors, this could be a bitfield and occupy a single byte instead of 3

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the benefit of reducing 3 bytes to one on the system with word size >= 4 bytes ? As a separate struct it will still tend to occupy 4 bytes at least, no ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's all about breakpoints and, to be fair, I think this layout just barely fits in Sample's current padding, but that also means the next field is guaranteed to make Sample fatter, which we've shown in previous layout improvements to be rather performances sensitive for us.

API-wise, it's tempting to have only public fields, but that turns any future change to the struct's fields an API break. Builder patterns are just as convenient and give us much more flexibility in the future, both for layout optimization and for field additions.

pub priority: Priority,
pub congestion_control: CongestionControl,
pub express: bool,
}

impl From<QoSType> for QoS {
fn from(ext: QoSType) -> Self {
QoS {
priority: ext.get_priority().into(),
congestion_control: ext.get_congestion_control(),
express: ext.is_express(),
}
}
}
65 changes: 43 additions & 22 deletions zenoh/tests/qos.rs
Mallets marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,45 +1,66 @@
use std::time::Duration;

//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use async_std::prelude::FutureExt;
use async_std::task;
use std::time::Duration;
use zenoh::prelude::r#async::*;
use zenoh::{publication::Priority, SessionDeclarations};
use zenoh_core::{zasync_executor_init, AsyncResolve};
use zenoh_protocol::core::CongestionControl;
use zenoh_core::zasync_executor_init;

const TIMEOUT: Duration = Duration::from_secs(60);
const SLEEP: Duration = Duration::from_secs(1);

macro_rules! ztimeout {
($f:expr) => {
$f.timeout(TIMEOUT).await.unwrap()
};
}

#[test]
fn pubsub() {
task::block_on(async {
zasync_executor_init!();
let session1 = zenoh::open(zenoh_config::peer()).res_async().await.unwrap();
let session2 = zenoh::open(zenoh_config::peer()).res_async().await.unwrap();
let session1 = ztimeout!(zenoh::open(zenoh_config::peer()).res_async()).unwrap();
let session2 = ztimeout!(zenoh::open(zenoh_config::peer()).res_async()).unwrap();

let publisher = session1
let publisher1 = ztimeout!(session1
.declare_publisher("test/qos")
.res()
.await
.unwrap()
.priority(Priority::DataHigh)
Mallets marked this conversation as resolved.
Show resolved Hide resolved
.congestion_control(CongestionControl::Drop);
.congestion_control(CongestionControl::Drop)
.res())
.unwrap();

task::sleep(SLEEP).await;
let publisher2 = ztimeout!(session1
.declare_publisher("test/qos")
.priority(Priority::DataLow)
.congestion_control(CongestionControl::Block)
.res())
.unwrap();

let sub = session2.declare_subscriber("test/qos").res().await.unwrap();
let subscriber = ztimeout!(session2.declare_subscriber("test/qos").res()).unwrap();
task::sleep(SLEEP).await;

publisher.put("qos").res_async().await.unwrap();
let qos = sub.recv_async().await.unwrap().qos;
ztimeout!(publisher1.put("qos").res_async()).unwrap();
let qos = ztimeout!(subscriber.recv_async()).unwrap().qos;

assert_eq!(qos.priority, Priority::DataHigh.into());
assert_eq!(qos.priority, Priority::DataHigh);
assert_eq!(qos.congestion_control, CongestionControl::Drop);

let publisher = publisher
.priority(Priority::DataLow)
.congestion_control(CongestionControl::Block);
publisher.put("qos").res_async().await.unwrap();
let qos = sub.recv_async().await.unwrap().qos;
ztimeout!(publisher2.put("qos").res_async()).unwrap();
let qos = ztimeout!(subscriber.recv_async()).unwrap().qos;

assert_eq!(qos.priority, Priority::DataLow.into());
assert_eq!(qos.priority, Priority::DataLow);
assert_eq!(qos.congestion_control, CongestionControl::Block);
});
}