Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose Quality of Service settings (Priority, Congestion Control, Express) the Sample was published with #730

Merged
merged 14 commits into from
Feb 23, 2024
Merged
31 changes: 21 additions & 10 deletions zenoh/src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::prelude::*;
#[zenoh_macros::unstable]
use crate::sample::Attachment;
use crate::sample::DataInfo;
use crate::sample::QoS;
use crate::Encoding;
use crate::SessionRef;
use crate::Undeclarable;
Expand Down Expand Up @@ -859,12 +860,11 @@ fn resolve_put(
timestamp,
source_id: None,
source_sn: None,
qos: ext::QoSType::new(
qos: QoS::from_or_default(ext::QoSType::new(
publisher.priority.into(),
publisher.congestion_control,
false,
)
.into(),
)),
};

publisher.session.handle_data(
Expand Down Expand Up @@ -902,12 +902,6 @@ impl Priority {
pub const NUM: usize = 1 + Self::MIN as usize - Self::MAX as usize;
}

impl From<zenoh_protocol::core::Priority> for Priority {
fn from(priority: zenoh_protocol::core::Priority) -> Self {
Self::try_from(priority as u8).unwrap()
}
}

impl TryFrom<u8> for Priority {
type Error = zenoh_result::Error;

Expand Down Expand Up @@ -938,7 +932,8 @@ impl TryFrom<u8> for Priority {
}
}

impl From<Priority> for zenoh_protocol::core::Priority {
type ProtocolPriority = zenoh_protocol::core::Priority;
impl From<Priority> for ProtocolPriority {
fn from(prio: Priority) -> Self {
// The Priority in the prelude differs from the Priority in the core protocol only from
// the missing Control priority. The Control priority is reserved for zenoh internal use
Expand All @@ -952,6 +947,22 @@ impl From<Priority> for zenoh_protocol::core::Priority {
}
}

impl TryFrom<ProtocolPriority> for Priority {
type Error = zenoh_result::Error;
fn try_from(priority: ProtocolPriority) -> Result<Self, Self::Error> {
match priority {
ProtocolPriority::Control => bail!("'Control' is not a valid priority value."),
ProtocolPriority::RealTime => Ok(Priority::RealTime),
ProtocolPriority::InteractiveHigh => Ok(Priority::InteractiveHigh),
ProtocolPriority::InteractiveLow => Ok(Priority::InteractiveLow),
ProtocolPriority::DataHigh => Ok(Priority::DataHigh),
ProtocolPriority::Data => Ok(Priority::Data),
ProtocolPriority::DataLow => Ok(Priority::DataLow),
ProtocolPriority::Background => Ok(Priority::Background),
}
}
}

/// A struct that indicates if there exist Subscribers matching the Publisher's key expression.
///
/// # Examples
Expand Down
24 changes: 19 additions & 5 deletions zenoh/src/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,12 +527,26 @@ pub struct QoS {
pub express: bool,
}

impl From<QoSType> for QoS {
fn from(ext: QoSType) -> Self {
QoS {
priority: ext.get_priority().into(),
impl QoS {
/// Helper function to fallback to default QoS value and log a error in case of conversion failure
pub(crate) fn from_or_default(ext: QoSType) -> QoS {
Mallets marked this conversation as resolved.
Show resolved Hide resolved
match QoS::try_from(ext) {
Ok(qos) => return qos,
Err(e) => {
log::error!("Failed to convert: {}", e.to_string());
QoS::default()
}
}
}
}

impl TryFrom<QoSType> for QoS {
type Error = zenoh_result::Error;
fn try_from(ext: QoSType) -> Result<Self, Self::Error> {
Ok(QoS {
priority: ext.get_priority().try_into()?,
congestion_control: ext.get_congestion_control(),
express: ext.is_express(),
}
})
}
}
7 changes: 4 additions & 3 deletions zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::queryable::*;
#[cfg(feature = "unstable")]
use crate::sample::Attachment;
use crate::sample::DataInfo;
use crate::sample::QoS;
use crate::selector::TIME_RANGE_KEY;
use crate::subscriber::*;
use crate::Id;
Expand Down Expand Up @@ -2192,7 +2193,7 @@ impl Primitives for Session {
kind: SampleKind::Put,
encoding: Some(m.encoding),
timestamp: m.timestamp,
qos: msg.ext_qos.into(),
qos: QoS::from_or_default(msg.ext_qos),
source_id: m.ext_sinfo.as_ref().map(|i| i.zid),
source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64),
};
Expand All @@ -2210,7 +2211,7 @@ impl Primitives for Session {
kind: SampleKind::Delete,
encoding: None,
timestamp: m.timestamp,
qos: msg.ext_qos.into(),
qos: QoS::from_or_default(msg.ext_qos),
source_id: m.ext_sinfo.as_ref().map(|i| i.zid),
source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64),
};
Expand Down Expand Up @@ -2347,7 +2348,7 @@ impl Primitives for Session {
kind: SampleKind::Put,
encoding: Some(m.encoding),
timestamp: m.timestamp,
qos: msg.ext_qos.into(),
qos: QoS::from_or_default(msg.ext_qos),
source_id: m.ext_sinfo.as_ref().map(|i| i.zid),
source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64),
};
Expand Down