From 551927698543d8ac4cc8606bb1e4d3bfa3ed6c58 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 9 Aug 2024 14:15:08 +0200 Subject: [PATCH 01/12] Add reliability to NetworkMessage --- commons/zenoh-codec/src/network/mod.rs | 4 ++- commons/zenoh-protocol/src/network/mod.rs | 7 ++-- commons/zenoh-protocol/src/transport/frame.rs | 3 +- zenoh/src/net/primitives/mux.rs | 33 +++++++++++++++++-- 4 files changed, 39 insertions(+), 8 deletions(-) diff --git a/commons/zenoh-codec/src/network/mod.rs b/commons/zenoh-codec/src/network/mod.rs index c68a3470aa..65c75f1452 100644 --- a/commons/zenoh-codec/src/network/mod.rs +++ b/commons/zenoh-codec/src/network/mod.rs @@ -76,7 +76,9 @@ where let header: u8 = self.codec.read(&mut *reader)?; let codec = Zenoh080Header::new(header); - codec.read(&mut *reader) + let mut msg: NetworkMessage = codec.read(&mut *reader)?; + msg.reliability = self.reliability; + Ok(msg) } } diff --git a/commons/zenoh-protocol/src/network/mod.rs b/commons/zenoh-protocol/src/network/mod.rs index 407df6dd52..336f952f3d 100644 --- a/commons/zenoh-protocol/src/network/mod.rs +++ b/commons/zenoh-protocol/src/network/mod.rs @@ -30,7 +30,7 @@ pub use push::Push; pub use request::{AtomicRequestId, Request, RequestId}; pub use response::{Response, ResponseFinal}; -use crate::core::{CongestionControl, Priority}; +use crate::core::{CongestionControl, Priority, Reliability}; pub mod id { // WARNING: it's crucial that these IDs do NOT collide with the IDs @@ -83,6 +83,7 @@ pub enum NetworkBody { #[derive(Debug, Clone, PartialEq, Eq)] pub struct NetworkMessage { pub body: NetworkBody, + pub reliability: Reliability, #[cfg(feature = "stats")] pub size: Option, } @@ -109,8 +110,7 @@ impl NetworkMessage { #[inline] pub fn is_reliable(&self) -> bool { - // TODO - true + self.reliability == Reliability::Reliable } #[inline] @@ -179,6 +179,7 @@ impl From for NetworkMessage { fn from(body: NetworkBody) -> Self { Self { body, + reliability: Reliability::DEFAULT, #[cfg(feature = "stats")] size: None, } diff --git a/commons/zenoh-protocol/src/transport/frame.rs b/commons/zenoh-protocol/src/transport/frame.rs index b3ef1d819f..10e55cc99e 100644 --- a/commons/zenoh-protocol/src/transport/frame.rs +++ b/commons/zenoh-protocol/src/transport/frame.rs @@ -95,7 +95,8 @@ impl Frame { let ext_qos = ext::QoSType::rand(); let mut payload = vec![]; for _ in 0..rng.gen_range(1..4) { - let m = NetworkMessage::rand(); + let mut m = NetworkMessage::rand(); + m.reliability = reliability; payload.push(m); } diff --git a/zenoh/src/net/primitives/mux.rs b/zenoh/src/net/primitives/mux.rs index bc718ba324..b017e3858a 100644 --- a/zenoh/src/net/primitives/mux.rs +++ b/zenoh/src/net/primitives/mux.rs @@ -13,9 +13,12 @@ // use std::sync::OnceLock; -use zenoh_protocol::network::{ - interest::Interest, Declare, NetworkBody, NetworkMessage, Push, Request, Response, - ResponseFinal, +use zenoh_protocol::{ + core::Reliability, + network::{ + interest::Interest, Declare, NetworkBody, NetworkMessage, Push, Request, Response, + ResponseFinal, + }, }; use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast}; @@ -46,6 +49,7 @@ impl Primitives for Mux { fn send_interest(&self, msg: Interest) { let msg = NetworkMessage { body: NetworkBody::Interest(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -74,6 +78,7 @@ impl Primitives for Mux { fn send_declare(&self, msg: Declare) { let msg = NetworkMessage { body: NetworkBody::Declare(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -98,6 +103,7 @@ impl Primitives for Mux { fn send_push(&self, msg: Push) { let msg = NetworkMessage { body: NetworkBody::Push(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -122,6 +128,7 @@ impl Primitives for Mux { fn send_request(&self, msg: Request) { let msg = NetworkMessage { body: NetworkBody::Request(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -146,6 +153,7 @@ impl Primitives for Mux { fn send_response(&self, msg: Response) { let msg = NetworkMessage { body: NetworkBody::Response(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -170,6 +178,7 @@ impl Primitives for Mux { fn send_response_final(&self, msg: ResponseFinal) { let msg = NetworkMessage { body: NetworkBody::ResponseFinal(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -201,6 +210,7 @@ impl EPrimitives for Mux { let ctx = RoutingContext { msg: NetworkMessage { body: NetworkBody::Interest(ctx.msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }, @@ -226,6 +236,7 @@ impl EPrimitives for Mux { let ctx = RoutingContext { msg: NetworkMessage { body: NetworkBody::Declare(ctx.msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }, @@ -250,6 +261,7 @@ impl EPrimitives for Mux { fn send_push(&self, msg: Push) { let msg = NetworkMessage { body: NetworkBody::Push(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -274,6 +286,7 @@ impl EPrimitives for Mux { fn send_request(&self, msg: Request) { let msg = NetworkMessage { body: NetworkBody::Request(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -298,6 +311,7 @@ impl EPrimitives for Mux { fn send_response(&self, msg: Response) { let msg = NetworkMessage { body: NetworkBody::Response(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -322,6 +336,7 @@ impl EPrimitives for Mux { fn send_response_final(&self, msg: ResponseFinal) { let msg = NetworkMessage { body: NetworkBody::ResponseFinal(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -368,6 +383,7 @@ impl Primitives for McastMux { fn send_interest(&self, msg: Interest) { let msg = NetworkMessage { body: NetworkBody::Interest(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -392,6 +408,7 @@ impl Primitives for McastMux { fn send_declare(&self, msg: Declare) { let msg = NetworkMessage { body: NetworkBody::Declare(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -416,6 +433,7 @@ impl Primitives for McastMux { fn send_push(&self, msg: Push) { let msg = NetworkMessage { body: NetworkBody::Push(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -440,6 +458,7 @@ impl Primitives for McastMux { fn send_request(&self, msg: Request) { let msg = NetworkMessage { body: NetworkBody::Request(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -464,6 +483,7 @@ impl Primitives for McastMux { fn send_response(&self, msg: Response) { let msg = NetworkMessage { body: NetworkBody::Response(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -488,6 +508,7 @@ impl Primitives for McastMux { fn send_response_final(&self, msg: ResponseFinal) { let msg = NetworkMessage { body: NetworkBody::ResponseFinal(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -519,6 +540,7 @@ impl EPrimitives for McastMux { let ctx = RoutingContext { msg: NetworkMessage { body: NetworkBody::Interest(ctx.msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }, @@ -544,6 +566,7 @@ impl EPrimitives for McastMux { let ctx = RoutingContext { msg: NetworkMessage { body: NetworkBody::Declare(ctx.msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }, @@ -568,6 +591,7 @@ impl EPrimitives for McastMux { fn send_push(&self, msg: Push) { let msg = NetworkMessage { body: NetworkBody::Push(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -592,6 +616,7 @@ impl EPrimitives for McastMux { fn send_request(&self, msg: Request) { let msg = NetworkMessage { body: NetworkBody::Request(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -616,6 +641,7 @@ impl EPrimitives for McastMux { fn send_response(&self, msg: Response) { let msg = NetworkMessage { body: NetworkBody::Response(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -640,6 +666,7 @@ impl EPrimitives for McastMux { fn send_response_final(&self, msg: ResponseFinal) { let msg = NetworkMessage { body: NetworkBody::ResponseFinal(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; From ef76d115cdbc0a5b5f79d70d89327eb891c18f55 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 9 Aug 2024 14:50:20 +0200 Subject: [PATCH 02/12] Updates --- commons/zenoh-codec/src/transport/batch.rs | 13 ++++++------- io/zenoh-transport/src/common/pipeline.rs | 4 ++-- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/commons/zenoh-codec/src/transport/batch.rs b/commons/zenoh-codec/src/transport/batch.rs index bfdc21f618..d4fd603864 100644 --- a/commons/zenoh-codec/src/transport/batch.rs +++ b/commons/zenoh-codec/src/transport/batch.rs @@ -150,13 +150,12 @@ where fn write(self, writer: &mut W, x: (&NetworkMessage, &FrameHeader)) -> Self::Output { let (m, f) = x; - // @TODO: m.is_reliable() always return true for the time being - // if let (Reliability::Reliable, false) | (Reliability::BestEffort, true) = - // (f.reliability, m.is_reliable()) - // { - // // We are not serializing on the right frame. - // return Err(BatchError::NewFrame); - // } + if let (Reliability::Reliable, false) | (Reliability::BestEffort, true) = + (f.reliability, m.is_reliable()) + { + // We are not serializing on the right frame. + return Err(BatchError::NewFrame); + } // Mark the write operation let mark = writer.mark(); diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 68a4b87d24..9cf8d85843 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -17,7 +17,7 @@ use zenoh_codec::{transport::batch::BatchError, WCodec, Zenoh080}; use zenoh_config::QueueSizeConf; use zenoh_core::zlock; use zenoh_protocol::{ - core::{Priority, Reliability}, + core::Priority, network::NetworkMessage, transport::{ fragment::FragmentHeader, @@ -210,7 +210,7 @@ impl StageIn { // The Frame let frame = FrameHeader { - reliability: Reliability::Reliable, // TODO + reliability: msg.reliability, sn, ext_qos: frame::ext::QoSType::new(priority), }; From ff8699bddbec1f059ecfb1602b6419d798c18cdb Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 9 Aug 2024 15:07:39 +0200 Subject: [PATCH 03/12] Change primitives --- zenoh/src/api/publisher.rs | 71 +++++---- zenoh/src/api/session.rs | 6 +- zenoh/src/net/primitives/demux.rs | 2 +- zenoh/src/net/primitives/mod.rs | 13 +- zenoh/src/net/primitives/mux.rs | 16 +- zenoh/src/net/routing/dispatcher/face.rs | 14 +- zenoh/src/net/routing/dispatcher/pubsub.rs | 97 +++++++----- zenoh/src/net/runtime/adminspace.rs | 8 +- zenoh/src/net/tests/tables.rs | 171 +++++++++++---------- 9 files changed, 211 insertions(+), 187 deletions(-) diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index f4b969b18f..d8e3ecca61 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -23,7 +23,7 @@ use std::{ use futures::Sink; use zenoh_core::{zread, Resolvable, Resolve, Wait}; use zenoh_protocol::{ - core::CongestionControl, + core::{CongestionControl, Reliability}, network::{push::ext, Push}, zenoh::{Del, PushBody, Put}, }; @@ -582,40 +582,43 @@ impl Publisher<'_> { timestamp }; if self.destination != Locality::SessionLocal { - primitives.send_push(Push { - wire_expr: self.key_expr.to_wire(&self.session).to_owned(), - ext_qos: ext::QoSType::new( - self.priority.into(), - self.congestion_control, - self.is_express, - ), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - payload: match kind { - SampleKind::Put => PushBody::Put(Put { - timestamp, - encoding: encoding.clone().into(), - #[cfg(feature = "unstable")] - ext_sinfo: source_info.into(), - #[cfg(not(feature = "unstable"))] - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_attachment: attachment.clone().map(|a| a.into()), - ext_unknown: vec![], - payload: payload.clone().into(), - }), - SampleKind::Delete => PushBody::Del(Del { - timestamp, - #[cfg(feature = "unstable")] - ext_sinfo: source_info.into(), - #[cfg(not(feature = "unstable"))] - ext_sinfo: None, - ext_attachment: attachment.clone().map(|a| a.into()), - ext_unknown: vec![], - }), + primitives.send_push( + Push { + wire_expr: self.key_expr.to_wire(&self.session).to_owned(), + ext_qos: ext::QoSType::new( + self.priority.into(), + self.congestion_control, + self.is_express, + ), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + payload: match kind { + SampleKind::Put => PushBody::Put(Put { + timestamp, + encoding: encoding.clone().into(), + #[cfg(feature = "unstable")] + ext_sinfo: source_info.into(), + #[cfg(not(feature = "unstable"))] + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment: attachment.clone().map(|a| a.into()), + ext_unknown: vec![], + payload: payload.clone().into(), + }), + SampleKind::Delete => PushBody::Del(Del { + timestamp, + #[cfg(feature = "unstable")] + ext_sinfo: source_info.into(), + #[cfg(not(feature = "unstable"))] + ext_sinfo: None, + ext_attachment: attachment.clone().map(|a| a.into()), + ext_unknown: vec![], + }), + }, }, - }); + Reliability::Reliable, // TODO + ); } if self.destination != Locality::Remote { let data_info = DataInfo { diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index ed1c75d3f2..c1ba753f68 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -2333,7 +2333,7 @@ impl Primitives for Session { } } - fn send_push(&self, msg: Push) { + fn send_push(&self, msg: Push, _reliability: Reliability) { trace!("recv Push {:?}", msg); match msg.payload { PushBody::Put(m) => { @@ -2791,8 +2791,8 @@ impl crate::net::primitives::EPrimitives for Session { } #[inline] - fn send_push(&self, msg: Push) { - (self as &dyn Primitives).send_push(msg) + fn send_push(&self, msg: Push, reliability: Reliability) { + (self as &dyn Primitives).send_push(msg, reliability) } #[inline] diff --git a/zenoh/src/net/primitives/demux.rs b/zenoh/src/net/primitives/demux.rs index 59111e5441..e4774aab4a 100644 --- a/zenoh/src/net/primitives/demux.rs +++ b/zenoh/src/net/primitives/demux.rs @@ -66,7 +66,7 @@ impl TransportPeerEventHandler for DeMux { } match msg.body { - NetworkBody::Push(m) => self.face.send_push(m), + NetworkBody::Push(m) => self.face.send_push(m, msg.reliability), NetworkBody::Declare(m) => self.face.send_declare(m), NetworkBody::Interest(m) => self.face.send_interest(m), NetworkBody::Request(m) => self.face.send_request(m), diff --git a/zenoh/src/net/primitives/mod.rs b/zenoh/src/net/primitives/mod.rs index 837571f7f6..21526466f5 100644 --- a/zenoh/src/net/primitives/mod.rs +++ b/zenoh/src/net/primitives/mod.rs @@ -18,8 +18,9 @@ use std::any::Any; pub use demux::*; pub use mux::*; -use zenoh_protocol::network::{ - interest::Interest, Declare, Push, Request, Response, ResponseFinal, +use zenoh_protocol::{ + core::Reliability, + network::{interest::Interest, Declare, Push, Request, Response, ResponseFinal}, }; use super::routing::RoutingContext; @@ -29,7 +30,7 @@ pub trait Primitives: Send + Sync { fn send_declare(&self, msg: Declare); - fn send_push(&self, msg: Push); + fn send_push(&self, msg: Push, reliability: Reliability); fn send_request(&self, msg: Request); @@ -47,7 +48,7 @@ pub(crate) trait EPrimitives: Send + Sync { fn send_declare(&self, ctx: RoutingContext); - fn send_push(&self, msg: Push); + fn send_push(&self, msg: Push, reliability: Reliability); fn send_request(&self, msg: Request); @@ -64,7 +65,7 @@ impl Primitives for DummyPrimitives { fn send_declare(&self, _msg: Declare) {} - fn send_push(&self, _msg: Push) {} + fn send_push(&self, _msg: Push, _reliability: Reliability) {} fn send_request(&self, _msg: Request) {} @@ -80,7 +81,7 @@ impl EPrimitives for DummyPrimitives { fn send_declare(&self, _ctx: RoutingContext) {} - fn send_push(&self, _msg: Push) {} + fn send_push(&self, _msg: Push, _reliability: Reliability) {} fn send_request(&self, _msg: Request) {} diff --git a/zenoh/src/net/primitives/mux.rs b/zenoh/src/net/primitives/mux.rs index b017e3858a..47067f231e 100644 --- a/zenoh/src/net/primitives/mux.rs +++ b/zenoh/src/net/primitives/mux.rs @@ -100,10 +100,10 @@ impl Primitives for Mux { } } - fn send_push(&self, msg: Push) { + fn send_push(&self, msg: Push, reliability: Reliability) { let msg = NetworkMessage { body: NetworkBody::Push(msg), - reliability: Reliability::Reliable, + reliability, #[cfg(feature = "stats")] size: None, }; @@ -258,10 +258,10 @@ impl EPrimitives for Mux { } } - fn send_push(&self, msg: Push) { + fn send_push(&self, msg: Push, reliability: Reliability) { let msg = NetworkMessage { body: NetworkBody::Push(msg), - reliability: Reliability::Reliable, + reliability, #[cfg(feature = "stats")] size: None, }; @@ -430,10 +430,10 @@ impl Primitives for McastMux { } } - fn send_push(&self, msg: Push) { + fn send_push(&self, msg: Push, reliability: Reliability) { let msg = NetworkMessage { body: NetworkBody::Push(msg), - reliability: Reliability::Reliable, + reliability, #[cfg(feature = "stats")] size: None, }; @@ -588,10 +588,10 @@ impl EPrimitives for McastMux { } } - fn send_push(&self, msg: Push) { + fn send_push(&self, msg: Push, reliability: Reliability) { let msg = NetworkMessage { body: NetworkBody::Push(msg), - reliability: Reliability::Reliable, + reliability, #[cfg(feature = "stats")] size: None, }; diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index bbc910b124..0bc14450a7 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -20,7 +20,7 @@ use std::{ use tokio_util::sync::CancellationToken; use zenoh_protocol::{ - core::{ExprId, WhatAmI, ZenohIdProto}, + core::{ExprId, Reliability, WhatAmI, ZenohIdProto}, network::{ interest::{InterestId, InterestMode, InterestOptions}, Mapping, Push, Request, RequestId, Response, ResponseFinal, @@ -379,16 +379,8 @@ impl Primitives for Face { } #[inline] - fn send_push(&self, msg: Push) { - full_reentrant_route_data( - &self.tables, - &self.state, - &msg.wire_expr, - msg.ext_qos, - msg.ext_tstamp, - msg.payload, - msg.ext_nodeid.node_id, - ); + fn send_push(&self, msg: Push, reliability: Reliability) { + route_data(&self.tables, &self.state, msg, reliability); } fn send_request(&self, msg: Request) { diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index 84c8433a48..7334f4f267 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -15,7 +15,7 @@ use std::{collections::HashMap, sync::Arc}; use zenoh_core::zread; use zenoh_protocol::{ - core::{key_expr::keyexpr, WhatAmI, WireExpr}, + core::{key_expr::keyexpr, Reliability, WhatAmI, WireExpr}, network::{ declare::{ext, subscriber::ext::SubscriberInfo, SubscriberId}, Push, @@ -385,42 +385,42 @@ macro_rules! inc_stats { }; } -pub fn full_reentrant_route_data( +pub fn route_data( tables_ref: &Arc, face: &FaceState, - expr: &WireExpr, - ext_qos: ext::QoSType, - ext_tstamp: Option, - mut payload: PushBody, - routing_context: NodeId, + mut msg: Push, + reliability: Reliability, ) { let tables = zread!(tables_ref.tables); - match tables.get_mapping(face, &expr.scope, expr.mapping).cloned() { + match tables + .get_mapping(face, &msg.wire_expr.scope, msg.wire_expr.mapping) + .cloned() + { Some(prefix) => { tracing::trace!( "{} Route data for res {}{}", face, prefix.expr(), - expr.suffix.as_ref() + msg.wire_expr.suffix.as_ref() ); - let mut expr = RoutingExpr::new(&prefix, expr.suffix.as_ref()); + let mut expr = RoutingExpr::new(&prefix, msg.wire_expr.suffix.as_ref()); #[cfg(feature = "stats")] let admin = expr.full_expr().starts_with("@/"); #[cfg(feature = "stats")] if !admin { - inc_stats!(face, rx, user, payload) + inc_stats!(face, rx, user, msg.payload) } else { - inc_stats!(face, rx, admin, payload) + inc_stats!(face, rx, admin, msg.payload) } if tables.hat_code.ingress_filter(&tables, face, &mut expr) { let res = Resource::get_resource(&prefix, expr.suffix); - let route = get_data_route(&tables, face, &res, &mut expr, routing_context); + let route = get_data_route(&tables, face, &res, &mut expr, msg.ext_nodeid.node_id); if !route.is_empty() { - treat_timestamp!(&tables.hlc, payload, tables.drop_future_timestamp); + treat_timestamp!(&tables.hlc, msg.payload, tables.drop_future_timestamp); if route.len() == 1 { let (outface, key_expr, context) = route.values().next().unwrap(); @@ -431,18 +431,21 @@ pub fn full_reentrant_route_data( drop(tables); #[cfg(feature = "stats")] if !admin { - inc_stats!(face, tx, user, payload) + inc_stats!(face, tx, user, msg.payload) } else { - inc_stats!(face, tx, admin, payload) + inc_stats!(face, tx, admin, msg.payload) } - outface.primitives.send_push(Push { - wire_expr: key_expr.into(), - ext_qos, - ext_tstamp, - ext_nodeid: ext::NodeIdType { node_id: *context }, - payload, - }) + outface.primitives.send_push( + Push { + wire_expr: key_expr.into(), + ext_qos: msg.ext_qos, + ext_tstamp: msg.ext_tstamp, + ext_nodeid: ext::NodeIdType { node_id: *context }, + payload: msg.payload, + }, + reliability, + ) } } else if tables.whatami == WhatAmI::Router { let route = route @@ -459,18 +462,21 @@ pub fn full_reentrant_route_data( for (outface, key_expr, context) in route { #[cfg(feature = "stats")] if !admin { - inc_stats!(face, tx, user, payload) + inc_stats!(face, tx, user, msg.payload) } else { - inc_stats!(face, tx, admin, payload) + inc_stats!(face, tx, admin, msg.payload) } - outface.primitives.send_push(Push { - wire_expr: key_expr, - ext_qos, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType { node_id: context }, - payload: payload.clone(), - }) + outface.primitives.send_push( + Push { + wire_expr: key_expr, + ext_qos: msg.ext_qos, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: context }, + payload: msg.payload.clone(), + }, + reliability, + ) } } else { drop(tables); @@ -483,18 +489,21 @@ pub fn full_reentrant_route_data( { #[cfg(feature = "stats")] if !admin { - inc_stats!(face, tx, user, payload) + inc_stats!(face, tx, user, msg.payload) } else { - inc_stats!(face, tx, admin, payload) + inc_stats!(face, tx, admin, msg.payload) } - outface.primitives.send_push(Push { - wire_expr: key_expr.into(), - ext_qos, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType { node_id: *context }, - payload: payload.clone(), - }) + outface.primitives.send_push( + Push { + wire_expr: key_expr.into(), + ext_qos: msg.ext_qos, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: *context }, + payload: msg.payload.clone(), + }, + reliability, + ) } } } @@ -502,7 +511,11 @@ pub fn full_reentrant_route_data( } } None => { - tracing::error!("{} Route data with unknown scope {}!", face, expr.scope); + tracing::error!( + "{} Route data with unknown scope {}!", + face, + msg.wire_expr.scope + ); } } } diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index ce87d68ef0..d3e2a3c1ad 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -26,7 +26,7 @@ use zenoh_plugin_trait::{PluginControl, PluginStatus}; #[cfg(feature = "plugins")] use zenoh_protocol::core::key_expr::keyexpr; use zenoh_protocol::{ - core::{key_expr::OwnedKeyExpr, ExprId, WireExpr, EMPTY_EXPR_ID}, + core::{key_expr::OwnedKeyExpr, ExprId, Reliability, WireExpr, EMPTY_EXPR_ID}, network::{ declare::{ queryable::ext::QueryableInfoType, subscriber::ext::SubscriberInfo, QueryableId, @@ -375,7 +375,7 @@ impl Primitives for AdminSpace { } } - fn send_push(&self, msg: Push) { + fn send_push(&self, msg: Push, _reliability: Reliability) { trace!("recv Push {:?}", msg); { let conf = self.context.runtime.state.config.lock(); @@ -516,8 +516,8 @@ impl crate::net::primitives::EPrimitives for AdminSpace { } #[inline] - fn send_push(&self, msg: Push) { - (self as &dyn Primitives).send_push(msg) + fn send_push(&self, msg: Push, reliability: Reliability) { + (self as &dyn Primitives).send_push(msg, reliability) } #[inline] diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index 5fd8a49261..bab638af83 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -26,7 +26,7 @@ use zenoh_protocol::{ EMPTY_EXPR_ID, }, network::{ - declare::subscriber::ext::SubscriberInfo, ext, Declare, DeclareBody, DeclareKeyExpr, + declare::subscriber::ext::SubscriberInfo, ext, Declare, DeclareBody, DeclareKeyExpr, Push, }, zenoh::{PushBody, Put}, }; @@ -534,7 +534,7 @@ impl Primitives for ClientPrimitives { } } - fn send_push(&self, msg: zenoh_protocol::network::Push) { + fn send_push(&self, msg: zenoh_protocol::network::Push, _reliability: Reliability) { *zlock!(self.data) = Some(msg.wire_expr.to_owned()); } @@ -563,7 +563,7 @@ impl EPrimitives for ClientPrimitives { } } - fn send_push(&self, msg: zenoh_protocol::network::Push) { + fn send_push(&self, msg: zenoh_protocol::network::Push, _reliability: Reliability) { *zlock!(self.data) = Some(msg.wire_expr.to_owned()); } @@ -736,23 +736,26 @@ fn client_test() { primitives1.clear_data(); primitives2.clear_data(); - full_reentrant_route_data( + route_data( &tables, &face0.upgrade().unwrap(), - &"test/client/z1_wr1".into(), - ext::QoSType::DEFAULT, - None, - PushBody::Put(Put { - timestamp: None, - encoding: Encoding::empty(), - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_unknown: vec![], - payload: ZBuf::empty(), - ext_attachment: None, - }), - 0, + Push { + wire_expr: "test/client/z1_wr1".into(), + ext_qos: ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: 0 }, + payload: PushBody::Put(Put { + timestamp: None, + encoding: Encoding::empty(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_unknown: vec![], + payload: ZBuf::empty(), + ext_attachment: None, + }), + }, + Reliability::Reliable, ); // functional check @@ -770,23 +773,26 @@ fn client_test() { primitives0.clear_data(); primitives1.clear_data(); primitives2.clear_data(); - full_reentrant_route_data( + route_data( &router.tables, &face0.upgrade().unwrap(), - &WireExpr::from(11).with_suffix("/z1_wr2"), - ext::QoSType::DEFAULT, - None, - PushBody::Put(Put { - timestamp: None, - encoding: Encoding::empty(), - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_unknown: vec![], - payload: ZBuf::empty(), - ext_attachment: None, - }), - 0, + Push { + wire_expr: WireExpr::from(11).with_suffix("/z1_wr2"), + ext_qos: ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: 0 }, + payload: PushBody::Put(Put { + timestamp: None, + encoding: Encoding::empty(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_unknown: vec![], + payload: ZBuf::empty(), + ext_attachment: None, + }), + }, + Reliability::Reliable, ); // functional check @@ -804,23 +810,26 @@ fn client_test() { primitives0.clear_data(); primitives1.clear_data(); primitives2.clear_data(); - full_reentrant_route_data( + route_data( &router.tables, &face1.upgrade().unwrap(), - &"test/client/**".into(), - ext::QoSType::DEFAULT, - None, - PushBody::Put(Put { - timestamp: None, - encoding: Encoding::empty(), - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_unknown: vec![], - payload: ZBuf::empty(), - ext_attachment: None, - }), - 0, + Push { + wire_expr: "test/client/**".into(), + ext_qos: ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: 0 }, + payload: PushBody::Put(Put { + timestamp: None, + encoding: Encoding::empty(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_unknown: vec![], + payload: ZBuf::empty(), + ext_attachment: None, + }), + }, + Reliability::Reliable, ); // functional check @@ -838,23 +847,26 @@ fn client_test() { primitives0.clear_data(); primitives1.clear_data(); primitives2.clear_data(); - full_reentrant_route_data( + route_data( &router.tables, &face0.upgrade().unwrap(), - &12.into(), - ext::QoSType::DEFAULT, - None, - PushBody::Put(Put { - timestamp: None, - encoding: Encoding::empty(), - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_unknown: vec![], - payload: ZBuf::empty(), - ext_attachment: None, - }), - 0, + Push { + wire_expr: 12.into(), + ext_qos: ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: 0 }, + payload: PushBody::Put(Put { + timestamp: None, + encoding: Encoding::empty(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_unknown: vec![], + payload: ZBuf::empty(), + ext_attachment: None, + }), + }, + Reliability::Reliable, ); // functional check @@ -872,23 +884,26 @@ fn client_test() { primitives0.clear_data(); primitives1.clear_data(); primitives2.clear_data(); - full_reentrant_route_data( + route_data( &router.tables, &face1.upgrade().unwrap(), - &22.into(), - ext::QoSType::DEFAULT, - None, - PushBody::Put(Put { - timestamp: None, - encoding: Encoding::empty(), - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_unknown: vec![], - payload: ZBuf::empty(), - ext_attachment: None, - }), - 0, + Push { + wire_expr: 22.into(), + ext_qos: ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: 0 }, + payload: PushBody::Put(Put { + timestamp: None, + encoding: Encoding::empty(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_unknown: vec![], + payload: ZBuf::empty(), + ext_attachment: None, + }), + }, + Reliability::Reliable, ); // functional check From ce6cdec678ebbeccac196546cdad0a123c85822a Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 9 Aug 2024 18:33:17 +0200 Subject: [PATCH 04/12] Api publisher reliability --- zenoh/src/api/admin.rs | 10 +++++++ zenoh/src/api/builders/publisher.rs | 29 +++++++++++++++++++ zenoh/src/api/builders/sample.rs | 17 +++++++++++ zenoh/src/api/publisher.rs | 10 ++++++- zenoh/src/api/sample.rs | 29 ++++++++++++++++++- zenoh/src/api/session.rs | 44 +++++++++++++++++++++++++---- 6 files changed, 132 insertions(+), 7 deletions(-) diff --git a/zenoh/src/api/admin.rs b/zenoh/src/api/admin.rs index e794c87db5..8c20a275c1 100644 --- a/zenoh/src/api/admin.rs +++ b/zenoh/src/api/admin.rs @@ -19,6 +19,8 @@ use std::{ use zenoh_core::{Result as ZResult, Wait}; use zenoh_keyexpr::keyexpr; +#[cfg(feature = "unstable")] +use zenoh_protocol::core::Reliability; use zenoh_protocol::{core::WireExpr, network::NetworkMessage}; use zenoh_transport::{ TransportEventHandler, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, @@ -171,6 +173,8 @@ impl TransportMulticastEventHandler for Handler { Some(info), serde_json::to_vec(&peer).unwrap().into(), SubscriberKind::Subscriber, + #[cfg(feature = "unstable")] + Reliability::Reliable, None, ); Ok(Arc::new(PeerHandler { @@ -220,6 +224,8 @@ impl TransportPeerEventHandler for PeerHandler { Some(info), serde_json::to_vec(&link).unwrap().into(), SubscriberKind::Subscriber, + #[cfg(feature = "unstable")] + Reliability::Reliable, None, ); } @@ -240,6 +246,8 @@ impl TransportPeerEventHandler for PeerHandler { Some(info), vec![0u8; 0].into(), SubscriberKind::Subscriber, + #[cfg(feature = "unstable")] + Reliability::Reliable, None, ); } @@ -257,6 +265,8 @@ impl TransportPeerEventHandler for PeerHandler { Some(info), vec![0u8; 0].into(), SubscriberKind::Subscriber, + #[cfg(feature = "unstable")] + Reliability::Reliable, None, ); } diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 666b4378e0..0404838a04 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -14,6 +14,8 @@ use std::future::{IntoFuture, Ready}; use zenoh_core::{Resolvable, Result as ZResult, Wait}; +#[cfg(feature = "unstable")] +use zenoh_protocol::core::Reliability; use zenoh_protocol::{core::CongestionControl, network::Mapping}; #[cfg(feature = "unstable")] @@ -111,6 +113,15 @@ impl PublicationBuilder, T> { self.publisher = self.publisher.allowed_destination(destination); self } + /// Change the `reliability` to apply when routing the data. + #[zenoh_macros::unstable] + #[inline] + pub fn reliability(self, reliability: Reliability) -> Self { + Self { + publisher: self.publisher.reliability(reliability), + ..self + } + } } impl EncodingBuilderTrait for PublisherBuilder<'_, '_> { @@ -239,6 +250,8 @@ pub struct PublisherBuilder<'a, 'b: 'a> { pub(crate) congestion_control: CongestionControl, pub(crate) priority: Priority, pub(crate) is_express: bool, + #[cfg(feature = "unstable")] + pub(crate) reliability: Reliability, pub(crate) destination: Locality, } @@ -254,6 +267,8 @@ impl<'a, 'b> Clone for PublisherBuilder<'a, 'b> { congestion_control: self.congestion_control, priority: self.priority, is_express: self.is_express, + #[cfg(feature = "unstable")] + reliability: self.reliability, destination: self.destination, } } @@ -294,6 +309,16 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { self } + /// Change the `reliability`` to apply when routing the data. + #[zenoh_macros::unstable] + #[inline] + pub fn reliability(self, reliability: Reliability) -> Self { + Self { + reliability, + ..self + } + } + // internal function for performing the publication fn create_one_shot_publisher(self) -> ZResult> { Ok(Publisher { @@ -306,6 +331,8 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { is_express: self.is_express, destination: self.destination, #[cfg(feature = "unstable")] + reliability: self.reliability, + #[cfg(feature = "unstable")] matching_listeners: Default::default(), undeclare_on_drop: true, }) @@ -361,6 +388,8 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> { is_express: self.is_express, destination: self.destination, #[cfg(feature = "unstable")] + reliability: self.reliability, + #[cfg(feature = "unstable")] matching_listeners: Default::default(), undeclare_on_drop: true, }) diff --git a/zenoh/src/api/builders/sample.rs b/zenoh/src/api/builders/sample.rs index 53cf099448..4c1fa81406 100644 --- a/zenoh/src/api/builders/sample.rs +++ b/zenoh/src/api/builders/sample.rs @@ -16,6 +16,8 @@ use std::marker::PhantomData; use uhlc::Timestamp; use zenoh_core::zresult; use zenoh_protocol::core::CongestionControl; +#[cfg(feature = "unstable")] +use zenoh_protocol::core::Reliability; use crate::api::{ bytes::{OptionZBytes, ZBytes}, @@ -87,6 +89,8 @@ impl SampleBuilder { timestamp: None, qos: QoS::default(), #[cfg(feature = "unstable")] + reliability: Reliability::DEFAULT, + #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), attachment: None, }, @@ -117,6 +121,8 @@ impl SampleBuilder { timestamp: None, qos: QoS::default(), #[cfg(feature = "unstable")] + reliability: Reliability::DEFAULT, + #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), attachment: None, }, @@ -147,6 +153,17 @@ impl SampleBuilder { _t: PhantomData::, } } + + #[zenoh_macros::unstable] + pub fn reliability(self, reliability: Reliability) -> Self { + Self { + sample: Sample { + reliability, + ..self.sample + }, + _t: PhantomData::, + } + } } impl TimestampBuilderTrait for SampleBuilder { diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index d8e3ecca61..4cab752f79 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -145,6 +145,8 @@ pub struct Publisher<'a> { pub(crate) is_express: bool, pub(crate) destination: Locality, #[cfg(feature = "unstable")] + pub(crate) reliability: Reliability, + #[cfg(feature = "unstable")] pub(crate) matching_listeners: Arc>>, pub(crate) undeclare_on_drop: bool, } @@ -561,6 +563,7 @@ impl<'a> Sink for Publisher<'a> { } impl Publisher<'_> { + #[allow(clippy::too_many_arguments)] // TODO fixme pub(crate) fn resolve_put( &self, payload: ZBytes, @@ -617,7 +620,10 @@ impl Publisher<'_> { }), }, }, - Reliability::Reliable, // TODO + #[cfg(feature = "unstable")] + self.reliability, + #[cfg(not(feature = "unstable"))] + Reliability::DEFAULT, ); } if self.destination != Locality::Remote { @@ -640,6 +646,8 @@ impl Publisher<'_> { Some(data_info), payload.into(), SubscriberKind::Subscriber, + #[cfg(feature = "unstable")] + self.reliability, attachment, ); } diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs index 220785c668..253e98d4b5 100644 --- a/zenoh/src/api/sample.rs +++ b/zenoh/src/api/sample.rs @@ -18,6 +18,8 @@ use std::{convert::TryFrom, fmt}; #[cfg(feature = "unstable")] use serde::Serialize; use zenoh_config::wrappers::EntityGlobalId; +#[cfg(feature = "unstable")] +use zenoh_protocol::core::Reliability; use zenoh_protocol::{ core::{CongestionControl, Timestamp}, network::declare::ext::QoSType, @@ -63,6 +65,7 @@ pub(crate) trait DataInfoIntoSample { self, key_expr: IntoKeyExpr, payload: IntoZBytes, + #[cfg(feature = "unstable")] reliability: Reliability, attachment: Option, ) -> Sample where @@ -80,6 +83,7 @@ impl DataInfoIntoSample for DataInfo { self, key_expr: IntoKeyExpr, payload: IntoZBytes, + #[cfg(feature = "unstable")] reliability: Reliability, attachment: Option, ) -> Sample where @@ -94,6 +98,8 @@ impl DataInfoIntoSample for DataInfo { timestamp: self.timestamp, qos: self.qos, #[cfg(feature = "unstable")] + reliability, + #[cfg(feature = "unstable")] source_info: SourceInfo { source_id: self.source_id, source_sn: self.source_sn, @@ -109,6 +115,7 @@ impl DataInfoIntoSample for Option { self, key_expr: IntoKeyExpr, payload: IntoZBytes, + #[cfg(feature = "unstable")] reliability: Reliability, attachment: Option, ) -> Sample where @@ -116,7 +123,13 @@ impl DataInfoIntoSample for Option { IntoZBytes: Into, { if let Some(data_info) = self { - data_info.into_sample(key_expr, payload, attachment) + data_info.into_sample( + key_expr, + payload, + #[cfg(feature = "unstable")] + reliability, + attachment, + ) } else { Sample { key_expr: key_expr.into(), @@ -126,6 +139,8 @@ impl DataInfoIntoSample for Option { timestamp: None, qos: QoS::default(), #[cfg(feature = "unstable")] + reliability, + #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), attachment, } @@ -252,6 +267,8 @@ pub struct SampleFields { pub priority: Priority, pub congestion_control: CongestionControl, #[cfg(feature = "unstable")] + pub reliability: Reliability, + #[cfg(feature = "unstable")] pub source_info: SourceInfo, pub attachment: Option, } @@ -268,6 +285,8 @@ impl From for SampleFields { priority: sample.qos.priority(), congestion_control: sample.qos.congestion_control(), #[cfg(feature = "unstable")] + reliability: sample.reliability, + #[cfg(feature = "unstable")] source_info: sample.source_info, attachment: sample.attachment, } @@ -285,6 +304,8 @@ pub struct Sample { pub(crate) timestamp: Option, pub(crate) qos: QoS, #[cfg(feature = "unstable")] + pub(crate) reliability: Reliability, + #[cfg(feature = "unstable")] pub(crate) source_info: SourceInfo, pub(crate) attachment: Option, } @@ -336,6 +357,12 @@ impl Sample { self.qos.priority() } + /// Gets the reliability of this Sample + #[zenoh_macros::unstable] + pub fn reliability(&self) -> Reliability { + self.reliability + } + /// Gets the express flag value. If `true`, the message is not batched during transmission, in order to reduce latency. pub fn express(&self) -> bool { self.qos.express() diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index c1ba753f68..b0c7543be8 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -413,6 +413,8 @@ impl<'s, 'a> SessionDeclarations<'s, 'a> for SessionRef<'a> { congestion_control: CongestionControl::DEFAULT, priority: Priority::DEFAULT, is_express: false, + #[cfg(feature = "unstable")] + reliability: Reliability::DEFAULT, destination: Locality::default(), } } @@ -1653,6 +1655,7 @@ impl Session { } } + #[allow(clippy::too_many_arguments)] // TODO fixme pub(crate) fn execute_subscriber_callbacks( &self, local: bool, @@ -1660,6 +1663,7 @@ impl Session { info: Option, payload: ZBuf, kind: SubscriberKind, + #[cfg(feature = "unstable")] reliability: Reliability, attachment: Option, ) { let mut callbacks = SingleOrVec::default(); @@ -1708,13 +1712,23 @@ impl Session { drop(state); let zenoh_collections::single_or_vec::IntoIter { drain, last } = callbacks.into_iter(); for (cb, key_expr) in drain { - let sample = info - .clone() - .into_sample(key_expr, payload.clone(), attachment.clone()); + let sample = info.clone().into_sample( + key_expr, + payload.clone(), + #[cfg(feature = "unstable")] + reliability, + attachment.clone(), + ); cb(sample); } if let Some((cb, key_expr)) = last { - let sample = info.into_sample(key_expr, payload, attachment.clone()); + let sample = info.into_sample( + key_expr, + payload, + #[cfg(feature = "unstable")] + reliability, + attachment.clone(), + ); cb(sample); } } @@ -2103,6 +2117,8 @@ impl<'s> SessionDeclarations<'s, 'static> for Arc { congestion_control: CongestionControl::DEFAULT, priority: Priority::DEFAULT, is_express: false, + #[cfg(feature = "unstable")] + reliability: Reliability::DEFAULT, destination: Locality::default(), } } @@ -2234,6 +2250,8 @@ impl Primitives for Session { timestamp: None, qos: QoS::default(), #[cfg(feature = "unstable")] + reliability: Reliability::Reliable, + #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), #[cfg(feature = "unstable")] attachment: None, @@ -2256,6 +2274,8 @@ impl Primitives for Session { ZBuf::default(), SubscriberKind::LivelinessSubscriber, #[cfg(feature = "unstable")] + Reliability::Reliable, + #[cfg(feature = "unstable")] None, ); } @@ -2286,6 +2306,8 @@ impl Primitives for Session { ZBuf::default(), SubscriberKind::LivelinessSubscriber, #[cfg(feature = "unstable")] + Reliability::Reliable, + #[cfg(feature = "unstable")] None, ); } else if m.ext_wire_expr.wire_expr != WireExpr::empty() { @@ -2308,6 +2330,8 @@ impl Primitives for Session { ZBuf::default(), SubscriberKind::LivelinessSubscriber, #[cfg(feature = "unstable")] + Reliability::Reliable, + #[cfg(feature = "unstable")] None, ); } @@ -2351,6 +2375,8 @@ impl Primitives for Session { Some(info), m.payload, SubscriberKind::Subscriber, + #[cfg(feature = "unstable")] + _reliability, m.ext_attachment.map(Into::into), ) } @@ -2369,6 +2395,8 @@ impl Primitives for Session { Some(info), ZBuf::empty(), SubscriberKind::Subscriber, + #[cfg(feature = "unstable")] + _reliability, m.ext_attachment.map(Into::into), ) } @@ -2486,7 +2514,13 @@ impl Primitives for Session { attachment: _attachment.map(Into::into), }, }; - let sample = info.into_sample(key_expr.into_owned(), payload, attachment); + let sample = info.into_sample( + key_expr.into_owned(), + payload, + #[cfg(feature = "unstable")] + Reliability::Reliable, + attachment, + ); let new_reply = Reply { result: Ok(sample), #[cfg(feature = "unstable")] From 3ac6c3f3780ac239a5a5562e42aa6ccb4fe815c5 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 9 Aug 2024 18:46:10 +0200 Subject: [PATCH 05/12] Fix serialization_batch test --- io/zenoh-transport/src/common/batch.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index 5537ec46fb..715d487b8c 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -570,7 +570,7 @@ mod tests { let mut batch = WBatch::new(config); let tmsg: TransportMessage = KeepAlive.into(); - let nmsg: NetworkMessage = Push { + let mut nmsg: NetworkMessage = Push { wire_expr: WireExpr::empty(), ext_qos: ext::QoSType::new(Priority::DEFAULT, CongestionControl::Block, false), ext_tstamp: None, @@ -601,6 +601,7 @@ mod tests { sn: 0, ext_qos: frame::ext::QoSType::DEFAULT, }; + nmsg.reliability = Reliability::Reliable; // Serialize with a frame batch.encode((&nmsg, &frame)).unwrap(); @@ -608,6 +609,7 @@ mod tests { nmsgs_in.push(nmsg.clone()); frame.reliability = Reliability::BestEffort; + nmsg.reliability = Reliability::BestEffort; batch.encode((&nmsg, &frame)).unwrap(); assert_ne!(batch.len(), 0); nmsgs_in.push(nmsg.clone()); From b21ce07194f03459de5245502b6b50e924f636bf Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Tue, 3 Sep 2024 15:31:14 +0200 Subject: [PATCH 06/12] Change Reliability default to Reliable --- commons/zenoh-protocol/src/core/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/commons/zenoh-protocol/src/core/mod.rs b/commons/zenoh-protocol/src/core/mod.rs index ebf1bb7f85..629357e339 100644 --- a/commons/zenoh-protocol/src/core/mod.rs +++ b/commons/zenoh-protocol/src/core/mod.rs @@ -346,12 +346,12 @@ impl TryFrom for Priority { #[repr(u8)] pub enum Reliability { #[default] - BestEffort, Reliable, + BestEffort, } impl Reliability { - pub const DEFAULT: Self = Self::BestEffort; + pub const DEFAULT: Self = Self::Reliable; #[cfg(feature = "test")] pub fn rand() -> Self { From e1cd9cbb39f1e7eb8a708a64cd3e8b9c099fdffc Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Tue, 3 Sep 2024 16:01:55 +0200 Subject: [PATCH 07/12] Fix merge --- zenoh/src/api/session.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index be727c80f4..26704d378a 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -39,7 +39,8 @@ use zenoh_protocol::network::{ use zenoh_protocol::{ core::{ key_expr::{keyexpr, OwnedKeyExpr}, - AtomicExprId, CongestionControl, EntityId, ExprId, Parameters, WireExpr, EMPTY_EXPR_ID, + AtomicExprId, CongestionControl, EntityId, ExprId, Parameters, Reliability, WireExpr, + EMPTY_EXPR_ID, }, network::{ self, @@ -101,8 +102,6 @@ use crate::net::{ routing::dispatcher::face::Face, runtime::{Runtime, RuntimeBuilder}, }; -#[cfg(feature = "unstable")] -use crate::pubsub::Reliability; zconfigurable! { pub(crate) static ref API_DATA_RECEPTION_CHANNEL_SIZE: usize = 256; From 118c441663f4d09257dcab945d3c1e45158f64ee Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 4 Sep 2024 11:24:43 +0200 Subject: [PATCH 08/12] Improve batch tests Co-authored-by: Luca Cominardi --- io/zenoh-transport/src/common/batch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index 715d487b8c..cc18ad36c4 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -601,7 +601,7 @@ mod tests { sn: 0, ext_qos: frame::ext::QoSType::DEFAULT, }; - nmsg.reliability = Reliability::Reliable; + nmsg.reliability = frame.reliability; // Serialize with a frame batch.encode((&nmsg, &frame)).unwrap(); From b6bf33443966f46f543cad3a7465a9e5dd38ee35 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 4 Sep 2024 11:24:53 +0200 Subject: [PATCH 09/12] Improve batch tests Co-authored-by: Luca Cominardi --- io/zenoh-transport/src/common/batch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index cc18ad36c4..578adf22d1 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -609,7 +609,7 @@ mod tests { nmsgs_in.push(nmsg.clone()); frame.reliability = Reliability::BestEffort; - nmsg.reliability = Reliability::BestEffort; + nmsg.reliability = frame.reliability; batch.encode((&nmsg, &frame)).unwrap(); assert_ne!(batch.len(), 0); nmsgs_in.push(nmsg.clone()); From 38ef6106460be3ffa80bf7eabf21a806a40116e4 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 4 Sep 2024 11:25:22 +0200 Subject: [PATCH 10/12] Improve publisher reliability doc. Co-authored-by: Luca Cominardi --- zenoh/src/api/builders/publisher.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 0404838a04..4333f68846 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -310,6 +310,8 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { } /// Change the `reliability`` to apply when routing the data. + /// NOTE: Currently `reliability` does not trigger any data retransmission on the wire. + /// It is rather used as a marker on the wire and it may be used to select the best link available (e.g. TCP for reliable data and UDP for best effort data). #[zenoh_macros::unstable] #[inline] pub fn reliability(self, reliability: Reliability) -> Self { From 2128f14d4c1d2ef3a71ea10f6d72305a99b7f9ff Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 4 Sep 2024 11:25:35 +0200 Subject: [PATCH 11/12] Improve publisher reliability doc. Co-authored-by: Luca Cominardi --- zenoh/src/api/builders/publisher.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 4333f68846..457041589a 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -114,6 +114,8 @@ impl PublicationBuilder, T> { self } /// Change the `reliability` to apply when routing the data. + /// NOTE: Currently `reliability` does not trigger any data retransmission on the wire. + /// It is rather used as a marker on the wire and it may be used to select the best link available (e.g. TCP for reliable data and UDP for best effort data). #[zenoh_macros::unstable] #[inline] pub fn reliability(self, reliability: Reliability) -> Self { From 43ec70ebf99258538f590bcdd70d152429f66e7f Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 4 Sep 2024 11:27:52 +0200 Subject: [PATCH 12/12] Code format --- zenoh/src/api/builders/publisher.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 457041589a..8a6961ac55 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -114,7 +114,7 @@ impl PublicationBuilder, T> { self } /// Change the `reliability` to apply when routing the data. - /// NOTE: Currently `reliability` does not trigger any data retransmission on the wire. + /// NOTE: Currently `reliability` does not trigger any data retransmission on the wire. /// It is rather used as a marker on the wire and it may be used to select the best link available (e.g. TCP for reliable data and UDP for best effort data). #[zenoh_macros::unstable] #[inline] @@ -312,7 +312,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { } /// Change the `reliability`` to apply when routing the data. - /// NOTE: Currently `reliability` does not trigger any data retransmission on the wire. + /// NOTE: Currently `reliability` does not trigger any data retransmission on the wire. /// It is rather used as a marker on the wire and it may be used to select the best link available (e.g. TCP for reliable data and UDP for best effort data). #[zenoh_macros::unstable] #[inline]