Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publisher reliability #1305

Merged
merged 13 commits into from
Sep 4, 2024
4 changes: 3 additions & 1 deletion commons/zenoh-codec/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ where
let header: u8 = self.codec.read(&mut *reader)?;

let codec = Zenoh080Header::new(header);
codec.read(&mut *reader)
let mut msg: NetworkMessage = codec.read(&mut *reader)?;
msg.reliability = self.reliability;
Ok(msg)
}
}

Expand Down
13 changes: 6 additions & 7 deletions commons/zenoh-codec/src/transport/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,12 @@ where
fn write(self, writer: &mut W, x: (&NetworkMessage, &FrameHeader)) -> Self::Output {
let (m, f) = x;

// @TODO: m.is_reliable() always return true for the time being
// if let (Reliability::Reliable, false) | (Reliability::BestEffort, true) =
// (f.reliability, m.is_reliable())
// {
// // We are not serializing on the right frame.
// return Err(BatchError::NewFrame);
// }
if let (Reliability::Reliable, false) | (Reliability::BestEffort, true) =
(f.reliability, m.is_reliable())
{
// We are not serializing on the right frame.
return Err(BatchError::NewFrame);
}

// Mark the write operation
let mark = writer.mark();
Expand Down
4 changes: 2 additions & 2 deletions commons/zenoh-protocol/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,12 +346,12 @@ impl TryFrom<u8> for Priority {
#[repr(u8)]
pub enum Reliability {
#[default]
BestEffort,
Reliable,
BestEffort,
}

impl Reliability {
pub const DEFAULT: Self = Self::BestEffort;
pub const DEFAULT: Self = Self::Reliable;

#[cfg(feature = "test")]
pub fn rand() -> Self {
Expand Down
7 changes: 4 additions & 3 deletions commons/zenoh-protocol/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub use push::Push;
pub use request::{AtomicRequestId, Request, RequestId};
pub use response::{Response, ResponseFinal};

use crate::core::{CongestionControl, Priority};
use crate::core::{CongestionControl, Priority, Reliability};

pub mod id {
// WARNING: it's crucial that these IDs do NOT collide with the IDs
Expand Down Expand Up @@ -83,6 +83,7 @@ pub enum NetworkBody {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NetworkMessage {
pub body: NetworkBody,
pub reliability: Reliability,
#[cfg(feature = "stats")]
pub size: Option<core::num::NonZeroUsize>,
}
Expand All @@ -109,8 +110,7 @@ impl NetworkMessage {

#[inline]
pub fn is_reliable(&self) -> bool {
// TODO
true
self.reliability == Reliability::Reliable
}

#[inline]
Expand Down Expand Up @@ -179,6 +179,7 @@ impl From<NetworkBody> for NetworkMessage {
fn from(body: NetworkBody) -> Self {
Self {
body,
reliability: Reliability::DEFAULT,
#[cfg(feature = "stats")]
size: None,
}
Expand Down
3 changes: 2 additions & 1 deletion commons/zenoh-protocol/src/transport/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ impl Frame {
let ext_qos = ext::QoSType::rand();
let mut payload = vec![];
for _ in 0..rng.gen_range(1..4) {
let m = NetworkMessage::rand();
let mut m = NetworkMessage::rand();
m.reliability = reliability;
payload.push(m);
}

Expand Down
4 changes: 3 additions & 1 deletion io/zenoh-transport/src/common/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ mod tests {
let mut batch = WBatch::new(config);

let tmsg: TransportMessage = KeepAlive.into();
let nmsg: NetworkMessage = Push {
let mut nmsg: NetworkMessage = Push {
wire_expr: WireExpr::empty(),
ext_qos: ext::QoSType::new(Priority::DEFAULT, CongestionControl::Block, false),
ext_tstamp: None,
Expand Down Expand Up @@ -601,13 +601,15 @@ mod tests {
sn: 0,
ext_qos: frame::ext::QoSType::DEFAULT,
};
nmsg.reliability = frame.reliability;

// Serialize with a frame
batch.encode((&nmsg, &frame)).unwrap();
assert_ne!(batch.len(), 0);
nmsgs_in.push(nmsg.clone());

frame.reliability = Reliability::BestEffort;
nmsg.reliability = frame.reliability;
batch.encode((&nmsg, &frame)).unwrap();
assert_ne!(batch.len(), 0);
nmsgs_in.push(nmsg.clone());
Expand Down
4 changes: 2 additions & 2 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use zenoh_codec::{transport::batch::BatchError, WCodec, Zenoh080};
use zenoh_config::QueueSizeConf;
use zenoh_core::zlock;
use zenoh_protocol::{
core::{Priority, Reliability},
core::Priority,
network::NetworkMessage,
transport::{
fragment::FragmentHeader,
Expand Down Expand Up @@ -220,7 +220,7 @@ impl StageIn {

// The Frame
let frame = FrameHeader {
reliability: Reliability::Reliable, // TODO
reliability: msg.reliability,
sn,
ext_qos: frame::ext::QoSType::new(priority),
};
Expand Down
10 changes: 10 additions & 0 deletions zenoh/src/api/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use std::{

use zenoh_core::{Result as ZResult, Wait};
use zenoh_keyexpr::keyexpr;
#[cfg(feature = "unstable")]
use zenoh_protocol::core::Reliability;
use zenoh_protocol::{core::WireExpr, network::NetworkMessage};
use zenoh_transport::{
TransportEventHandler, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler,
Expand Down Expand Up @@ -171,6 +173,8 @@ impl TransportMulticastEventHandler for Handler {
Some(info),
serde_json::to_vec(&peer).unwrap().into(),
SubscriberKind::Subscriber,
#[cfg(feature = "unstable")]
Reliability::Reliable,
None,
);
Ok(Arc::new(PeerHandler {
Expand Down Expand Up @@ -220,6 +224,8 @@ impl TransportPeerEventHandler for PeerHandler {
Some(info),
serde_json::to_vec(&link).unwrap().into(),
SubscriberKind::Subscriber,
#[cfg(feature = "unstable")]
Reliability::Reliable,
None,
);
}
Expand All @@ -240,6 +246,8 @@ impl TransportPeerEventHandler for PeerHandler {
Some(info),
vec![0u8; 0].into(),
SubscriberKind::Subscriber,
#[cfg(feature = "unstable")]
Reliability::Reliable,
None,
);
}
Expand All @@ -257,6 +265,8 @@ impl TransportPeerEventHandler for PeerHandler {
Some(info),
vec![0u8; 0].into(),
SubscriberKind::Subscriber,
#[cfg(feature = "unstable")]
Reliability::Reliable,
None,
);
}
Expand Down
33 changes: 33 additions & 0 deletions zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
use std::future::{IntoFuture, Ready};

use zenoh_core::{Resolvable, Result as ZResult, Wait};
#[cfg(feature = "unstable")]
use zenoh_protocol::core::Reliability;
use zenoh_protocol::{core::CongestionControl, network::Mapping};

#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -111,6 +113,17 @@ impl<T> PublicationBuilder<PublisherBuilder<'_, '_>, T> {
self.publisher = self.publisher.allowed_destination(destination);
self
}
/// Change the `reliability` to apply when routing the data.
OlivierHecart marked this conversation as resolved.
Show resolved Hide resolved
/// NOTE: Currently `reliability` does not trigger any data retransmission on the wire.
/// It is rather used as a marker on the wire and it may be used to select the best link available (e.g. TCP for reliable data and UDP for best effort data).
#[zenoh_macros::unstable]
#[inline]
pub fn reliability(self, reliability: Reliability) -> Self {
Self {
publisher: self.publisher.reliability(reliability),
..self
}
}
}

impl EncodingBuilderTrait for PublisherBuilder<'_, '_> {
Expand Down Expand Up @@ -239,6 +252,8 @@ pub struct PublisherBuilder<'a, 'b: 'a> {
pub(crate) congestion_control: CongestionControl,
pub(crate) priority: Priority,
pub(crate) is_express: bool,
#[cfg(feature = "unstable")]
pub(crate) reliability: Reliability,
pub(crate) destination: Locality,
}

Expand All @@ -254,6 +269,8 @@ impl<'a, 'b> Clone for PublisherBuilder<'a, 'b> {
congestion_control: self.congestion_control,
priority: self.priority,
is_express: self.is_express,
#[cfg(feature = "unstable")]
reliability: self.reliability,
destination: self.destination,
}
}
Expand Down Expand Up @@ -294,6 +311,18 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
self
}

/// Change the `reliability`` to apply when routing the data.
OlivierHecart marked this conversation as resolved.
Show resolved Hide resolved
/// NOTE: Currently `reliability` does not trigger any data retransmission on the wire.
/// It is rather used as a marker on the wire and it may be used to select the best link available (e.g. TCP for reliable data and UDP for best effort data).
#[zenoh_macros::unstable]
#[inline]
pub fn reliability(self, reliability: Reliability) -> Self {
Self {
reliability,
..self
}
}

// internal function for performing the publication
fn create_one_shot_publisher(self) -> ZResult<Publisher<'a>> {
Ok(Publisher {
Expand All @@ -306,6 +335,8 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
is_express: self.is_express,
destination: self.destination,
#[cfg(feature = "unstable")]
reliability: self.reliability,
#[cfg(feature = "unstable")]
matching_listeners: Default::default(),
undeclare_on_drop: true,
})
Expand Down Expand Up @@ -361,6 +392,8 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> {
is_express: self.is_express,
destination: self.destination,
#[cfg(feature = "unstable")]
reliability: self.reliability,
#[cfg(feature = "unstable")]
matching_listeners: Default::default(),
undeclare_on_drop: true,
})
Expand Down
17 changes: 17 additions & 0 deletions zenoh/src/api/builders/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::marker::PhantomData;
use uhlc::Timestamp;
use zenoh_core::zresult;
use zenoh_protocol::core::CongestionControl;
#[cfg(feature = "unstable")]
use zenoh_protocol::core::Reliability;

use crate::api::{
bytes::{OptionZBytes, ZBytes},
Expand Down Expand Up @@ -87,6 +89,8 @@ impl SampleBuilder<SampleBuilderPut> {
timestamp: None,
qos: QoS::default(),
#[cfg(feature = "unstable")]
reliability: Reliability::DEFAULT,
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
attachment: None,
},
Expand Down Expand Up @@ -117,6 +121,8 @@ impl SampleBuilder<SampleBuilderDelete> {
timestamp: None,
qos: QoS::default(),
#[cfg(feature = "unstable")]
reliability: Reliability::DEFAULT,
#[cfg(feature = "unstable")]
source_info: SourceInfo::empty(),
attachment: None,
},
Expand Down Expand Up @@ -147,6 +153,17 @@ impl<T> SampleBuilder<T> {
_t: PhantomData::<T>,
}
}

#[zenoh_macros::unstable]
pub fn reliability(self, reliability: Reliability) -> Self {
Self {
sample: Sample {
reliability,
..self.sample
},
_t: PhantomData::<T>,
}
}
}

impl<T> TimestampBuilderTrait for SampleBuilder<T> {
Expand Down
Loading