From 551927698543d8ac4cc8606bb1e4d3bfa3ed6c58 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 9 Aug 2024 14:15:08 +0200 Subject: [PATCH 01/20] Add reliability to NetworkMessage --- commons/zenoh-codec/src/network/mod.rs | 4 ++- commons/zenoh-protocol/src/network/mod.rs | 7 ++-- commons/zenoh-protocol/src/transport/frame.rs | 3 +- zenoh/src/net/primitives/mux.rs | 33 +++++++++++++++++-- 4 files changed, 39 insertions(+), 8 deletions(-) diff --git a/commons/zenoh-codec/src/network/mod.rs b/commons/zenoh-codec/src/network/mod.rs index c68a3470aa..65c75f1452 100644 --- a/commons/zenoh-codec/src/network/mod.rs +++ b/commons/zenoh-codec/src/network/mod.rs @@ -76,7 +76,9 @@ where let header: u8 = self.codec.read(&mut *reader)?; let codec = Zenoh080Header::new(header); - codec.read(&mut *reader) + let mut msg: NetworkMessage = codec.read(&mut *reader)?; + msg.reliability = self.reliability; + Ok(msg) } } diff --git a/commons/zenoh-protocol/src/network/mod.rs b/commons/zenoh-protocol/src/network/mod.rs index 407df6dd52..336f952f3d 100644 --- a/commons/zenoh-protocol/src/network/mod.rs +++ b/commons/zenoh-protocol/src/network/mod.rs @@ -30,7 +30,7 @@ pub use push::Push; pub use request::{AtomicRequestId, Request, RequestId}; pub use response::{Response, ResponseFinal}; -use crate::core::{CongestionControl, Priority}; +use crate::core::{CongestionControl, Priority, Reliability}; pub mod id { // WARNING: it's crucial that these IDs do NOT collide with the IDs @@ -83,6 +83,7 @@ pub enum NetworkBody { #[derive(Debug, Clone, PartialEq, Eq)] pub struct NetworkMessage { pub body: NetworkBody, + pub reliability: Reliability, #[cfg(feature = "stats")] pub size: Option, } @@ -109,8 +110,7 @@ impl NetworkMessage { #[inline] pub fn is_reliable(&self) -> bool { - // TODO - true + self.reliability == Reliability::Reliable } #[inline] @@ -179,6 +179,7 @@ impl From for NetworkMessage { fn from(body: NetworkBody) -> Self { Self { body, + reliability: Reliability::DEFAULT, #[cfg(feature = "stats")] size: None, } diff --git a/commons/zenoh-protocol/src/transport/frame.rs b/commons/zenoh-protocol/src/transport/frame.rs index b3ef1d819f..10e55cc99e 100644 --- a/commons/zenoh-protocol/src/transport/frame.rs +++ b/commons/zenoh-protocol/src/transport/frame.rs @@ -95,7 +95,8 @@ impl Frame { let ext_qos = ext::QoSType::rand(); let mut payload = vec![]; for _ in 0..rng.gen_range(1..4) { - let m = NetworkMessage::rand(); + let mut m = NetworkMessage::rand(); + m.reliability = reliability; payload.push(m); } diff --git a/zenoh/src/net/primitives/mux.rs b/zenoh/src/net/primitives/mux.rs index bc718ba324..b017e3858a 100644 --- a/zenoh/src/net/primitives/mux.rs +++ b/zenoh/src/net/primitives/mux.rs @@ -13,9 +13,12 @@ // use std::sync::OnceLock; -use zenoh_protocol::network::{ - interest::Interest, Declare, NetworkBody, NetworkMessage, Push, Request, Response, - ResponseFinal, +use zenoh_protocol::{ + core::Reliability, + network::{ + interest::Interest, Declare, NetworkBody, NetworkMessage, Push, Request, Response, + ResponseFinal, + }, }; use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast}; @@ -46,6 +49,7 @@ impl Primitives for Mux { fn send_interest(&self, msg: Interest) { let msg = NetworkMessage { body: NetworkBody::Interest(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -74,6 +78,7 @@ impl Primitives for Mux { fn send_declare(&self, msg: Declare) { let msg = NetworkMessage { body: NetworkBody::Declare(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -98,6 +103,7 @@ impl Primitives for Mux { fn send_push(&self, msg: Push) { let msg = NetworkMessage { body: NetworkBody::Push(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -122,6 +128,7 @@ impl Primitives for Mux { fn send_request(&self, msg: Request) { let msg = NetworkMessage { body: NetworkBody::Request(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -146,6 +153,7 @@ impl Primitives for Mux { fn send_response(&self, msg: Response) { let msg = NetworkMessage { body: NetworkBody::Response(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -170,6 +178,7 @@ impl Primitives for Mux { fn send_response_final(&self, msg: ResponseFinal) { let msg = NetworkMessage { body: NetworkBody::ResponseFinal(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -201,6 +210,7 @@ impl EPrimitives for Mux { let ctx = RoutingContext { msg: NetworkMessage { body: NetworkBody::Interest(ctx.msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }, @@ -226,6 +236,7 @@ impl EPrimitives for Mux { let ctx = RoutingContext { msg: NetworkMessage { body: NetworkBody::Declare(ctx.msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }, @@ -250,6 +261,7 @@ impl EPrimitives for Mux { fn send_push(&self, msg: Push) { let msg = NetworkMessage { body: NetworkBody::Push(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -274,6 +286,7 @@ impl EPrimitives for Mux { fn send_request(&self, msg: Request) { let msg = NetworkMessage { body: NetworkBody::Request(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -298,6 +311,7 @@ impl EPrimitives for Mux { fn send_response(&self, msg: Response) { let msg = NetworkMessage { body: NetworkBody::Response(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -322,6 +336,7 @@ impl EPrimitives for Mux { fn send_response_final(&self, msg: ResponseFinal) { let msg = NetworkMessage { body: NetworkBody::ResponseFinal(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -368,6 +383,7 @@ impl Primitives for McastMux { fn send_interest(&self, msg: Interest) { let msg = NetworkMessage { body: NetworkBody::Interest(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -392,6 +408,7 @@ impl Primitives for McastMux { fn send_declare(&self, msg: Declare) { let msg = NetworkMessage { body: NetworkBody::Declare(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -416,6 +433,7 @@ impl Primitives for McastMux { fn send_push(&self, msg: Push) { let msg = NetworkMessage { body: NetworkBody::Push(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -440,6 +458,7 @@ impl Primitives for McastMux { fn send_request(&self, msg: Request) { let msg = NetworkMessage { body: NetworkBody::Request(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -464,6 +483,7 @@ impl Primitives for McastMux { fn send_response(&self, msg: Response) { let msg = NetworkMessage { body: NetworkBody::Response(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -488,6 +508,7 @@ impl Primitives for McastMux { fn send_response_final(&self, msg: ResponseFinal) { let msg = NetworkMessage { body: NetworkBody::ResponseFinal(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -519,6 +540,7 @@ impl EPrimitives for McastMux { let ctx = RoutingContext { msg: NetworkMessage { body: NetworkBody::Interest(ctx.msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }, @@ -544,6 +566,7 @@ impl EPrimitives for McastMux { let ctx = RoutingContext { msg: NetworkMessage { body: NetworkBody::Declare(ctx.msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }, @@ -568,6 +591,7 @@ impl EPrimitives for McastMux { fn send_push(&self, msg: Push) { let msg = NetworkMessage { body: NetworkBody::Push(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -592,6 +616,7 @@ impl EPrimitives for McastMux { fn send_request(&self, msg: Request) { let msg = NetworkMessage { body: NetworkBody::Request(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -616,6 +641,7 @@ impl EPrimitives for McastMux { fn send_response(&self, msg: Response) { let msg = NetworkMessage { body: NetworkBody::Response(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; @@ -640,6 +666,7 @@ impl EPrimitives for McastMux { fn send_response_final(&self, msg: ResponseFinal) { let msg = NetworkMessage { body: NetworkBody::ResponseFinal(msg), + reliability: Reliability::Reliable, #[cfg(feature = "stats")] size: None, }; From ef76d115cdbc0a5b5f79d70d89327eb891c18f55 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 9 Aug 2024 14:50:20 +0200 Subject: [PATCH 02/20] Updates --- commons/zenoh-codec/src/transport/batch.rs | 13 ++++++------- io/zenoh-transport/src/common/pipeline.rs | 4 ++-- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/commons/zenoh-codec/src/transport/batch.rs b/commons/zenoh-codec/src/transport/batch.rs index bfdc21f618..d4fd603864 100644 --- a/commons/zenoh-codec/src/transport/batch.rs +++ b/commons/zenoh-codec/src/transport/batch.rs @@ -150,13 +150,12 @@ where fn write(self, writer: &mut W, x: (&NetworkMessage, &FrameHeader)) -> Self::Output { let (m, f) = x; - // @TODO: m.is_reliable() always return true for the time being - // if let (Reliability::Reliable, false) | (Reliability::BestEffort, true) = - // (f.reliability, m.is_reliable()) - // { - // // We are not serializing on the right frame. - // return Err(BatchError::NewFrame); - // } + if let (Reliability::Reliable, false) | (Reliability::BestEffort, true) = + (f.reliability, m.is_reliable()) + { + // We are not serializing on the right frame. + return Err(BatchError::NewFrame); + } // Mark the write operation let mark = writer.mark(); diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 68a4b87d24..9cf8d85843 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -17,7 +17,7 @@ use zenoh_codec::{transport::batch::BatchError, WCodec, Zenoh080}; use zenoh_config::QueueSizeConf; use zenoh_core::zlock; use zenoh_protocol::{ - core::{Priority, Reliability}, + core::Priority, network::NetworkMessage, transport::{ fragment::FragmentHeader, @@ -210,7 +210,7 @@ impl StageIn { // The Frame let frame = FrameHeader { - reliability: Reliability::Reliable, // TODO + reliability: msg.reliability, sn, ext_qos: frame::ext::QoSType::new(priority), }; From ff8699bddbec1f059ecfb1602b6419d798c18cdb Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 9 Aug 2024 15:07:39 +0200 Subject: [PATCH 03/20] Change primitives --- zenoh/src/api/publisher.rs | 71 +++++---- zenoh/src/api/session.rs | 6 +- zenoh/src/net/primitives/demux.rs | 2 +- zenoh/src/net/primitives/mod.rs | 13 +- zenoh/src/net/primitives/mux.rs | 16 +- zenoh/src/net/routing/dispatcher/face.rs | 14 +- zenoh/src/net/routing/dispatcher/pubsub.rs | 97 +++++++----- zenoh/src/net/runtime/adminspace.rs | 8 +- zenoh/src/net/tests/tables.rs | 171 +++++++++++---------- 9 files changed, 211 insertions(+), 187 deletions(-) diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index f4b969b18f..d8e3ecca61 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -23,7 +23,7 @@ use std::{ use futures::Sink; use zenoh_core::{zread, Resolvable, Resolve, Wait}; use zenoh_protocol::{ - core::CongestionControl, + core::{CongestionControl, Reliability}, network::{push::ext, Push}, zenoh::{Del, PushBody, Put}, }; @@ -582,40 +582,43 @@ impl Publisher<'_> { timestamp }; if self.destination != Locality::SessionLocal { - primitives.send_push(Push { - wire_expr: self.key_expr.to_wire(&self.session).to_owned(), - ext_qos: ext::QoSType::new( - self.priority.into(), - self.congestion_control, - self.is_express, - ), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - payload: match kind { - SampleKind::Put => PushBody::Put(Put { - timestamp, - encoding: encoding.clone().into(), - #[cfg(feature = "unstable")] - ext_sinfo: source_info.into(), - #[cfg(not(feature = "unstable"))] - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_attachment: attachment.clone().map(|a| a.into()), - ext_unknown: vec![], - payload: payload.clone().into(), - }), - SampleKind::Delete => PushBody::Del(Del { - timestamp, - #[cfg(feature = "unstable")] - ext_sinfo: source_info.into(), - #[cfg(not(feature = "unstable"))] - ext_sinfo: None, - ext_attachment: attachment.clone().map(|a| a.into()), - ext_unknown: vec![], - }), + primitives.send_push( + Push { + wire_expr: self.key_expr.to_wire(&self.session).to_owned(), + ext_qos: ext::QoSType::new( + self.priority.into(), + self.congestion_control, + self.is_express, + ), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + payload: match kind { + SampleKind::Put => PushBody::Put(Put { + timestamp, + encoding: encoding.clone().into(), + #[cfg(feature = "unstable")] + ext_sinfo: source_info.into(), + #[cfg(not(feature = "unstable"))] + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment: attachment.clone().map(|a| a.into()), + ext_unknown: vec![], + payload: payload.clone().into(), + }), + SampleKind::Delete => PushBody::Del(Del { + timestamp, + #[cfg(feature = "unstable")] + ext_sinfo: source_info.into(), + #[cfg(not(feature = "unstable"))] + ext_sinfo: None, + ext_attachment: attachment.clone().map(|a| a.into()), + ext_unknown: vec![], + }), + }, }, - }); + Reliability::Reliable, // TODO + ); } if self.destination != Locality::Remote { let data_info = DataInfo { diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index ed1c75d3f2..c1ba753f68 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -2333,7 +2333,7 @@ impl Primitives for Session { } } - fn send_push(&self, msg: Push) { + fn send_push(&self, msg: Push, _reliability: Reliability) { trace!("recv Push {:?}", msg); match msg.payload { PushBody::Put(m) => { @@ -2791,8 +2791,8 @@ impl crate::net::primitives::EPrimitives for Session { } #[inline] - fn send_push(&self, msg: Push) { - (self as &dyn Primitives).send_push(msg) + fn send_push(&self, msg: Push, reliability: Reliability) { + (self as &dyn Primitives).send_push(msg, reliability) } #[inline] diff --git a/zenoh/src/net/primitives/demux.rs b/zenoh/src/net/primitives/demux.rs index 59111e5441..e4774aab4a 100644 --- a/zenoh/src/net/primitives/demux.rs +++ b/zenoh/src/net/primitives/demux.rs @@ -66,7 +66,7 @@ impl TransportPeerEventHandler for DeMux { } match msg.body { - NetworkBody::Push(m) => self.face.send_push(m), + NetworkBody::Push(m) => self.face.send_push(m, msg.reliability), NetworkBody::Declare(m) => self.face.send_declare(m), NetworkBody::Interest(m) => self.face.send_interest(m), NetworkBody::Request(m) => self.face.send_request(m), diff --git a/zenoh/src/net/primitives/mod.rs b/zenoh/src/net/primitives/mod.rs index 837571f7f6..21526466f5 100644 --- a/zenoh/src/net/primitives/mod.rs +++ b/zenoh/src/net/primitives/mod.rs @@ -18,8 +18,9 @@ use std::any::Any; pub use demux::*; pub use mux::*; -use zenoh_protocol::network::{ - interest::Interest, Declare, Push, Request, Response, ResponseFinal, +use zenoh_protocol::{ + core::Reliability, + network::{interest::Interest, Declare, Push, Request, Response, ResponseFinal}, }; use super::routing::RoutingContext; @@ -29,7 +30,7 @@ pub trait Primitives: Send + Sync { fn send_declare(&self, msg: Declare); - fn send_push(&self, msg: Push); + fn send_push(&self, msg: Push, reliability: Reliability); fn send_request(&self, msg: Request); @@ -47,7 +48,7 @@ pub(crate) trait EPrimitives: Send + Sync { fn send_declare(&self, ctx: RoutingContext); - fn send_push(&self, msg: Push); + fn send_push(&self, msg: Push, reliability: Reliability); fn send_request(&self, msg: Request); @@ -64,7 +65,7 @@ impl Primitives for DummyPrimitives { fn send_declare(&self, _msg: Declare) {} - fn send_push(&self, _msg: Push) {} + fn send_push(&self, _msg: Push, _reliability: Reliability) {} fn send_request(&self, _msg: Request) {} @@ -80,7 +81,7 @@ impl EPrimitives for DummyPrimitives { fn send_declare(&self, _ctx: RoutingContext) {} - fn send_push(&self, _msg: Push) {} + fn send_push(&self, _msg: Push, _reliability: Reliability) {} fn send_request(&self, _msg: Request) {} diff --git a/zenoh/src/net/primitives/mux.rs b/zenoh/src/net/primitives/mux.rs index b017e3858a..47067f231e 100644 --- a/zenoh/src/net/primitives/mux.rs +++ b/zenoh/src/net/primitives/mux.rs @@ -100,10 +100,10 @@ impl Primitives for Mux { } } - fn send_push(&self, msg: Push) { + fn send_push(&self, msg: Push, reliability: Reliability) { let msg = NetworkMessage { body: NetworkBody::Push(msg), - reliability: Reliability::Reliable, + reliability, #[cfg(feature = "stats")] size: None, }; @@ -258,10 +258,10 @@ impl EPrimitives for Mux { } } - fn send_push(&self, msg: Push) { + fn send_push(&self, msg: Push, reliability: Reliability) { let msg = NetworkMessage { body: NetworkBody::Push(msg), - reliability: Reliability::Reliable, + reliability, #[cfg(feature = "stats")] size: None, }; @@ -430,10 +430,10 @@ impl Primitives for McastMux { } } - fn send_push(&self, msg: Push) { + fn send_push(&self, msg: Push, reliability: Reliability) { let msg = NetworkMessage { body: NetworkBody::Push(msg), - reliability: Reliability::Reliable, + reliability, #[cfg(feature = "stats")] size: None, }; @@ -588,10 +588,10 @@ impl EPrimitives for McastMux { } } - fn send_push(&self, msg: Push) { + fn send_push(&self, msg: Push, reliability: Reliability) { let msg = NetworkMessage { body: NetworkBody::Push(msg), - reliability: Reliability::Reliable, + reliability, #[cfg(feature = "stats")] size: None, }; diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index bbc910b124..0bc14450a7 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -20,7 +20,7 @@ use std::{ use tokio_util::sync::CancellationToken; use zenoh_protocol::{ - core::{ExprId, WhatAmI, ZenohIdProto}, + core::{ExprId, Reliability, WhatAmI, ZenohIdProto}, network::{ interest::{InterestId, InterestMode, InterestOptions}, Mapping, Push, Request, RequestId, Response, ResponseFinal, @@ -379,16 +379,8 @@ impl Primitives for Face { } #[inline] - fn send_push(&self, msg: Push) { - full_reentrant_route_data( - &self.tables, - &self.state, - &msg.wire_expr, - msg.ext_qos, - msg.ext_tstamp, - msg.payload, - msg.ext_nodeid.node_id, - ); + fn send_push(&self, msg: Push, reliability: Reliability) { + route_data(&self.tables, &self.state, msg, reliability); } fn send_request(&self, msg: Request) { diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index 84c8433a48..7334f4f267 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -15,7 +15,7 @@ use std::{collections::HashMap, sync::Arc}; use zenoh_core::zread; use zenoh_protocol::{ - core::{key_expr::keyexpr, WhatAmI, WireExpr}, + core::{key_expr::keyexpr, Reliability, WhatAmI, WireExpr}, network::{ declare::{ext, subscriber::ext::SubscriberInfo, SubscriberId}, Push, @@ -385,42 +385,42 @@ macro_rules! inc_stats { }; } -pub fn full_reentrant_route_data( +pub fn route_data( tables_ref: &Arc, face: &FaceState, - expr: &WireExpr, - ext_qos: ext::QoSType, - ext_tstamp: Option, - mut payload: PushBody, - routing_context: NodeId, + mut msg: Push, + reliability: Reliability, ) { let tables = zread!(tables_ref.tables); - match tables.get_mapping(face, &expr.scope, expr.mapping).cloned() { + match tables + .get_mapping(face, &msg.wire_expr.scope, msg.wire_expr.mapping) + .cloned() + { Some(prefix) => { tracing::trace!( "{} Route data for res {}{}", face, prefix.expr(), - expr.suffix.as_ref() + msg.wire_expr.suffix.as_ref() ); - let mut expr = RoutingExpr::new(&prefix, expr.suffix.as_ref()); + let mut expr = RoutingExpr::new(&prefix, msg.wire_expr.suffix.as_ref()); #[cfg(feature = "stats")] let admin = expr.full_expr().starts_with("@/"); #[cfg(feature = "stats")] if !admin { - inc_stats!(face, rx, user, payload) + inc_stats!(face, rx, user, msg.payload) } else { - inc_stats!(face, rx, admin, payload) + inc_stats!(face, rx, admin, msg.payload) } if tables.hat_code.ingress_filter(&tables, face, &mut expr) { let res = Resource::get_resource(&prefix, expr.suffix); - let route = get_data_route(&tables, face, &res, &mut expr, routing_context); + let route = get_data_route(&tables, face, &res, &mut expr, msg.ext_nodeid.node_id); if !route.is_empty() { - treat_timestamp!(&tables.hlc, payload, tables.drop_future_timestamp); + treat_timestamp!(&tables.hlc, msg.payload, tables.drop_future_timestamp); if route.len() == 1 { let (outface, key_expr, context) = route.values().next().unwrap(); @@ -431,18 +431,21 @@ pub fn full_reentrant_route_data( drop(tables); #[cfg(feature = "stats")] if !admin { - inc_stats!(face, tx, user, payload) + inc_stats!(face, tx, user, msg.payload) } else { - inc_stats!(face, tx, admin, payload) + inc_stats!(face, tx, admin, msg.payload) } - outface.primitives.send_push(Push { - wire_expr: key_expr.into(), - ext_qos, - ext_tstamp, - ext_nodeid: ext::NodeIdType { node_id: *context }, - payload, - }) + outface.primitives.send_push( + Push { + wire_expr: key_expr.into(), + ext_qos: msg.ext_qos, + ext_tstamp: msg.ext_tstamp, + ext_nodeid: ext::NodeIdType { node_id: *context }, + payload: msg.payload, + }, + reliability, + ) } } else if tables.whatami == WhatAmI::Router { let route = route @@ -459,18 +462,21 @@ pub fn full_reentrant_route_data( for (outface, key_expr, context) in route { #[cfg(feature = "stats")] if !admin { - inc_stats!(face, tx, user, payload) + inc_stats!(face, tx, user, msg.payload) } else { - inc_stats!(face, tx, admin, payload) + inc_stats!(face, tx, admin, msg.payload) } - outface.primitives.send_push(Push { - wire_expr: key_expr, - ext_qos, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType { node_id: context }, - payload: payload.clone(), - }) + outface.primitives.send_push( + Push { + wire_expr: key_expr, + ext_qos: msg.ext_qos, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: context }, + payload: msg.payload.clone(), + }, + reliability, + ) } } else { drop(tables); @@ -483,18 +489,21 @@ pub fn full_reentrant_route_data( { #[cfg(feature = "stats")] if !admin { - inc_stats!(face, tx, user, payload) + inc_stats!(face, tx, user, msg.payload) } else { - inc_stats!(face, tx, admin, payload) + inc_stats!(face, tx, admin, msg.payload) } - outface.primitives.send_push(Push { - wire_expr: key_expr.into(), - ext_qos, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType { node_id: *context }, - payload: payload.clone(), - }) + outface.primitives.send_push( + Push { + wire_expr: key_expr.into(), + ext_qos: msg.ext_qos, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: *context }, + payload: msg.payload.clone(), + }, + reliability, + ) } } } @@ -502,7 +511,11 @@ pub fn full_reentrant_route_data( } } None => { - tracing::error!("{} Route data with unknown scope {}!", face, expr.scope); + tracing::error!( + "{} Route data with unknown scope {}!", + face, + msg.wire_expr.scope + ); } } } diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index ce87d68ef0..d3e2a3c1ad 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -26,7 +26,7 @@ use zenoh_plugin_trait::{PluginControl, PluginStatus}; #[cfg(feature = "plugins")] use zenoh_protocol::core::key_expr::keyexpr; use zenoh_protocol::{ - core::{key_expr::OwnedKeyExpr, ExprId, WireExpr, EMPTY_EXPR_ID}, + core::{key_expr::OwnedKeyExpr, ExprId, Reliability, WireExpr, EMPTY_EXPR_ID}, network::{ declare::{ queryable::ext::QueryableInfoType, subscriber::ext::SubscriberInfo, QueryableId, @@ -375,7 +375,7 @@ impl Primitives for AdminSpace { } } - fn send_push(&self, msg: Push) { + fn send_push(&self, msg: Push, _reliability: Reliability) { trace!("recv Push {:?}", msg); { let conf = self.context.runtime.state.config.lock(); @@ -516,8 +516,8 @@ impl crate::net::primitives::EPrimitives for AdminSpace { } #[inline] - fn send_push(&self, msg: Push) { - (self as &dyn Primitives).send_push(msg) + fn send_push(&self, msg: Push, reliability: Reliability) { + (self as &dyn Primitives).send_push(msg, reliability) } #[inline] diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index 5fd8a49261..bab638af83 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -26,7 +26,7 @@ use zenoh_protocol::{ EMPTY_EXPR_ID, }, network::{ - declare::subscriber::ext::SubscriberInfo, ext, Declare, DeclareBody, DeclareKeyExpr, + declare::subscriber::ext::SubscriberInfo, ext, Declare, DeclareBody, DeclareKeyExpr, Push, }, zenoh::{PushBody, Put}, }; @@ -534,7 +534,7 @@ impl Primitives for ClientPrimitives { } } - fn send_push(&self, msg: zenoh_protocol::network::Push) { + fn send_push(&self, msg: zenoh_protocol::network::Push, _reliability: Reliability) { *zlock!(self.data) = Some(msg.wire_expr.to_owned()); } @@ -563,7 +563,7 @@ impl EPrimitives for ClientPrimitives { } } - fn send_push(&self, msg: zenoh_protocol::network::Push) { + fn send_push(&self, msg: zenoh_protocol::network::Push, _reliability: Reliability) { *zlock!(self.data) = Some(msg.wire_expr.to_owned()); } @@ -736,23 +736,26 @@ fn client_test() { primitives1.clear_data(); primitives2.clear_data(); - full_reentrant_route_data( + route_data( &tables, &face0.upgrade().unwrap(), - &"test/client/z1_wr1".into(), - ext::QoSType::DEFAULT, - None, - PushBody::Put(Put { - timestamp: None, - encoding: Encoding::empty(), - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_unknown: vec![], - payload: ZBuf::empty(), - ext_attachment: None, - }), - 0, + Push { + wire_expr: "test/client/z1_wr1".into(), + ext_qos: ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: 0 }, + payload: PushBody::Put(Put { + timestamp: None, + encoding: Encoding::empty(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_unknown: vec![], + payload: ZBuf::empty(), + ext_attachment: None, + }), + }, + Reliability::Reliable, ); // functional check @@ -770,23 +773,26 @@ fn client_test() { primitives0.clear_data(); primitives1.clear_data(); primitives2.clear_data(); - full_reentrant_route_data( + route_data( &router.tables, &face0.upgrade().unwrap(), - &WireExpr::from(11).with_suffix("/z1_wr2"), - ext::QoSType::DEFAULT, - None, - PushBody::Put(Put { - timestamp: None, - encoding: Encoding::empty(), - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_unknown: vec![], - payload: ZBuf::empty(), - ext_attachment: None, - }), - 0, + Push { + wire_expr: WireExpr::from(11).with_suffix("/z1_wr2"), + ext_qos: ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: 0 }, + payload: PushBody::Put(Put { + timestamp: None, + encoding: Encoding::empty(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_unknown: vec![], + payload: ZBuf::empty(), + ext_attachment: None, + }), + }, + Reliability::Reliable, ); // functional check @@ -804,23 +810,26 @@ fn client_test() { primitives0.clear_data(); primitives1.clear_data(); primitives2.clear_data(); - full_reentrant_route_data( + route_data( &router.tables, &face1.upgrade().unwrap(), - &"test/client/**".into(), - ext::QoSType::DEFAULT, - None, - PushBody::Put(Put { - timestamp: None, - encoding: Encoding::empty(), - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_unknown: vec![], - payload: ZBuf::empty(), - ext_attachment: None, - }), - 0, + Push { + wire_expr: "test/client/**".into(), + ext_qos: ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: 0 }, + payload: PushBody::Put(Put { + timestamp: None, + encoding: Encoding::empty(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_unknown: vec![], + payload: ZBuf::empty(), + ext_attachment: None, + }), + }, + Reliability::Reliable, ); // functional check @@ -838,23 +847,26 @@ fn client_test() { primitives0.clear_data(); primitives1.clear_data(); primitives2.clear_data(); - full_reentrant_route_data( + route_data( &router.tables, &face0.upgrade().unwrap(), - &12.into(), - ext::QoSType::DEFAULT, - None, - PushBody::Put(Put { - timestamp: None, - encoding: Encoding::empty(), - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_unknown: vec![], - payload: ZBuf::empty(), - ext_attachment: None, - }), - 0, + Push { + wire_expr: 12.into(), + ext_qos: ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: 0 }, + payload: PushBody::Put(Put { + timestamp: None, + encoding: Encoding::empty(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_unknown: vec![], + payload: ZBuf::empty(), + ext_attachment: None, + }), + }, + Reliability::Reliable, ); // functional check @@ -872,23 +884,26 @@ fn client_test() { primitives0.clear_data(); primitives1.clear_data(); primitives2.clear_data(); - full_reentrant_route_data( + route_data( &router.tables, &face1.upgrade().unwrap(), - &22.into(), - ext::QoSType::DEFAULT, - None, - PushBody::Put(Put { - timestamp: None, - encoding: Encoding::empty(), - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_unknown: vec![], - payload: ZBuf::empty(), - ext_attachment: None, - }), - 0, + Push { + wire_expr: 22.into(), + ext_qos: ext::QoSType::DEFAULT, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType { node_id: 0 }, + payload: PushBody::Put(Put { + timestamp: None, + encoding: Encoding::empty(), + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_unknown: vec![], + payload: ZBuf::empty(), + ext_attachment: None, + }), + }, + Reliability::Reliable, ); // functional check From ce6cdec678ebbeccac196546cdad0a123c85822a Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 9 Aug 2024 18:33:17 +0200 Subject: [PATCH 04/20] Api publisher reliability --- zenoh/src/api/admin.rs | 10 +++++++ zenoh/src/api/builders/publisher.rs | 29 +++++++++++++++++++ zenoh/src/api/builders/sample.rs | 17 +++++++++++ zenoh/src/api/publisher.rs | 10 ++++++- zenoh/src/api/sample.rs | 29 ++++++++++++++++++- zenoh/src/api/session.rs | 44 +++++++++++++++++++++++++---- 6 files changed, 132 insertions(+), 7 deletions(-) diff --git a/zenoh/src/api/admin.rs b/zenoh/src/api/admin.rs index e794c87db5..8c20a275c1 100644 --- a/zenoh/src/api/admin.rs +++ b/zenoh/src/api/admin.rs @@ -19,6 +19,8 @@ use std::{ use zenoh_core::{Result as ZResult, Wait}; use zenoh_keyexpr::keyexpr; +#[cfg(feature = "unstable")] +use zenoh_protocol::core::Reliability; use zenoh_protocol::{core::WireExpr, network::NetworkMessage}; use zenoh_transport::{ TransportEventHandler, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, @@ -171,6 +173,8 @@ impl TransportMulticastEventHandler for Handler { Some(info), serde_json::to_vec(&peer).unwrap().into(), SubscriberKind::Subscriber, + #[cfg(feature = "unstable")] + Reliability::Reliable, None, ); Ok(Arc::new(PeerHandler { @@ -220,6 +224,8 @@ impl TransportPeerEventHandler for PeerHandler { Some(info), serde_json::to_vec(&link).unwrap().into(), SubscriberKind::Subscriber, + #[cfg(feature = "unstable")] + Reliability::Reliable, None, ); } @@ -240,6 +246,8 @@ impl TransportPeerEventHandler for PeerHandler { Some(info), vec![0u8; 0].into(), SubscriberKind::Subscriber, + #[cfg(feature = "unstable")] + Reliability::Reliable, None, ); } @@ -257,6 +265,8 @@ impl TransportPeerEventHandler for PeerHandler { Some(info), vec![0u8; 0].into(), SubscriberKind::Subscriber, + #[cfg(feature = "unstable")] + Reliability::Reliable, None, ); } diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 666b4378e0..0404838a04 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -14,6 +14,8 @@ use std::future::{IntoFuture, Ready}; use zenoh_core::{Resolvable, Result as ZResult, Wait}; +#[cfg(feature = "unstable")] +use zenoh_protocol::core::Reliability; use zenoh_protocol::{core::CongestionControl, network::Mapping}; #[cfg(feature = "unstable")] @@ -111,6 +113,15 @@ impl PublicationBuilder, T> { self.publisher = self.publisher.allowed_destination(destination); self } + /// Change the `reliability` to apply when routing the data. + #[zenoh_macros::unstable] + #[inline] + pub fn reliability(self, reliability: Reliability) -> Self { + Self { + publisher: self.publisher.reliability(reliability), + ..self + } + } } impl EncodingBuilderTrait for PublisherBuilder<'_, '_> { @@ -239,6 +250,8 @@ pub struct PublisherBuilder<'a, 'b: 'a> { pub(crate) congestion_control: CongestionControl, pub(crate) priority: Priority, pub(crate) is_express: bool, + #[cfg(feature = "unstable")] + pub(crate) reliability: Reliability, pub(crate) destination: Locality, } @@ -254,6 +267,8 @@ impl<'a, 'b> Clone for PublisherBuilder<'a, 'b> { congestion_control: self.congestion_control, priority: self.priority, is_express: self.is_express, + #[cfg(feature = "unstable")] + reliability: self.reliability, destination: self.destination, } } @@ -294,6 +309,16 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { self } + /// Change the `reliability`` to apply when routing the data. + #[zenoh_macros::unstable] + #[inline] + pub fn reliability(self, reliability: Reliability) -> Self { + Self { + reliability, + ..self + } + } + // internal function for performing the publication fn create_one_shot_publisher(self) -> ZResult> { Ok(Publisher { @@ -306,6 +331,8 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { is_express: self.is_express, destination: self.destination, #[cfg(feature = "unstable")] + reliability: self.reliability, + #[cfg(feature = "unstable")] matching_listeners: Default::default(), undeclare_on_drop: true, }) @@ -361,6 +388,8 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> { is_express: self.is_express, destination: self.destination, #[cfg(feature = "unstable")] + reliability: self.reliability, + #[cfg(feature = "unstable")] matching_listeners: Default::default(), undeclare_on_drop: true, }) diff --git a/zenoh/src/api/builders/sample.rs b/zenoh/src/api/builders/sample.rs index 53cf099448..4c1fa81406 100644 --- a/zenoh/src/api/builders/sample.rs +++ b/zenoh/src/api/builders/sample.rs @@ -16,6 +16,8 @@ use std::marker::PhantomData; use uhlc::Timestamp; use zenoh_core::zresult; use zenoh_protocol::core::CongestionControl; +#[cfg(feature = "unstable")] +use zenoh_protocol::core::Reliability; use crate::api::{ bytes::{OptionZBytes, ZBytes}, @@ -87,6 +89,8 @@ impl SampleBuilder { timestamp: None, qos: QoS::default(), #[cfg(feature = "unstable")] + reliability: Reliability::DEFAULT, + #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), attachment: None, }, @@ -117,6 +121,8 @@ impl SampleBuilder { timestamp: None, qos: QoS::default(), #[cfg(feature = "unstable")] + reliability: Reliability::DEFAULT, + #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), attachment: None, }, @@ -147,6 +153,17 @@ impl SampleBuilder { _t: PhantomData::, } } + + #[zenoh_macros::unstable] + pub fn reliability(self, reliability: Reliability) -> Self { + Self { + sample: Sample { + reliability, + ..self.sample + }, + _t: PhantomData::, + } + } } impl TimestampBuilderTrait for SampleBuilder { diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index d8e3ecca61..4cab752f79 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -145,6 +145,8 @@ pub struct Publisher<'a> { pub(crate) is_express: bool, pub(crate) destination: Locality, #[cfg(feature = "unstable")] + pub(crate) reliability: Reliability, + #[cfg(feature = "unstable")] pub(crate) matching_listeners: Arc>>, pub(crate) undeclare_on_drop: bool, } @@ -561,6 +563,7 @@ impl<'a> Sink for Publisher<'a> { } impl Publisher<'_> { + #[allow(clippy::too_many_arguments)] // TODO fixme pub(crate) fn resolve_put( &self, payload: ZBytes, @@ -617,7 +620,10 @@ impl Publisher<'_> { }), }, }, - Reliability::Reliable, // TODO + #[cfg(feature = "unstable")] + self.reliability, + #[cfg(not(feature = "unstable"))] + Reliability::DEFAULT, ); } if self.destination != Locality::Remote { @@ -640,6 +646,8 @@ impl Publisher<'_> { Some(data_info), payload.into(), SubscriberKind::Subscriber, + #[cfg(feature = "unstable")] + self.reliability, attachment, ); } diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs index 220785c668..253e98d4b5 100644 --- a/zenoh/src/api/sample.rs +++ b/zenoh/src/api/sample.rs @@ -18,6 +18,8 @@ use std::{convert::TryFrom, fmt}; #[cfg(feature = "unstable")] use serde::Serialize; use zenoh_config::wrappers::EntityGlobalId; +#[cfg(feature = "unstable")] +use zenoh_protocol::core::Reliability; use zenoh_protocol::{ core::{CongestionControl, Timestamp}, network::declare::ext::QoSType, @@ -63,6 +65,7 @@ pub(crate) trait DataInfoIntoSample { self, key_expr: IntoKeyExpr, payload: IntoZBytes, + #[cfg(feature = "unstable")] reliability: Reliability, attachment: Option, ) -> Sample where @@ -80,6 +83,7 @@ impl DataInfoIntoSample for DataInfo { self, key_expr: IntoKeyExpr, payload: IntoZBytes, + #[cfg(feature = "unstable")] reliability: Reliability, attachment: Option, ) -> Sample where @@ -94,6 +98,8 @@ impl DataInfoIntoSample for DataInfo { timestamp: self.timestamp, qos: self.qos, #[cfg(feature = "unstable")] + reliability, + #[cfg(feature = "unstable")] source_info: SourceInfo { source_id: self.source_id, source_sn: self.source_sn, @@ -109,6 +115,7 @@ impl DataInfoIntoSample for Option { self, key_expr: IntoKeyExpr, payload: IntoZBytes, + #[cfg(feature = "unstable")] reliability: Reliability, attachment: Option, ) -> Sample where @@ -116,7 +123,13 @@ impl DataInfoIntoSample for Option { IntoZBytes: Into, { if let Some(data_info) = self { - data_info.into_sample(key_expr, payload, attachment) + data_info.into_sample( + key_expr, + payload, + #[cfg(feature = "unstable")] + reliability, + attachment, + ) } else { Sample { key_expr: key_expr.into(), @@ -126,6 +139,8 @@ impl DataInfoIntoSample for Option { timestamp: None, qos: QoS::default(), #[cfg(feature = "unstable")] + reliability, + #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), attachment, } @@ -252,6 +267,8 @@ pub struct SampleFields { pub priority: Priority, pub congestion_control: CongestionControl, #[cfg(feature = "unstable")] + pub reliability: Reliability, + #[cfg(feature = "unstable")] pub source_info: SourceInfo, pub attachment: Option, } @@ -268,6 +285,8 @@ impl From for SampleFields { priority: sample.qos.priority(), congestion_control: sample.qos.congestion_control(), #[cfg(feature = "unstable")] + reliability: sample.reliability, + #[cfg(feature = "unstable")] source_info: sample.source_info, attachment: sample.attachment, } @@ -285,6 +304,8 @@ pub struct Sample { pub(crate) timestamp: Option, pub(crate) qos: QoS, #[cfg(feature = "unstable")] + pub(crate) reliability: Reliability, + #[cfg(feature = "unstable")] pub(crate) source_info: SourceInfo, pub(crate) attachment: Option, } @@ -336,6 +357,12 @@ impl Sample { self.qos.priority() } + /// Gets the reliability of this Sample + #[zenoh_macros::unstable] + pub fn reliability(&self) -> Reliability { + self.reliability + } + /// Gets the express flag value. If `true`, the message is not batched during transmission, in order to reduce latency. pub fn express(&self) -> bool { self.qos.express() diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index c1ba753f68..b0c7543be8 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -413,6 +413,8 @@ impl<'s, 'a> SessionDeclarations<'s, 'a> for SessionRef<'a> { congestion_control: CongestionControl::DEFAULT, priority: Priority::DEFAULT, is_express: false, + #[cfg(feature = "unstable")] + reliability: Reliability::DEFAULT, destination: Locality::default(), } } @@ -1653,6 +1655,7 @@ impl Session { } } + #[allow(clippy::too_many_arguments)] // TODO fixme pub(crate) fn execute_subscriber_callbacks( &self, local: bool, @@ -1660,6 +1663,7 @@ impl Session { info: Option, payload: ZBuf, kind: SubscriberKind, + #[cfg(feature = "unstable")] reliability: Reliability, attachment: Option, ) { let mut callbacks = SingleOrVec::default(); @@ -1708,13 +1712,23 @@ impl Session { drop(state); let zenoh_collections::single_or_vec::IntoIter { drain, last } = callbacks.into_iter(); for (cb, key_expr) in drain { - let sample = info - .clone() - .into_sample(key_expr, payload.clone(), attachment.clone()); + let sample = info.clone().into_sample( + key_expr, + payload.clone(), + #[cfg(feature = "unstable")] + reliability, + attachment.clone(), + ); cb(sample); } if let Some((cb, key_expr)) = last { - let sample = info.into_sample(key_expr, payload, attachment.clone()); + let sample = info.into_sample( + key_expr, + payload, + #[cfg(feature = "unstable")] + reliability, + attachment.clone(), + ); cb(sample); } } @@ -2103,6 +2117,8 @@ impl<'s> SessionDeclarations<'s, 'static> for Arc { congestion_control: CongestionControl::DEFAULT, priority: Priority::DEFAULT, is_express: false, + #[cfg(feature = "unstable")] + reliability: Reliability::DEFAULT, destination: Locality::default(), } } @@ -2234,6 +2250,8 @@ impl Primitives for Session { timestamp: None, qos: QoS::default(), #[cfg(feature = "unstable")] + reliability: Reliability::Reliable, + #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), #[cfg(feature = "unstable")] attachment: None, @@ -2256,6 +2274,8 @@ impl Primitives for Session { ZBuf::default(), SubscriberKind::LivelinessSubscriber, #[cfg(feature = "unstable")] + Reliability::Reliable, + #[cfg(feature = "unstable")] None, ); } @@ -2286,6 +2306,8 @@ impl Primitives for Session { ZBuf::default(), SubscriberKind::LivelinessSubscriber, #[cfg(feature = "unstable")] + Reliability::Reliable, + #[cfg(feature = "unstable")] None, ); } else if m.ext_wire_expr.wire_expr != WireExpr::empty() { @@ -2308,6 +2330,8 @@ impl Primitives for Session { ZBuf::default(), SubscriberKind::LivelinessSubscriber, #[cfg(feature = "unstable")] + Reliability::Reliable, + #[cfg(feature = "unstable")] None, ); } @@ -2351,6 +2375,8 @@ impl Primitives for Session { Some(info), m.payload, SubscriberKind::Subscriber, + #[cfg(feature = "unstable")] + _reliability, m.ext_attachment.map(Into::into), ) } @@ -2369,6 +2395,8 @@ impl Primitives for Session { Some(info), ZBuf::empty(), SubscriberKind::Subscriber, + #[cfg(feature = "unstable")] + _reliability, m.ext_attachment.map(Into::into), ) } @@ -2486,7 +2514,13 @@ impl Primitives for Session { attachment: _attachment.map(Into::into), }, }; - let sample = info.into_sample(key_expr.into_owned(), payload, attachment); + let sample = info.into_sample( + key_expr.into_owned(), + payload, + #[cfg(feature = "unstable")] + Reliability::Reliable, + attachment, + ); let new_reply = Reply { result: Ok(sample), #[cfg(feature = "unstable")] From 3ac6c3f3780ac239a5a5562e42aa6ccb4fe815c5 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 9 Aug 2024 18:46:10 +0200 Subject: [PATCH 05/20] Fix serialization_batch test --- io/zenoh-transport/src/common/batch.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index 5537ec46fb..715d487b8c 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -570,7 +570,7 @@ mod tests { let mut batch = WBatch::new(config); let tmsg: TransportMessage = KeepAlive.into(); - let nmsg: NetworkMessage = Push { + let mut nmsg: NetworkMessage = Push { wire_expr: WireExpr::empty(), ext_qos: ext::QoSType::new(Priority::DEFAULT, CongestionControl::Block, false), ext_tstamp: None, @@ -601,6 +601,7 @@ mod tests { sn: 0, ext_qos: frame::ext::QoSType::DEFAULT, }; + nmsg.reliability = Reliability::Reliable; // Serialize with a frame batch.encode((&nmsg, &frame)).unwrap(); @@ -608,6 +609,7 @@ mod tests { nmsgs_in.push(nmsg.clone()); frame.reliability = Reliability::BestEffort; + nmsg.reliability = Reliability::BestEffort; batch.encode((&nmsg, &frame)).unwrap(); assert_ne!(batch.len(), 0); nmsgs_in.push(nmsg.clone()); From 168a80448991f1234f03267c0c150444e8fb772e Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Thu, 29 Aug 2024 18:24:15 +0200 Subject: [PATCH 06/20] Change defrag errors to log instead of closing transport --- .../src/common/defragmentation.rs | 6 +++- io/zenoh-transport/src/multicast/rx.rs | 16 +++++----- .../src/unicast/universal/rx.rs | 30 +++++++++---------- 3 files changed, 29 insertions(+), 23 deletions(-) diff --git a/io/zenoh-transport/src/common/defragmentation.rs b/io/zenoh-transport/src/common/defragmentation.rs index 476fad632c..481f4b00bd 100644 --- a/io/zenoh-transport/src/common/defragmentation.rs +++ b/io/zenoh-transport/src/common/defragmentation.rs @@ -66,7 +66,11 @@ impl DefragBuffer { pub(crate) fn push(&mut self, sn: TransportSn, zslice: ZSlice) -> ZResult<()> { if sn != self.sn.get() { self.clear(); - bail!("Expected SN {}, received {}", self.sn.get(), sn) + bail!( + "Defragmentation SN error: expected SN {}, received {}", + self.sn.get(), + sn + ) } let new_len = self.len + zslice.len(); diff --git a/io/zenoh-transport/src/multicast/rx.rs b/io/zenoh-transport/src/multicast/rx.rs index 93dc3c727a..0f0288ee21 100644 --- a/io/zenoh-transport/src/multicast/rx.rs +++ b/io/zenoh-transport/src/multicast/rx.rs @@ -207,18 +207,20 @@ impl TransportMulticastInner { if guard.defrag.is_empty() { let _ = guard.defrag.sync(sn); } - guard.defrag.push(sn, payload)?; - if !more { + if let Err(e) = guard.defrag.push(sn, payload) { + tracing::trace!("{}", e); + } else if !more { // When shared-memory feature is disabled, msg does not need to be mutable - let msg = guard.defrag.defragment().ok_or_else(|| { - zerror!( + if let Some(msg) = guard.defrag.defragment() { + return self.trigger_callback(msg, peer); + } else { + tracing::trace!( "Transport: {}. Peer: {}. Priority: {:?}. Defragmentation error.", self.manager.config.zid, peer.zid, priority - ) - })?; - return self.trigger_callback(msg, peer); + ); + } } Ok(()) diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index afd8e114d7..7c207b160c 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -145,23 +145,23 @@ impl TransportUnicastUniversal { if guard.defrag.is_empty() { let _ = guard.defrag.sync(sn); } - guard.defrag.push(sn, payload)?; - if !more { + if let Err(e) = guard.defrag.push(sn, payload) { + tracing::trace!("{}", e); + } else if !more { // When shared-memory feature is disabled, msg does not need to be mutable - let msg = guard - .defrag - .defragment() - .ok_or_else(|| zerror!("Transport: {}. Defragmentation error.", self.config.zid))?; - - let callback = zread!(self.callback).clone(); - if let Some(callback) = callback.as_ref() { - return self.trigger_callback(callback.as_ref(), msg); + if let Some(msg) = guard.defrag.defragment() { + let callback = zread!(self.callback).clone(); + if let Some(callback) = callback.as_ref() { + return self.trigger_callback(callback.as_ref(), msg); + } else { + tracing::debug!( + "Transport: {}. No callback available, dropping messages: {:?}", + self.config.zid, + msg + ); + } } else { - tracing::debug!( - "Transport: {}. No callback available, dropping messages: {:?}", - self.config.zid, - msg - ); + tracing::trace!("Transport: {}. Defragmentation error.", self.config.zid); } } From 5114c33e205c2c9574246a39c24d669d9ad8bc04 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Fri, 30 Aug 2024 15:34:27 +0200 Subject: [PATCH 07/20] Fix messages with old SN not being dropped --- io/zenoh-transport/src/multicast/rx.rs | 55 ++++++------- .../src/unicast/universal/rx.rs | 79 +++++++++---------- 2 files changed, 62 insertions(+), 72 deletions(-) diff --git a/io/zenoh-transport/src/multicast/rx.rs b/io/zenoh-transport/src/multicast/rx.rs index 0f0288ee21..ca607be493 100644 --- a/io/zenoh-transport/src/multicast/rx.rs +++ b/io/zenoh-transport/src/multicast/rx.rs @@ -166,10 +166,10 @@ impl TransportMulticastInner { Reliability::BestEffort => zlock!(c.best_effort), }; - self.verify_sn(sn, &mut guard)?; - - for msg in payload.drain(..) { - self.trigger_callback(msg, peer)?; + if self.verify_sn(sn, &mut guard)? { + for msg in payload.drain(..) { + self.trigger_callback(msg, peer)?; + } } Ok(()) } @@ -202,24 +202,24 @@ impl TransportMulticastInner { Reliability::BestEffort => zlock!(c.best_effort), }; - self.verify_sn(sn, &mut guard)?; - - if guard.defrag.is_empty() { - let _ = guard.defrag.sync(sn); - } - if let Err(e) = guard.defrag.push(sn, payload) { - tracing::trace!("{}", e); - } else if !more { - // When shared-memory feature is disabled, msg does not need to be mutable - if let Some(msg) = guard.defrag.defragment() { - return self.trigger_callback(msg, peer); - } else { - tracing::trace!( - "Transport: {}. Peer: {}. Priority: {:?}. Defragmentation error.", - self.manager.config.zid, - peer.zid, - priority - ); + if self.verify_sn(sn, &mut guard)? { + if guard.defrag.is_empty() { + let _ = guard.defrag.sync(sn); + } + if let Err(e) = guard.defrag.push(sn, payload) { + tracing::trace!("{}", e); + } else if !more { + // When shared-memory feature is disabled, msg does not need to be mutable + if let Some(msg) = guard.defrag.defragment() { + return self.trigger_callback(msg, peer); + } else { + tracing::trace!( + "Transport: {}. Peer: {}. Priority: {:?}. Defragmentation error.", + self.manager.config.zid, + peer.zid, + priority + ); + } } } @@ -230,7 +230,7 @@ impl TransportMulticastInner { &self, sn: TransportSn, guard: &mut MutexGuard<'_, TransportChannelRx>, - ) -> ZResult<()> { + ) -> ZResult { let precedes = guard.sn.precedes(sn)?; if !precedes { tracing::debug!( @@ -239,19 +239,14 @@ impl TransportMulticastInner { sn, guard.sn.next() ); - // Drop the fragments if needed - if !guard.defrag.is_empty() { - guard.defrag.clear(); - } - // Keep reading - return Ok(()); + return Ok(false); } // Set will always return OK because we have already checked // with precedes() that the sn has the right resolution let _ = guard.sn.set(sn); - Ok(()) + Ok(true) } pub(super) fn read_messages( diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index 7c207b160c..f826c054d4 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -97,19 +97,19 @@ impl TransportUnicastUniversal { Reliability::BestEffort => zlock!(c.best_effort), }; - self.verify_sn(sn, &mut guard)?; - - let callback = zread!(self.callback).clone(); - if let Some(callback) = callback.as_ref() { - for msg in payload.drain(..) { - self.trigger_callback(callback.as_ref(), msg)?; + if self.verify_sn(sn, &mut guard)? { + let callback = zread!(self.callback).clone(); + if let Some(callback) = callback.as_ref() { + for msg in payload.drain(..) { + self.trigger_callback(callback.as_ref(), msg)?; + } + } else { + tracing::debug!( + "Transport: {}. No callback available, dropping messages: {:?}", + self.config.zid, + payload + ); } - } else { - tracing::debug!( - "Transport: {}. No callback available, dropping messages: {:?}", - self.config.zid, - payload - ); } Ok(()) } @@ -140,28 +140,28 @@ impl TransportUnicastUniversal { Reliability::BestEffort => zlock!(c.best_effort), }; - self.verify_sn(sn, &mut guard)?; - - if guard.defrag.is_empty() { - let _ = guard.defrag.sync(sn); - } - if let Err(e) = guard.defrag.push(sn, payload) { - tracing::trace!("{}", e); - } else if !more { - // When shared-memory feature is disabled, msg does not need to be mutable - if let Some(msg) = guard.defrag.defragment() { - let callback = zread!(self.callback).clone(); - if let Some(callback) = callback.as_ref() { - return self.trigger_callback(callback.as_ref(), msg); + if self.verify_sn(sn, &mut guard)? { + if guard.defrag.is_empty() { + let _ = guard.defrag.sync(sn); + } + if let Err(e) = guard.defrag.push(sn, payload) { + tracing::trace!("{}", e); + } else if !more { + // When shared-memory feature is disabled, msg does not need to be mutable + if let Some(msg) = guard.defrag.defragment() { + let callback = zread!(self.callback).clone(); + if let Some(callback) = callback.as_ref() { + return self.trigger_callback(callback.as_ref(), msg); + } else { + tracing::debug!( + "Transport: {}. No callback available, dropping messages: {:?}", + self.config.zid, + msg + ); + } } else { - tracing::debug!( - "Transport: {}. No callback available, dropping messages: {:?}", - self.config.zid, - msg - ); + tracing::trace!("Transport: {}. Defragmentation error.", self.config.zid); } - } else { - tracing::trace!("Transport: {}. Defragmentation error.", self.config.zid); } } @@ -172,24 +172,19 @@ impl TransportUnicastUniversal { &self, sn: TransportSn, guard: &mut MutexGuard<'_, TransportChannelRx>, - ) -> ZResult<()> { + ) -> ZResult { let precedes = guard.sn.roll(sn)?; if !precedes { - tracing::debug!( + tracing::trace!( "Transport: {}. Frame with invalid SN dropped: {}. Expected: {}.", self.config.zid, sn, - guard.sn.get() + guard.sn.next() ); - // Drop the fragments if needed - if !guard.defrag.is_empty() { - guard.defrag.clear(); - } - // Keep reading - return Ok(()); + return Ok(false); } - Ok(()) + Ok(true) } pub(super) fn read_messages(&self, mut batch: RBatch, link: &Link) -> ZResult<()> { From 7206fdc0eec8294d53cdbdc0122be2a3da1065e4 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Fri, 30 Aug 2024 16:31:38 +0200 Subject: [PATCH 08/20] Remove failure check in defrag tests --- .../tests/unicast_defragmentation.rs | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/io/zenoh-transport/tests/unicast_defragmentation.rs b/io/zenoh-transport/tests/unicast_defragmentation.rs index fc54180c96..03a9ed6ee8 100644 --- a/io/zenoh-transport/tests/unicast_defragmentation.rs +++ b/io/zenoh-transport/tests/unicast_defragmentation.rs @@ -93,20 +93,6 @@ async fn run(endpoint: &EndPoint, channel: Channel, msg_size: usize) { ); client_transport.schedule(message.clone()).unwrap(); - // Wait that the client transport has been closed - ztimeout!(async { - while client_transport.get_zid().is_ok() { - tokio::time::sleep(SLEEP).await; - } - }); - - // Wait on the router manager that the transport has been closed - ztimeout!(async { - while !router_manager.get_transports_unicast().await.is_empty() { - tokio::time::sleep(SLEEP).await; - } - }); - // Stop the locators on the manager println!("Del locator: {endpoint}"); ztimeout!(router_manager.del_listener(endpoint)).unwrap(); From c434f69f289843941284327b0644e51be0cd879d Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Fri, 30 Aug 2024 16:32:54 +0200 Subject: [PATCH 09/20] Wait for message to be sent in defrag test --- io/zenoh-transport/tests/unicast_defragmentation.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/io/zenoh-transport/tests/unicast_defragmentation.rs b/io/zenoh-transport/tests/unicast_defragmentation.rs index 03a9ed6ee8..5df569eff4 100644 --- a/io/zenoh-transport/tests/unicast_defragmentation.rs +++ b/io/zenoh-transport/tests/unicast_defragmentation.rs @@ -93,6 +93,9 @@ async fn run(endpoint: &EndPoint, channel: Channel, msg_size: usize) { ); client_transport.schedule(message.clone()).unwrap(); + // wait a little bit for the message to be sent + tokio::time::sleep(SLEEP).await; + // Stop the locators on the manager println!("Del locator: {endpoint}"); ztimeout!(router_manager.del_listener(endpoint)).unwrap(); From bafbe617fc48d49f4bf6ecff9de48acdd4c14be9 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 2 Sep 2024 17:31:33 +0200 Subject: [PATCH 10/20] Remove if-statement nesting --- io/zenoh-transport/src/multicast/rx.rs | 52 ++++++++------ .../src/unicast/universal/rx.rs | 72 ++++++++++--------- 2 files changed, 70 insertions(+), 54 deletions(-) diff --git a/io/zenoh-transport/src/multicast/rx.rs b/io/zenoh-transport/src/multicast/rx.rs index ca607be493..8562d5b3eb 100644 --- a/io/zenoh-transport/src/multicast/rx.rs +++ b/io/zenoh-transport/src/multicast/rx.rs @@ -166,11 +166,14 @@ impl TransportMulticastInner { Reliability::BestEffort => zlock!(c.best_effort), }; - if self.verify_sn(sn, &mut guard)? { - for msg in payload.drain(..) { - self.trigger_callback(msg, peer)?; - } + if !self.verify_sn(sn, &mut guard)? { + // Drop invalid message and continue + return Ok(()); + } + for msg in payload.drain(..) { + self.trigger_callback(msg, peer)?; } + Ok(()) } @@ -202,24 +205,29 @@ impl TransportMulticastInner { Reliability::BestEffort => zlock!(c.best_effort), }; - if self.verify_sn(sn, &mut guard)? { - if guard.defrag.is_empty() { - let _ = guard.defrag.sync(sn); - } - if let Err(e) = guard.defrag.push(sn, payload) { - tracing::trace!("{}", e); - } else if !more { - // When shared-memory feature is disabled, msg does not need to be mutable - if let Some(msg) = guard.defrag.defragment() { - return self.trigger_callback(msg, peer); - } else { - tracing::trace!( - "Transport: {}. Peer: {}. Priority: {:?}. Defragmentation error.", - self.manager.config.zid, - peer.zid, - priority - ); - } + if !self.verify_sn(sn, &mut guard)? { + // Drop invalid message and continue + return Ok(()); + } + if guard.defrag.is_empty() { + let _ = guard.defrag.sync(sn); + } + if let Err(e) = guard.defrag.push(sn, payload) { + // Defrag errors don't close transport + tracing::trace!("{}", e); + return Ok(()); + } + if !more { + // When shared-memory feature is disabled, msg does not need to be mutable + if let Some(msg) = guard.defrag.defragment() { + return self.trigger_callback(msg, peer); + } else { + tracing::trace!( + "Transport: {}. Peer: {}. Priority: {:?}. Defragmentation error.", + self.manager.config.zid, + peer.zid, + priority + ); } } diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index f826c054d4..e69a305876 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -97,20 +97,23 @@ impl TransportUnicastUniversal { Reliability::BestEffort => zlock!(c.best_effort), }; - if self.verify_sn(sn, &mut guard)? { - let callback = zread!(self.callback).clone(); - if let Some(callback) = callback.as_ref() { - for msg in payload.drain(..) { - self.trigger_callback(callback.as_ref(), msg)?; - } - } else { - tracing::debug!( - "Transport: {}. No callback available, dropping messages: {:?}", - self.config.zid, - payload - ); + if !self.verify_sn(sn, &mut guard)? { + // Drop invalid message and continue + return Ok(()); + } + let callback = zread!(self.callback).clone(); + if let Some(callback) = callback.as_ref() { + for msg in payload.drain(..) { + self.trigger_callback(callback.as_ref(), msg)?; } + } else { + tracing::debug!( + "Transport: {}. No callback available, dropping messages: {:?}", + self.config.zid, + payload + ); } + Ok(()) } @@ -140,28 +143,33 @@ impl TransportUnicastUniversal { Reliability::BestEffort => zlock!(c.best_effort), }; - if self.verify_sn(sn, &mut guard)? { - if guard.defrag.is_empty() { - let _ = guard.defrag.sync(sn); - } - if let Err(e) = guard.defrag.push(sn, payload) { - tracing::trace!("{}", e); - } else if !more { - // When shared-memory feature is disabled, msg does not need to be mutable - if let Some(msg) = guard.defrag.defragment() { - let callback = zread!(self.callback).clone(); - if let Some(callback) = callback.as_ref() { - return self.trigger_callback(callback.as_ref(), msg); - } else { - tracing::debug!( - "Transport: {}. No callback available, dropping messages: {:?}", - self.config.zid, - msg - ); - } + if !self.verify_sn(sn, &mut guard)? { + // Drop invalid message and continue + return Ok(()); + } + if guard.defrag.is_empty() { + let _ = guard.defrag.sync(sn); + } + if let Err(e) = guard.defrag.push(sn, payload) { + // Defrag errors don't close transport + tracing::trace!("{}", e); + return Ok(()); + } + if !more { + // When shared-memory feature is disabled, msg does not need to be mutable + if let Some(msg) = guard.defrag.defragment() { + let callback = zread!(self.callback).clone(); + if let Some(callback) = callback.as_ref() { + return self.trigger_callback(callback.as_ref(), msg); } else { - tracing::trace!("Transport: {}. Defragmentation error.", self.config.zid); + tracing::debug!( + "Transport: {}. No callback available, dropping messages: {:?}", + self.config.zid, + msg + ); } + } else { + tracing::trace!("Transport: {}. Defragmentation error.", self.config.zid); } } From 96cefa548ffbf65d8c92198b7c355dfaabe44f89 Mon Sep 17 00:00:00 2001 From: Oussama Teffahi Date: Mon, 2 Sep 2024 17:31:51 +0200 Subject: [PATCH 11/20] Remove deprecated defragmentation test --- .../tests/unicast_defragmentation.rs | 217 ------------------ 1 file changed, 217 deletions(-) delete mode 100644 io/zenoh-transport/tests/unicast_defragmentation.rs diff --git a/io/zenoh-transport/tests/unicast_defragmentation.rs b/io/zenoh-transport/tests/unicast_defragmentation.rs deleted file mode 100644 index 5df569eff4..0000000000 --- a/io/zenoh-transport/tests/unicast_defragmentation.rs +++ /dev/null @@ -1,217 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use std::{convert::TryFrom, sync::Arc, time::Duration}; - -use zenoh_core::ztimeout; -use zenoh_protocol::{ - core::{ - Channel, CongestionControl, Encoding, EndPoint, Priority, Reliability, WhatAmI, - ZenohIdProto, - }, - network::{ - push::{ - ext::{NodeIdType, QoSType}, - Push, - }, - NetworkMessage, - }, - zenoh::Put, -}; -use zenoh_transport::{DummyTransportEventHandler, TransportManager}; - -const TIMEOUT: Duration = Duration::from_secs(60); -const SLEEP: Duration = Duration::from_secs(1); - -const MSG_SIZE: usize = 131_072; -const MSG_DEFRAG_BUF: usize = 128_000; - -async fn run(endpoint: &EndPoint, channel: Channel, msg_size: usize) { - // Define client and router IDs - let client_id = ZenohIdProto::try_from([1]).unwrap(); - let router_id = ZenohIdProto::try_from([2]).unwrap(); - - // Create the router transport manager - let router_manager = TransportManager::builder() - .zid(router_id) - .whatami(WhatAmI::Router) - .defrag_buff_size(MSG_DEFRAG_BUF) - .build(Arc::new(DummyTransportEventHandler)) - .unwrap(); - - // Create the client transport manager - let client_manager = TransportManager::builder() - .whatami(WhatAmI::Client) - .zid(client_id) - .defrag_buff_size(MSG_DEFRAG_BUF) - .build(Arc::new(DummyTransportEventHandler)) - .unwrap(); - - // Create the listener on the router - println!("Add locator: {endpoint}"); - let _ = ztimeout!(router_manager.add_listener(endpoint.clone())).unwrap(); - - // Create an empty transport with the client - // Open transport -> This should be accepted - println!("Opening transport with {endpoint}"); - let _ = ztimeout!(client_manager.open_transport_unicast(endpoint.clone())).unwrap(); - - let client_transport = ztimeout!(client_manager.get_transport_unicast(&router_id)).unwrap(); - - // Create the message to send - let message: NetworkMessage = Push { - wire_expr: "test".into(), - ext_qos: QoSType::new(channel.priority, CongestionControl::Block, false), - ext_tstamp: None, - ext_nodeid: NodeIdType::DEFAULT, - payload: Put { - payload: vec![0u8; msg_size].into(), - timestamp: None, - encoding: Encoding::empty(), - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_attachment: None, - ext_unknown: vec![], - } - .into(), - } - .into(); - - println!( - "Sending message of {msg_size} bytes while defragmentation buffer size is {MSG_DEFRAG_BUF} bytes" - ); - client_transport.schedule(message.clone()).unwrap(); - - // wait a little bit for the message to be sent - tokio::time::sleep(SLEEP).await; - - // Stop the locators on the manager - println!("Del locator: {endpoint}"); - ztimeout!(router_manager.del_listener(endpoint)).unwrap(); - - // Wait a little bit - ztimeout!(async { - while !router_manager.get_listeners().await.is_empty() { - tokio::time::sleep(SLEEP).await; - } - }); - - tokio::time::sleep(SLEEP).await; - - ztimeout!(router_manager.close()); - ztimeout!(client_manager.close()); - - // Wait a little bit - tokio::time::sleep(SLEEP).await; -} - -#[cfg(feature = "transport_tcp")] -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn transport_unicast_defragmentation_tcp_only() { - zenoh_util::try_init_log_from_env(); - - // Define the locators - let endpoint: EndPoint = format!("tcp/127.0.0.1:{}", 11000).parse().unwrap(); - // Define the reliability and congestion control - let channel = [ - Channel { - priority: Priority::DEFAULT, - reliability: Reliability::Reliable, - }, - Channel { - priority: Priority::DEFAULT, - reliability: Reliability::BestEffort, - }, - Channel { - priority: Priority::RealTime, - reliability: Reliability::Reliable, - }, - Channel { - priority: Priority::RealTime, - reliability: Reliability::BestEffort, - }, - ]; - // Run - for ch in channel.iter() { - run(&endpoint, *ch, MSG_SIZE).await; - } -} - -#[cfg(feature = "transport_ws")] -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[ignore] -async fn transport_unicast_defragmentation_ws_only() { - zenoh_util::try_init_log_from_env(); - - // Define the locators - let endpoint: EndPoint = format!("ws/127.0.0.1:{}", 11010).parse().unwrap(); - // Define the reliability and congestion control - let channel = [ - Channel { - priority: Priority::DEFAULT, - reliability: Reliability::Reliable, - }, - Channel { - priority: Priority::DEFAULT, - reliability: Reliability::BestEffort, - }, - Channel { - priority: Priority::RealTime, - reliability: Reliability::Reliable, - }, - Channel { - priority: Priority::RealTime, - reliability: Reliability::BestEffort, - }, - ]; - // Run - for ch in channel.iter() { - run(&endpoint, *ch, MSG_SIZE).await; - } -} - -#[cfg(feature = "transport_unixpipe")] -#[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[ignore] -async fn transport_unicast_defragmentation_unixpipe_only() { - zenoh_util::try_init_log_from_env(); - - // Define the locators - let endpoint: EndPoint = "unixpipe/transport_unicast_defragmentation_unixpipe_only" - .parse() - .unwrap(); - // Define the reliability and congestion control - let channel = [ - Channel { - priority: Priority::DEFAULT, - reliability: Reliability::Reliable, - }, - Channel { - priority: Priority::DEFAULT, - reliability: Reliability::BestEffort, - }, - Channel { - priority: Priority::RealTime, - reliability: Reliability::Reliable, - }, - Channel { - priority: Priority::RealTime, - reliability: Reliability::BestEffort, - }, - ]; - // Run - for ch in channel.iter() { - run(&endpoint, *ch, MSG_SIZE).await; - } -} From a7d03b7f575c027914a8fe3df396b3bd831b6fa8 Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Tue, 3 Sep 2024 11:10:15 +0000 Subject: [PATCH 12/20] Parse `backend_search_dirs` as a list of paths --- plugins/zenoh-backend-traits/src/config.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/plugins/zenoh-backend-traits/src/config.rs b/plugins/zenoh-backend-traits/src/config.rs index e440e3014e..b791ba5c1a 100644 --- a/plugins/zenoh-backend-traits/src/config.rs +++ b/plugins/zenoh-backend-traits/src/config.rs @@ -164,6 +164,8 @@ impl + AsRef, V: AsObject> TryFrom<(S, &V)> for PluginConfi }) }) .unwrap_or(Ok(true))?; + // TODO(fuzzypixelz): refactor this function's interface to get access to the configuration + // source, this we can support spec syntax in the lib search dir. let backend_search_dirs = match value.get("backend_search_dirs") { Some(serde_json::Value::String(path)) => LibSearchDirs::from_paths(&[path.clone()]), Some(serde_json::Value::Array(paths)) => { @@ -174,7 +176,7 @@ impl + AsRef, V: AsObject> TryFrom<(S, &V)> for PluginConfi }; specs.push(path.clone()); } - LibSearchDirs::from_specs(&specs)? + LibSearchDirs::from_paths(&specs) } None => LibSearchDirs::default(), _ => bail!("`backend_search_dirs` field of {}'s configuration must be a string or array of strings", name.as_ref()) From b21ce07194f03459de5245502b6b50e924f636bf Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Tue, 3 Sep 2024 15:31:14 +0200 Subject: [PATCH 13/20] Change Reliability default to Reliable --- commons/zenoh-protocol/src/core/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/commons/zenoh-protocol/src/core/mod.rs b/commons/zenoh-protocol/src/core/mod.rs index ebf1bb7f85..629357e339 100644 --- a/commons/zenoh-protocol/src/core/mod.rs +++ b/commons/zenoh-protocol/src/core/mod.rs @@ -346,12 +346,12 @@ impl TryFrom for Priority { #[repr(u8)] pub enum Reliability { #[default] - BestEffort, Reliable, + BestEffort, } impl Reliability { - pub const DEFAULT: Self = Self::BestEffort; + pub const DEFAULT: Self = Self::Reliable; #[cfg(feature = "test")] pub fn rand() -> Self { From e1cd9cbb39f1e7eb8a708a64cd3e8b9c099fdffc Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Tue, 3 Sep 2024 16:01:55 +0200 Subject: [PATCH 14/20] Fix merge --- zenoh/src/api/session.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index be727c80f4..26704d378a 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -39,7 +39,8 @@ use zenoh_protocol::network::{ use zenoh_protocol::{ core::{ key_expr::{keyexpr, OwnedKeyExpr}, - AtomicExprId, CongestionControl, EntityId, ExprId, Parameters, WireExpr, EMPTY_EXPR_ID, + AtomicExprId, CongestionControl, EntityId, ExprId, Parameters, Reliability, WireExpr, + EMPTY_EXPR_ID, }, network::{ self, @@ -101,8 +102,6 @@ use crate::net::{ routing::dispatcher::face::Face, runtime::{Runtime, RuntimeBuilder}, }; -#[cfg(feature = "unstable")] -use crate::pubsub::Reliability; zconfigurable! { pub(crate) static ref API_DATA_RECEPTION_CHANNEL_SIZE: usize = 256; From 0a911f3a6015f613e846a5ee242e9a61b8675c96 Mon Sep 17 00:00:00 2001 From: oteffahi <70609372+oteffahi@users.noreply.github.com> Date: Tue, 3 Sep 2024 19:15:31 +0200 Subject: [PATCH 15/20] Fix storage error when keyexpr equals the configured `strip_prefix` (#1351) --- plugins/zenoh-plugin-storage-manager/src/lib.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/plugins/zenoh-plugin-storage-manager/src/lib.rs b/plugins/zenoh-plugin-storage-manager/src/lib.rs index ac778f3633..77e53cc80d 100644 --- a/plugins/zenoh-plugin-storage-manager/src/lib.rs +++ b/plugins/zenoh-plugin-storage-manager/src/lib.rs @@ -443,14 +443,16 @@ pub fn strip_prefix( ); } - match key_expr.strip_prefix(prefix).as_slice() { - [stripped_key_expr] => { - if stripped_key_expr.is_empty() { - return Ok(None); - } + // `key_expr.strip_prefix` returns empty vec if `key_expr == prefix`, + // but also returns empty vec if `prefix` is not a prefix to `key_expr`. + // First case needs to be handled before calling `key_expr.strip_prefix` + if key_expr.as_str().eq(prefix.as_str()) { + return Ok(None); + } - OwnedKeyExpr::from_str(stripped_key_expr).map(Some) - } + match key_expr.strip_prefix(prefix).as_slice() { + // NOTE: `stripped_key_expr.is_empty()` should be impossible as "" is not a valid key expression + [stripped_key_expr] => OwnedKeyExpr::from_str(stripped_key_expr).map(Some), _ => bail!("Failed to strip prefix < {} > from: {}", prefix, key_expr), } } From 118c441663f4d09257dcab945d3c1e45158f64ee Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 4 Sep 2024 11:24:43 +0200 Subject: [PATCH 16/20] Improve batch tests Co-authored-by: Luca Cominardi --- io/zenoh-transport/src/common/batch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index 715d487b8c..cc18ad36c4 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -601,7 +601,7 @@ mod tests { sn: 0, ext_qos: frame::ext::QoSType::DEFAULT, }; - nmsg.reliability = Reliability::Reliable; + nmsg.reliability = frame.reliability; // Serialize with a frame batch.encode((&nmsg, &frame)).unwrap(); From b6bf33443966f46f543cad3a7465a9e5dd38ee35 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 4 Sep 2024 11:24:53 +0200 Subject: [PATCH 17/20] Improve batch tests Co-authored-by: Luca Cominardi --- io/zenoh-transport/src/common/batch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index cc18ad36c4..578adf22d1 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -609,7 +609,7 @@ mod tests { nmsgs_in.push(nmsg.clone()); frame.reliability = Reliability::BestEffort; - nmsg.reliability = Reliability::BestEffort; + nmsg.reliability = frame.reliability; batch.encode((&nmsg, &frame)).unwrap(); assert_ne!(batch.len(), 0); nmsgs_in.push(nmsg.clone()); From 38ef6106460be3ffa80bf7eabf21a806a40116e4 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 4 Sep 2024 11:25:22 +0200 Subject: [PATCH 18/20] Improve publisher reliability doc. Co-authored-by: Luca Cominardi --- zenoh/src/api/builders/publisher.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 0404838a04..4333f68846 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -310,6 +310,8 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { } /// Change the `reliability`` to apply when routing the data. + /// NOTE: Currently `reliability` does not trigger any data retransmission on the wire. + /// It is rather used as a marker on the wire and it may be used to select the best link available (e.g. TCP for reliable data and UDP for best effort data). #[zenoh_macros::unstable] #[inline] pub fn reliability(self, reliability: Reliability) -> Self { From 2128f14d4c1d2ef3a71ea10f6d72305a99b7f9ff Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 4 Sep 2024 11:25:35 +0200 Subject: [PATCH 19/20] Improve publisher reliability doc. Co-authored-by: Luca Cominardi --- zenoh/src/api/builders/publisher.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 4333f68846..457041589a 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -114,6 +114,8 @@ impl PublicationBuilder, T> { self } /// Change the `reliability` to apply when routing the data. + /// NOTE: Currently `reliability` does not trigger any data retransmission on the wire. + /// It is rather used as a marker on the wire and it may be used to select the best link available (e.g. TCP for reliable data and UDP for best effort data). #[zenoh_macros::unstable] #[inline] pub fn reliability(self, reliability: Reliability) -> Self { From 43ec70ebf99258538f590bcdd70d152429f66e7f Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 4 Sep 2024 11:27:52 +0200 Subject: [PATCH 20/20] Code format --- zenoh/src/api/builders/publisher.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 457041589a..8a6961ac55 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -114,7 +114,7 @@ impl PublicationBuilder, T> { self } /// Change the `reliability` to apply when routing the data. - /// NOTE: Currently `reliability` does not trigger any data retransmission on the wire. + /// NOTE: Currently `reliability` does not trigger any data retransmission on the wire. /// It is rather used as a marker on the wire and it may be used to select the best link available (e.g. TCP for reliable data and UDP for best effort data). #[zenoh_macros::unstable] #[inline] @@ -312,7 +312,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { } /// Change the `reliability`` to apply when routing the data. - /// NOTE: Currently `reliability` does not trigger any data retransmission on the wire. + /// NOTE: Currently `reliability` does not trigger any data retransmission on the wire. /// It is rather used as a marker on the wire and it may be used to select the best link available (e.g. TCP for reliable data and UDP for best effort data). #[zenoh_macros::unstable] #[inline]