From e7f885ef527be6477b6943da34d29ef1cf651d33 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Tue, 3 Sep 2024 16:46:57 +0200 Subject: [PATCH 1/4] Remove SubscriberInfo extension. Remove reliability on declare_subscriber. --- commons/zenoh-codec/src/network/declare.rs | 40 +--------- commons/zenoh-protocol/src/network/declare.rs | 78 +------------------ zenoh-ext/src/querying_subscriber.rs | 55 +++++++------ zenoh-ext/src/subscriber_ext.rs | 6 +- zenoh/src/api/session.rs | 14 +--- zenoh/src/api/subscriber.rs | 71 +++++++---------- zenoh/src/net/routing/dispatcher/face.rs | 2 +- zenoh/src/net/routing/dispatcher/pubsub.rs | 5 +- zenoh/src/net/routing/dispatcher/resource.rs | 7 +- zenoh/src/net/routing/hat/client/pubsub.rs | 15 ++-- .../net/routing/hat/linkstate_peer/pubsub.rs | 25 ++---- zenoh/src/net/routing/hat/mod.rs | 6 +- zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 27 ++----- zenoh/src/net/routing/hat/router/pubsub.rs | 28 ++----- zenoh/src/net/runtime/adminspace.rs | 5 +- zenoh/src/net/tests/tables.rs | 30 +++---- 16 files changed, 115 insertions(+), 299 deletions(-) diff --git a/commons/zenoh-codec/src/network/declare.rs b/commons/zenoh-codec/src/network/declare.rs index 7c3b797d5d..3eab8564ff 100644 --- a/commons/zenoh-codec/src/network/declare.rs +++ b/commons/zenoh-codec/src/network/declare.rs @@ -19,7 +19,7 @@ use zenoh_buffers::{ ZBuf, }; use zenoh_protocol::{ - common::{iext, imsg, ZExtZ64}, + common::{iext, imsg}, core::{ExprId, ExprLen, WireExpr}, network::{ declare::{self, common, keyexpr, queryable, subscriber, token, Declare, DeclareBody}, @@ -384,9 +384,6 @@ where } } -// SubscriberInfo -crate::impl_zextz64!(subscriber::ext::SubscriberInfo, subscriber::ext::Info::ID); - // DeclareSubscriber impl WCodec<&subscriber::DeclareSubscriber, &mut W> for Zenoh080 where @@ -395,18 +392,10 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &subscriber::DeclareSubscriber) -> Self::Output { - let subscriber::DeclareSubscriber { - id, - wire_expr, - ext_info, - } = x; + let subscriber::DeclareSubscriber { id, wire_expr } = x; // Header let mut header = declare::id::D_SUBSCRIBER; - let mut n_exts = (ext_info != &subscriber::ext::SubscriberInfo::DEFAULT) as u8; - if n_exts != 0 { - header |= subscriber::flag::Z; - } if wire_expr.mapping != Mapping::DEFAULT { header |= subscriber::flag::M; } @@ -420,10 +409,6 @@ where self.write(&mut *writer, wire_expr)?; // Extensions - if ext_info != &subscriber::ext::SubscriberInfo::DEFAULT { - n_exts -= 1; - self.write(&mut *writer, (*ext_info, n_exts != 0))?; - } Ok(()) } @@ -465,30 +450,13 @@ where }; // Extensions - let mut ext_info = subscriber::ext::SubscriberInfo::DEFAULT; - let mut has_ext = imsg::has_flag(self.header, subscriber::flag::Z); while has_ext { let ext: u8 = self.codec.read(&mut *reader)?; - let eodec = Zenoh080Header::new(ext); - match iext::eid(ext) { - subscriber::ext::Info::ID => { - let (i, ext): (subscriber::ext::SubscriberInfo, bool) = - eodec.read(&mut *reader)?; - ext_info = i; - has_ext = ext; - } - _ => { - has_ext = extension::skip(reader, "DeclareSubscriber", ext)?; - } - } + has_ext = extension::skip(reader, "DeclareSubscriber", ext)?; } - Ok(subscriber::DeclareSubscriber { - id, - wire_expr, - ext_info, - }) + Ok(subscriber::DeclareSubscriber { id, wire_expr }) } } diff --git a/commons/zenoh-protocol/src/network/declare.rs b/commons/zenoh-protocol/src/network/declare.rs index 8f31e0ff2a..6efd69d31f 100644 --- a/commons/zenoh-protocol/src/network/declare.rs +++ b/commons/zenoh-protocol/src/network/declare.rs @@ -20,8 +20,8 @@ pub use subscriber::*; pub use token::*; use crate::{ - common::{imsg, ZExtZ64, ZExtZBuf}, - core::{ExprId, Reliability, WireExpr}, + common::{ZExtZ64, ZExtZBuf}, + core::{ExprId, WireExpr}, network::Mapping, zextz64, zextzbuf, }; @@ -341,73 +341,6 @@ pub mod subscriber { pub struct DeclareSubscriber { pub id: SubscriberId, pub wire_expr: WireExpr<'static>, - pub ext_info: ext::SubscriberInfo, - } - - pub mod ext { - use super::*; - - pub type Info = zextz64!(0x01, false); - - /// # The subscription mode. - /// - /// ```text - /// 7 6 5 4 3 2 1 0 - /// +-+-+-+-+-+-+-+-+ - /// |Z|0_1| ID | - /// +-+-+-+---------+ - /// % reserved |R% - /// +---------------+ - /// - /// - if R==1 then the subscription is reliable, else it is best effort - /// - rsv: Reserved - /// ``` - #[derive(Debug, Clone, Copy, PartialEq, Eq)] - pub struct SubscriberInfo { - pub reliability: Reliability, - } - - impl SubscriberInfo { - pub const R: u64 = 1; - - pub const DEFAULT: Self = Self { - reliability: Reliability::DEFAULT, - }; - - #[cfg(feature = "test")] - pub fn rand() -> Self { - let reliability = Reliability::rand(); - - Self { reliability } - } - } - - impl Default for SubscriberInfo { - fn default() -> Self { - Self::DEFAULT - } - } - - impl From for SubscriberInfo { - fn from(ext: Info) -> Self { - let reliability = if imsg::has_option(ext.value, SubscriberInfo::R) { - Reliability::Reliable - } else { - Reliability::BestEffort - }; - Self { reliability } - } - } - - impl From for Info { - fn from(ext: SubscriberInfo) -> Self { - let mut v: u64 = 0; - if ext.reliability == Reliability::Reliable { - v |= SubscriberInfo::R; - } - Info::new(v) - } - } } impl DeclareSubscriber { @@ -418,13 +351,8 @@ pub mod subscriber { let id: SubscriberId = rng.gen(); let wire_expr = WireExpr::rand(); - let ext_info = ext::SubscriberInfo::rand(); - Self { - id, - wire_expr, - ext_info, - } + Self { id, wire_expr } } } diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs index 224abfde87..d904eea194 100644 --- a/zenoh-ext/src/querying_subscriber.rs +++ b/zenoh-ext/src/querying_subscriber.rs @@ -41,7 +41,6 @@ pub struct QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler> { pub(crate) session: SessionRef<'a>, pub(crate) key_expr: ZResult>, pub(crate) key_space: KeySpace, - pub(crate) reliability: Reliability, pub(crate) origin: Locality, pub(crate) query_selector: Option>>, pub(crate) query_target: QueryTarget, @@ -65,7 +64,6 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle session, key_expr, key_space, - reliability, origin, query_selector, query_target, @@ -78,7 +76,6 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle session, key_expr, key_space, - reliability, origin, query_selector, query_target, @@ -118,7 +115,6 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle session, key_expr, key_space, - reliability, origin, query_selector, query_target, @@ -131,7 +127,6 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle session, key_expr, key_space, - reliability, origin, query_selector, query_target, @@ -145,23 +140,32 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle impl<'a, 'b, Handler> QueryingSubscriberBuilder<'a, 'b, crate::UserSpace, Handler> { /// Change the subscription reliability. - #[inline] + #[deprecated( + since = "1.0.0", + note = "please use `reliability` on `declare_publisher` or `put`" + )] + #[allow(unused_mut, unused_variables)] pub fn reliability(mut self, reliability: Reliability) -> Self { - self.reliability = reliability; self } /// Change the subscription reliability to Reliable. - #[inline] + #[deprecated( + since = "1.0.0", + note = "please use `reliability` on `declare_publisher` or `put`" + )] + #[allow(unused_mut)] pub fn reliable(mut self) -> Self { - self.reliability = Reliability::Reliable; self } /// Change the subscription reliability to BestEffort. - #[inline] + #[deprecated( + since = "1.0.0", + note = "please use `reliability` on `declare_publisher` or `put`" + )] + #[allow(unused_mut)] pub fn best_effort(mut self) -> Self { - self.reliability = Reliability::BestEffort; self } @@ -249,7 +253,6 @@ where session: self.session, key_expr: Ok(key_expr.clone()), key_space: self.key_space, - reliability: self.reliability, origin: self.origin, fetch: |cb| match key_space { crate::KeySpace::User => match query_selector { @@ -365,7 +368,6 @@ pub struct FetchingSubscriberBuilder< pub(crate) session: SessionRef<'a>, pub(crate) key_expr: ZResult>, pub(crate) key_space: KeySpace, - pub(crate) reliability: Reliability, pub(crate) origin: Locality, pub(crate) fetch: Fetch, pub(crate) handler: Handler, @@ -390,7 +392,6 @@ where session: self.session, key_expr: self.key_expr.map(|s| s.into_owned()), key_space: self.key_space, - reliability: self.reliability, origin: self.origin, fetch: self.fetch, handler: self.handler, @@ -422,7 +423,6 @@ where session, key_expr, key_space, - reliability, origin, fetch, handler: _, @@ -432,7 +432,6 @@ where session, key_expr, key_space, - reliability, origin, fetch, handler: callback, @@ -476,7 +475,6 @@ where session, key_expr, key_space, - reliability, origin, fetch, handler: _, @@ -486,7 +484,6 @@ where session, key_expr, key_space, - reliability, origin, fetch, handler, @@ -506,23 +503,32 @@ where TryIntoSample: ExtractSample, { /// Change the subscription reliability. - #[inline] + #[deprecated( + since = "1.0.0", + note = "please use `reliability` on `declare_publisher` or `put`" + )] + #[allow(unused_mut, unused_variables)] pub fn reliability(mut self, reliability: Reliability) -> Self { - self.reliability = reliability; self } /// Change the subscription reliability to Reliable. - #[inline] + #[deprecated( + since = "1.0.0", + note = "please use `reliability` on `declare_publisher` or `put`" + )] + #[allow(unused_mut)] pub fn reliable(mut self) -> Self { - self.reliability = Reliability::Reliable; self } /// Change the subscription reliability to BestEffort. - #[inline] + #[deprecated( + since = "1.0.0", + note = "please use `reliability` on `declare_publisher` or `put`" + )] + #[allow(unused_mut)] pub fn best_effort(mut self) -> Self { - self.reliability = Reliability::BestEffort; self } @@ -698,7 +704,6 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> { .session .declare_subscriber(&key_expr) .callback(sub_callback) - .reliability(conf.reliability) .allowed_origin(conf.origin) .wait()?, crate::KeySpace::Liveliness => conf diff --git a/zenoh-ext/src/subscriber_ext.rs b/zenoh-ext/src/subscriber_ext.rs index a7356f86dc..cfd810185a 100644 --- a/zenoh-ext/src/subscriber_ext.rs +++ b/zenoh-ext/src/subscriber_ext.rs @@ -17,7 +17,7 @@ use flume::r#async::RecvStream; use futures::stream::{Forward, Map}; use zenoh::{ liveliness::LivelinessSubscriberBuilder, - pubsub::{Reliability, Subscriber, SubscriberBuilder}, + pubsub::{Subscriber, SubscriberBuilder}, query::{QueryConsolidation, QueryTarget, ReplyKeyExpr}, sample::{Locality, Sample}, Result as ZResult, @@ -171,7 +171,6 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilde session: self.session, key_expr: self.key_expr, key_space: crate::UserSpace, - reliability: self.reliability, origin: self.origin, fetch, handler: self.handler, @@ -213,7 +212,6 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilde session: self.session, key_expr: self.key_expr, key_space: crate::UserSpace, - reliability: self.reliability, origin: self.origin, query_selector: None, // By default query all matching publication caches and storages @@ -283,7 +281,6 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> session: self.session, key_expr: self.key_expr, key_space: crate::LivelinessSpace, - reliability: Reliability::DEFAULT, origin: Locality::default(), fetch, handler: self.handler, @@ -327,7 +324,6 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> session: self.session, key_expr: self.key_expr, key_space: crate::LivelinessSpace, - reliability: Reliability::DEFAULT, origin: Locality::default(), query_selector: None, query_target: QueryTarget::DEFAULT, diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 451c1340ad..52063f816d 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -44,9 +44,9 @@ use zenoh_protocol::{ network::{ self, declare::{ - self, common::ext::WireExprType, queryable::ext::QueryableInfoType, - subscriber::ext::SubscriberInfo, Declare, DeclareBody, DeclareKeyExpr, - DeclareQueryable, DeclareSubscriber, UndeclareQueryable, UndeclareSubscriber, + self, common::ext::WireExprType, queryable::ext::QueryableInfoType, Declare, + DeclareBody, DeclareKeyExpr, DeclareQueryable, DeclareSubscriber, UndeclareQueryable, + UndeclareSubscriber, }, interest::{InterestMode, InterestOptions}, request::{self, ext::TargetType}, @@ -101,8 +101,6 @@ use crate::net::{ routing::dispatcher::face::Face, runtime::{Runtime, RuntimeBuilder}, }; -#[cfg(feature = "unstable")] -use crate::pubsub::Reliability; zconfigurable! { pub(crate) static ref API_DATA_RECEPTION_CHANNEL_SIZE: usize = 256; @@ -378,8 +376,6 @@ impl<'s, 'a> SessionDeclarations<'s, 'a> for SessionRef<'a> { SubscriberBuilder { session: self.clone(), key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), - #[cfg(feature = "unstable")] - reliability: Reliability::DEFAULT, origin: Locality::default(), handler: DefaultHandler::default(), } @@ -1127,7 +1123,6 @@ impl Session { key_expr: &KeyExpr, origin: Locality, callback: Callback<'static, Sample>, - info: &SubscriberInfo, ) -> ZResult> { let mut state = zwrite!(self.state); tracing::trace!("declare_subscriber({:?})", key_expr); @@ -1240,7 +1235,6 @@ impl Session { body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr: key_expr.to_wire(self).to_owned(), - ext_info: *info, }), }); @@ -2037,8 +2031,6 @@ impl<'s> SessionDeclarations<'s, 'static> for Arc { SubscriberBuilder { session: SessionRef::Shared(self.clone()), key_expr: key_expr.try_into().map_err(Into::into), - #[cfg(feature = "unstable")] - reliability: Reliability::DEFAULT, origin: Locality::default(), handler: DefaultHandler::default(), } diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index 0e82a20331..fa2ca30b4f 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -11,30 +11,26 @@ // Contributors: // ZettaScale Zenoh Team, // - +use super::{ + handlers::{locked, Callback, DefaultHandler, IntoHandler}, + key_expr::KeyExpr, + sample::{Locality, Sample}, + session::{SessionRef, UndeclarableSealed}, + Id, +}; +#[cfg(feature = "unstable")] +use crate::pubsub::Reliability; use std::{ fmt, future::{IntoFuture, Ready}, ops::{Deref, DerefMut}, sync::Arc, }; - use zenoh_core::{Resolvable, Wait}; -use zenoh_protocol::network::declare::subscriber::ext::SubscriberInfo; use zenoh_result::ZResult; #[cfg(feature = "unstable")] use {zenoh_config::wrappers::EntityGlobalId, zenoh_protocol::core::EntityGlobalIdProto}; -use super::{ - handlers::{locked, Callback, DefaultHandler, IntoHandler}, - key_expr::KeyExpr, - sample::{Locality, Sample}, - session::{SessionRef, UndeclarableSealed}, - Id, -}; -#[cfg(feature = "unstable")] -use crate::pubsub::Reliability; - pub(crate) struct SubscriberState { pub(crate) id: Id, pub(crate) remote_id: Id, @@ -200,9 +196,6 @@ pub struct SubscriberBuilder<'a, 'b, Handler> { #[cfg(not(feature = "unstable"))] pub(crate) key_expr: ZResult>, - #[cfg(feature = "unstable")] - pub reliability: Reliability, - #[cfg(feature = "unstable")] pub origin: Locality, #[cfg(not(feature = "unstable"))] @@ -239,16 +232,12 @@ impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { let SubscriberBuilder { session, key_expr, - #[cfg(feature = "unstable")] - reliability, origin, handler: _, } = self; SubscriberBuilder { session, key_expr, - #[cfg(feature = "unstable")] - reliability, origin, handler: callback, } @@ -312,16 +301,12 @@ impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { let SubscriberBuilder { session, key_expr, - #[cfg(feature = "unstable")] - reliability, origin, handler: _, } = self; SubscriberBuilder { session, key_expr, - #[cfg(feature = "unstable")] - reliability, origin, handler, } @@ -330,26 +315,32 @@ impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { impl<'a, 'b, Handler> SubscriberBuilder<'a, 'b, Handler> { /// Change the subscription reliability. - #[inline] - #[zenoh_macros::unstable] + #[deprecated( + since = "1.0.0", + note = "please use `reliability` on `declare_publisher` or `put`" + )] + #[allow(unused_mut, unused_variables)] pub fn reliability(mut self, reliability: Reliability) -> Self { - self.reliability = reliability; self } - /// Change the subscription reliability to `Reliable`. - #[inline] - #[zenoh_macros::unstable] + /// Change the subscription reliability to `Reliable`. + #[deprecated( + since = "1.0.0", + note = "please use `reliability` on `declare_publisher` or `put`" + )] + #[allow(unused_mut)] pub fn reliable(mut self) -> Self { - self.reliability = Reliability::Reliable; self } /// Change the subscription reliability to `BestEffort`. - #[inline] - #[zenoh_macros::unstable] + #[deprecated( + since = "1.0.0", + note = "please use `reliability` on `declare_publisher` or `put`" + )] + #[allow(unused_mut)] pub fn best_effort(mut self) -> Self { - self.reliability = Reliability::BestEffort; self } @@ -382,17 +373,7 @@ where let session = self.session; let (callback, receiver) = self.handler.into_handler(); session - .declare_subscriber_inner( - &key_expr, - self.origin, - callback, - #[cfg(feature = "unstable")] - &SubscriberInfo { - reliability: self.reliability, - }, - #[cfg(not(feature = "unstable"))] - &SubscriberInfo::default(), - ) + .declare_subscriber_inner(&key_expr, self.origin, callback) .map(|sub_state| Subscriber { subscriber: SubscriberInner { session, diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index bbc910b124..a29cbc1d76 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -257,7 +257,7 @@ impl Primitives for Face { &mut self.state.clone(), m.id, &m.wire_expr, - &m.ext_info, + &SubscriberInfo, msg.ext_nodeid.node_id, &mut |p, m| declares.push((p.clone(), m)), ); diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index 84c8433a48..2185ddb4c0 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -17,7 +17,7 @@ use zenoh_core::zread; use zenoh_protocol::{ core::{key_expr::keyexpr, WhatAmI, WireExpr}, network::{ - declare::{ext, subscriber::ext::SubscriberInfo, SubscriberId}, + declare::{ext, SubscriberId}, Push, }, zenoh::PushBody, @@ -33,6 +33,9 @@ use super::{ use crate::key_expr::KeyExpr; use crate::net::routing::hat::{HatTrait, SendDeclare}; +#[derive(Copy, Clone)] +pub(crate) struct SubscriberInfo; + #[allow(clippy::too_many_arguments)] pub(crate) fn declare_subscription( hat_code: &(dyn HatTrait + Send + Sync), diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index 01ff9b2817..fd1ce5da88 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -23,10 +23,7 @@ use zenoh_config::WhatAmI; use zenoh_protocol::{ core::{key_expr::keyexpr, ExprId, WireExpr}, network::{ - declare::{ - ext, queryable::ext::QueryableInfoType, subscriber::ext::SubscriberInfo, Declare, - DeclareBody, DeclareKeyExpr, - }, + declare::{ext, queryable::ext::QueryableInfoType, Declare, DeclareBody, DeclareKeyExpr}, interest::InterestId, Mapping, RequestId, }, @@ -35,6 +32,7 @@ use zenoh_sync::get_mut_unchecked; use super::{ face::FaceState, + pubsub::SubscriberInfo, tables::{Tables, TablesLock}, }; use crate::net::routing::{dispatcher::face::Face, RoutingContext}; @@ -43,6 +41,7 @@ pub(crate) type NodeId = u16; pub(crate) type Direction = (Arc, WireExpr<'static>, NodeId); pub(crate) type Route = HashMap; + pub(crate) type QueryRoute = HashMap; pub(crate) struct QueryTargetQabl { pub(crate) direction: Direction, diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index 5a19f3549c..cd6338d7b5 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -18,10 +18,10 @@ use std::{ }; use zenoh_protocol::{ - core::{key_expr::OwnedKeyExpr, Reliability, WhatAmI}, + core::{key_expr::OwnedKeyExpr, WhatAmI}, network::declare::{ - common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, - DeclareSubscriber, SubscriberId, UndeclareSubscriber, + common::ext::WireExprType, ext, Declare, DeclareBody, DeclareSubscriber, SubscriberId, + UndeclareSubscriber, }, }; use zenoh_sync::get_mut_unchecked; @@ -32,6 +32,7 @@ use crate::{ net::routing::{ dispatcher::{ face::FaceState, + pubsub::SubscriberInfo, resource::{NodeId, Resource, SessionContext}, tables::{Route, RoutingExpr, Tables}, }, @@ -46,7 +47,7 @@ fn propagate_simple_subscription_to( _tables: &mut Tables, dst_face: &mut Arc, res: &Arc, - sub_info: &SubscriberInfo, + _sub_info: &SubscriberInfo, src_face: &mut Arc, send_declare: &mut SendDeclare, ) { @@ -68,7 +69,6 @@ fn propagate_simple_subscription_to( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr: key_expr, - ext_info: *sub_info, }), }, res.expr(), @@ -155,7 +155,6 @@ fn declare_simple_subscription( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: 0, // @TODO use proper SubscriberId wire_expr: res.expr().into(), - ext_info: *sub_info, }), }, res.expr(), @@ -262,9 +261,7 @@ pub(super) fn pubsub_new_face( face: &mut Arc, send_declare: &mut SendDeclare, ) { - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - }; + let sub_info = SubscriberInfo; for src_face in tables .faces .values() diff --git a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs index f1412ec807..53cacba95c 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs @@ -19,11 +19,11 @@ use std::{ use petgraph::graph::NodeIndex; use zenoh_protocol::{ - core::{key_expr::OwnedKeyExpr, Reliability, WhatAmI, ZenohIdProto}, + core::{key_expr::OwnedKeyExpr, WhatAmI, ZenohIdProto}, network::{ declare::{ - common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, - DeclareSubscriber, SubscriberId, UndeclareSubscriber, + common::ext::WireExprType, ext, Declare, DeclareBody, DeclareSubscriber, SubscriberId, + UndeclareSubscriber, }, interest::{InterestId, InterestMode, InterestOptions}, }, @@ -39,7 +39,7 @@ use crate::key_expr::KeyExpr; use crate::net::routing::{ dispatcher::{ face::FaceState, - pubsub::*, + pubsub::{update_data_routes_from, update_matches_data_routes, SubscriberInfo}, resource::{NodeId, Resource, SessionContext}, tables::{Route, RoutingExpr, Tables}, }, @@ -55,7 +55,7 @@ fn send_sourced_subscription_to_net_children( children: &[NodeIndex], res: &Arc, src_face: Option<&Arc>, - sub_info: &SubscriberInfo, + _sub_info: &SubscriberInfo, routing_context: NodeId, ) { for child in children { @@ -80,7 +80,6 @@ fn send_sourced_subscription_to_net_children( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: 0, // Sourced subscriptions do not use ids wire_expr: key_expr, - ext_info: *sub_info, }), }, res.expr(), @@ -98,7 +97,7 @@ fn propagate_simple_subscription_to( _tables: &mut Tables, dst_face: &mut Arc, res: &Arc, - sub_info: &SubscriberInfo, + _sub_info: &SubscriberInfo, src_face: &mut Arc, send_declare: &mut SendDeclare, ) { @@ -121,7 +120,6 @@ fn propagate_simple_subscription_to( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr: key_expr, - ext_info: *sub_info, }), }, res.expr(), @@ -159,7 +157,6 @@ fn propagate_simple_subscription_to( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr: key_expr, - ext_info: *sub_info, }), }, res.expr(), @@ -654,9 +651,7 @@ pub(super) fn pubsub_tree_change(tables: &mut Tables, new_children: &[Vec, res: &Arc, - sub_info: &SubscriberInfo, + _sub_info: &SubscriberInfo, src_face: &mut Arc, send_declare: &mut SendDeclare, ) { @@ -72,7 +73,6 @@ fn propagate_simple_subscription_to( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr: key_expr, - ext_info: *sub_info, }), }, res.expr(), @@ -110,7 +110,6 @@ fn propagate_simple_subscription_to( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr: key_expr, - ext_info: *sub_info, }), }, res.expr(), @@ -200,7 +199,6 @@ fn declare_simple_subscription( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: 0, // @TODO use proper SubscriberId wire_expr: res.expr().into(), - ext_info: *sub_info, }), }, res.expr(), @@ -376,9 +374,7 @@ pub(super) fn pubsub_new_face( send_declare: &mut SendDeclare, ) { if face.whatami != WhatAmI::Client { - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - }; + let sub_info = SubscriberInfo; for src_face in tables .faces .values() @@ -414,9 +410,6 @@ pub(super) fn declare_sub_interest( ) { if mode.current() && face.whatami == WhatAmI::Client { let interest_id = (!mode.future()).then_some(id); - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - }; if let Some(res) = res.as_ref() { if aggregate { if tables.faces.values().any(|src_face| { @@ -445,7 +438,6 @@ pub(super) fn declare_sub_interest( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr, - ext_info: sub_info, }), }, res.expr(), @@ -480,11 +472,7 @@ pub(super) fn declare_sub_interest( ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareSubscriber( - DeclareSubscriber { - id, - wire_expr, - ext_info: sub_info, - }, + DeclareSubscriber { id, wire_expr }, ), }, sub.expr(), @@ -524,7 +512,6 @@ pub(super) fn declare_sub_interest( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr, - ext_info: sub_info, }), }, sub.expr(), diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index cc0251a07a..69ad2c495f 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -19,11 +19,11 @@ use std::{ use petgraph::graph::NodeIndex; use zenoh_protocol::{ - core::{key_expr::OwnedKeyExpr, Reliability, WhatAmI, ZenohIdProto}, + core::{key_expr::OwnedKeyExpr, WhatAmI, ZenohIdProto}, network::{ declare::{ - common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, - DeclareSubscriber, SubscriberId, UndeclareSubscriber, + common::ext::WireExprType, ext, Declare, DeclareBody, DeclareSubscriber, SubscriberId, + UndeclareSubscriber, }, interest::{InterestId, InterestMode, InterestOptions}, }, @@ -40,7 +40,7 @@ use crate::key_expr::KeyExpr; use crate::net::routing::{ dispatcher::{ face::FaceState, - pubsub::*, + pubsub::{update_data_routes_from, update_matches_data_routes, SubscriberInfo}, resource::{NodeId, Resource, SessionContext}, tables::{Route, RoutingExpr, Tables}, }, @@ -56,7 +56,7 @@ fn send_sourced_subscription_to_net_children( children: &[NodeIndex], res: &Arc, src_face: Option<&Arc>, - sub_info: &SubscriberInfo, + _sub_info: &SubscriberInfo, routing_context: NodeId, ) { for child in children { @@ -81,7 +81,6 @@ fn send_sourced_subscription_to_net_children( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: 0, // Sourced subscriptions do not use ids wire_expr: key_expr, - ext_info: *sub_info, }), }, res.expr(), @@ -99,7 +98,7 @@ fn propagate_simple_subscription_to( tables: &mut Tables, dst_face: &mut Arc, res: &Arc, - sub_info: &SubscriberInfo, + _sub_info: &SubscriberInfo, src_face: &mut Arc, full_peer_net: bool, send_declare: &mut SendDeclare, @@ -144,7 +143,6 @@ fn propagate_simple_subscription_to( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr: key_expr, - ext_info: *sub_info, }), }, res.expr(), @@ -816,9 +814,7 @@ pub(super) fn pubsub_tree_change( }; for sub in subs { if *sub == tree_id { - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - }; + let sub_info = SubscriberInfo; send_sourced_subscription_to_net_children( tables, net, @@ -899,9 +895,6 @@ pub(super) fn pubsub_linkstate_change( .insert(res.clone(), id); let push_declaration = push_declaration_profile(tables, &dst_face); let key_expr = Resource::decl_key(res, &mut dst_face, push_declaration); - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - }; send_declare( &dst_face.primitives, RoutingContext::with_expr( @@ -913,7 +906,6 @@ pub(super) fn pubsub_linkstate_change( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr: key_expr, - ext_info: sub_info, }), }, res.expr(), @@ -938,9 +930,6 @@ pub(crate) fn declare_sub_interest( ) { if mode.current() { let interest_id = (!mode.future()).then_some(id); - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - }; if let Some(res) = res.as_ref() { if aggregate { if hat!(tables).router_subs.iter().any(|sub| { @@ -970,7 +959,6 @@ pub(crate) fn declare_sub_interest( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr, - ext_info: sub_info, }), }, res.expr(), @@ -1016,7 +1004,6 @@ pub(crate) fn declare_sub_interest( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr, - ext_info: sub_info, }), }, sub.expr(), @@ -1060,7 +1047,6 @@ pub(crate) fn declare_sub_interest( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr, - ext_info: sub_info, }), }, sub.expr(), diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index ce87d68ef0..5ba52e08c8 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -28,9 +28,7 @@ use zenoh_protocol::core::key_expr::keyexpr; use zenoh_protocol::{ core::{key_expr::OwnedKeyExpr, ExprId, WireExpr, EMPTY_EXPR_ID}, network::{ - declare::{ - queryable::ext::QueryableInfoType, subscriber::ext::SubscriberInfo, QueryableId, - }, + declare::{queryable::ext::QueryableInfoType, QueryableId}, ext, Declare, DeclareBody, DeclareQueryable, DeclareSubscriber, Interest, Push, Request, Response, ResponseFinal, }, @@ -336,7 +334,6 @@ impl AdminSpace { body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: runtime.next_id(), wire_expr: [&root_key, "/config/**"].concat().into(), - ext_info: SubscriberInfo::DEFAULT, }), }); } diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index 5fd8a49261..ee58acec04 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -21,20 +21,18 @@ use zenoh_buffers::ZBuf; use zenoh_config::Config; use zenoh_core::zlock; use zenoh_protocol::{ - core::{ - key_expr::keyexpr, Encoding, ExprId, Reliability, WhatAmI, WireExpr, ZenohIdProto, - EMPTY_EXPR_ID, - }, - network::{ - declare::subscriber::ext::SubscriberInfo, ext, Declare, DeclareBody, DeclareKeyExpr, - }, + core::{key_expr::keyexpr, Encoding, ExprId, WhatAmI, WireExpr, ZenohIdProto, EMPTY_EXPR_ID}, + network::{ext, Declare, DeclareBody, DeclareKeyExpr}, zenoh::{PushBody, Put}, }; use crate::net::{ primitives::{DummyPrimitives, EPrimitives, Primitives}, routing::{ - dispatcher::tables::{self, Tables}, + dispatcher::{ + pubsub::SubscriberInfo, + tables::{self, Tables}, + }, router::*, RoutingContext, }, @@ -67,9 +65,7 @@ fn base_test() { &"one/deux/trois".into(), ); - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, - }; + let sub_info = SubscriberInfo; declare_subscription( zlock!(tables.ctrl_lock).as_ref(), @@ -194,9 +190,7 @@ fn multisub_test() { assert!(face0.upgrade().is_some()); // -------------- - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, - }; + let sub_info = SubscriberInfo; declare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, @@ -316,9 +310,7 @@ async fn clean_test() { let res1 = optres1.unwrap(); assert!(res1.upgrade().is_some()); - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, - }; + let sub_info = SubscriberInfo; declare_subscription( zlock!(tables.ctrl_lock).as_ref(), @@ -594,9 +586,7 @@ fn client_test() { .unwrap(); let tables = router.tables.clone(); - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, - }; + let sub_info = SubscriberInfo; let primitives0 = Arc::new(ClientPrimitives::new()); let face0 = Arc::downgrade(&router.new_primitives(primitives0.clone()).state); From 2c74733080cf63a1952aa26045ea408fd8a456f1 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Tue, 3 Sep 2024 16:48:51 +0200 Subject: [PATCH 2/4] Precommit --- zenoh/src/api/subscriber.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index fa2ca30b4f..47cd44cd60 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -11,26 +11,28 @@ // Contributors: // ZettaScale Zenoh Team, // -use super::{ - handlers::{locked, Callback, DefaultHandler, IntoHandler}, - key_expr::KeyExpr, - sample::{Locality, Sample}, - session::{SessionRef, UndeclarableSealed}, - Id, -}; -#[cfg(feature = "unstable")] -use crate::pubsub::Reliability; use std::{ fmt, future::{IntoFuture, Ready}, ops::{Deref, DerefMut}, sync::Arc, }; + use zenoh_core::{Resolvable, Wait}; use zenoh_result::ZResult; #[cfg(feature = "unstable")] use {zenoh_config::wrappers::EntityGlobalId, zenoh_protocol::core::EntityGlobalIdProto}; +use super::{ + handlers::{locked, Callback, DefaultHandler, IntoHandler}, + key_expr::KeyExpr, + sample::{Locality, Sample}, + session::{SessionRef, UndeclarableSealed}, + Id, +}; +#[cfg(feature = "unstable")] +use crate::pubsub::Reliability; + pub(crate) struct SubscriberState { pub(crate) id: Id, pub(crate) remote_id: Id, From 580fc3ac62539af51735ec7c0501005e2d0869f1 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Tue, 3 Sep 2024 17:54:42 +0200 Subject: [PATCH 3/4] Reintroduce unstable feature for realibility --- zenoh-ext/src/querying_subscriber.rs | 14 ++++++++++---- zenoh/src/api/subscriber.rs | 3 +++ 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs index d904eea194..c61e4b8c42 100644 --- a/zenoh-ext/src/querying_subscriber.rs +++ b/zenoh-ext/src/querying_subscriber.rs @@ -11,6 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // +use crate::ExtractSample; use std::{ collections::{btree_map, BTreeMap, VecDeque}, convert::TryInto, @@ -19,13 +20,14 @@ use std::{ sync::{Arc, Mutex}, time::{Duration, SystemTime, UNIX_EPOCH}, }; - +#[cfg(feature = "unstable")] +use zenoh::pubsub::Reliability; use zenoh::{ handlers::{locked, DefaultHandler, IntoHandler}, internal::zlock, key_expr::KeyExpr, prelude::Wait, - pubsub::{Reliability, Subscriber}, + pubsub::Subscriber, query::{QueryConsolidation, QueryTarget, ReplyKeyExpr, Selector}, sample::{Locality, Sample, SampleBuilder, TimestampBuilderTrait}, session::{SessionDeclarations, SessionRef}, @@ -33,8 +35,6 @@ use zenoh::{ Error, Resolvable, Resolve, Result as ZResult, }; -use crate::ExtractSample; - /// The builder of [`FetchingSubscriber`], allowing to configure it. #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] pub struct QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler> { @@ -140,6 +140,7 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle impl<'a, 'b, Handler> QueryingSubscriberBuilder<'a, 'b, crate::UserSpace, Handler> { /// Change the subscription reliability. + #[cfg(feature = "unstable")] #[deprecated( since = "1.0.0", note = "please use `reliability` on `declare_publisher` or `put`" @@ -150,6 +151,7 @@ impl<'a, 'b, Handler> QueryingSubscriberBuilder<'a, 'b, crate::UserSpace, Handle } /// Change the subscription reliability to Reliable. + #[cfg(feature = "unstable")] #[deprecated( since = "1.0.0", note = "please use `reliability` on `declare_publisher` or `put`" @@ -160,6 +162,7 @@ impl<'a, 'b, Handler> QueryingSubscriberBuilder<'a, 'b, crate::UserSpace, Handle } /// Change the subscription reliability to BestEffort. + #[cfg(feature = "unstable")] #[deprecated( since = "1.0.0", note = "please use `reliability` on `declare_publisher` or `put`" @@ -503,6 +506,7 @@ where TryIntoSample: ExtractSample, { /// Change the subscription reliability. + #[cfg(feature = "unstable")] #[deprecated( since = "1.0.0", note = "please use `reliability` on `declare_publisher` or `put`" @@ -513,6 +517,7 @@ where } /// Change the subscription reliability to Reliable. + #[cfg(feature = "unstable")] #[deprecated( since = "1.0.0", note = "please use `reliability` on `declare_publisher` or `put`" @@ -523,6 +528,7 @@ where } /// Change the subscription reliability to BestEffort. + #[cfg(feature = "unstable")] #[deprecated( since = "1.0.0", note = "please use `reliability` on `declare_publisher` or `put`" diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index 47cd44cd60..74f7605d8b 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -317,6 +317,7 @@ impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { impl<'a, 'b, Handler> SubscriberBuilder<'a, 'b, Handler> { /// Change the subscription reliability. + #[cfg(feature = "unstable")] #[deprecated( since = "1.0.0", note = "please use `reliability` on `declare_publisher` or `put`" @@ -327,6 +328,7 @@ impl<'a, 'b, Handler> SubscriberBuilder<'a, 'b, Handler> { } /// Change the subscription reliability to `Reliable`. + #[cfg(feature = "unstable")] #[deprecated( since = "1.0.0", note = "please use `reliability` on `declare_publisher` or `put`" @@ -337,6 +339,7 @@ impl<'a, 'b, Handler> SubscriberBuilder<'a, 'b, Handler> { } /// Change the subscription reliability to `BestEffort`. + #[cfg(feature = "unstable")] #[deprecated( since = "1.0.0", note = "please use `reliability` on `declare_publisher` or `put`" From fc21b2f325bb573eec2bbf12bc5bcea224032cdd Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Wed, 4 Sep 2024 08:46:52 +0200 Subject: [PATCH 4/4] Precommit --- zenoh-ext/src/querying_subscriber.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs index c61e4b8c42..c8969076e0 100644 --- a/zenoh-ext/src/querying_subscriber.rs +++ b/zenoh-ext/src/querying_subscriber.rs @@ -11,7 +11,6 @@ // Contributors: // ZettaScale Zenoh Team, // -use crate::ExtractSample; use std::{ collections::{btree_map, BTreeMap, VecDeque}, convert::TryInto, @@ -20,6 +19,7 @@ use std::{ sync::{Arc, Mutex}, time::{Duration, SystemTime, UNIX_EPOCH}, }; + #[cfg(feature = "unstable")] use zenoh::pubsub::Reliability; use zenoh::{ @@ -35,6 +35,8 @@ use zenoh::{ Error, Resolvable, Resolve, Result as ZResult, }; +use crate::ExtractSample; + /// The builder of [`FetchingSubscriber`], allowing to configure it. #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] pub struct QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler> {