diff --git a/commons/zenoh-codec/src/network/declare.rs b/commons/zenoh-codec/src/network/declare.rs index 7c3b797d5d..3eab8564ff 100644 --- a/commons/zenoh-codec/src/network/declare.rs +++ b/commons/zenoh-codec/src/network/declare.rs @@ -19,7 +19,7 @@ use zenoh_buffers::{ ZBuf, }; use zenoh_protocol::{ - common::{iext, imsg, ZExtZ64}, + common::{iext, imsg}, core::{ExprId, ExprLen, WireExpr}, network::{ declare::{self, common, keyexpr, queryable, subscriber, token, Declare, DeclareBody}, @@ -384,9 +384,6 @@ where } } -// SubscriberInfo -crate::impl_zextz64!(subscriber::ext::SubscriberInfo, subscriber::ext::Info::ID); - // DeclareSubscriber impl WCodec<&subscriber::DeclareSubscriber, &mut W> for Zenoh080 where @@ -395,18 +392,10 @@ where type Output = Result<(), DidntWrite>; fn write(self, writer: &mut W, x: &subscriber::DeclareSubscriber) -> Self::Output { - let subscriber::DeclareSubscriber { - id, - wire_expr, - ext_info, - } = x; + let subscriber::DeclareSubscriber { id, wire_expr } = x; // Header let mut header = declare::id::D_SUBSCRIBER; - let mut n_exts = (ext_info != &subscriber::ext::SubscriberInfo::DEFAULT) as u8; - if n_exts != 0 { - header |= subscriber::flag::Z; - } if wire_expr.mapping != Mapping::DEFAULT { header |= subscriber::flag::M; } @@ -420,10 +409,6 @@ where self.write(&mut *writer, wire_expr)?; // Extensions - if ext_info != &subscriber::ext::SubscriberInfo::DEFAULT { - n_exts -= 1; - self.write(&mut *writer, (*ext_info, n_exts != 0))?; - } Ok(()) } @@ -465,30 +450,13 @@ where }; // Extensions - let mut ext_info = subscriber::ext::SubscriberInfo::DEFAULT; - let mut has_ext = imsg::has_flag(self.header, subscriber::flag::Z); while has_ext { let ext: u8 = self.codec.read(&mut *reader)?; - let eodec = Zenoh080Header::new(ext); - match iext::eid(ext) { - subscriber::ext::Info::ID => { - let (i, ext): (subscriber::ext::SubscriberInfo, bool) = - eodec.read(&mut *reader)?; - ext_info = i; - has_ext = ext; - } - _ => { - has_ext = extension::skip(reader, "DeclareSubscriber", ext)?; - } - } + has_ext = extension::skip(reader, "DeclareSubscriber", ext)?; } - Ok(subscriber::DeclareSubscriber { - id, - wire_expr, - ext_info, - }) + Ok(subscriber::DeclareSubscriber { id, wire_expr }) } } diff --git a/commons/zenoh-protocol/src/network/declare.rs b/commons/zenoh-protocol/src/network/declare.rs index 8f31e0ff2a..6efd69d31f 100644 --- a/commons/zenoh-protocol/src/network/declare.rs +++ b/commons/zenoh-protocol/src/network/declare.rs @@ -20,8 +20,8 @@ pub use subscriber::*; pub use token::*; use crate::{ - common::{imsg, ZExtZ64, ZExtZBuf}, - core::{ExprId, Reliability, WireExpr}, + common::{ZExtZ64, ZExtZBuf}, + core::{ExprId, WireExpr}, network::Mapping, zextz64, zextzbuf, }; @@ -341,73 +341,6 @@ pub mod subscriber { pub struct DeclareSubscriber { pub id: SubscriberId, pub wire_expr: WireExpr<'static>, - pub ext_info: ext::SubscriberInfo, - } - - pub mod ext { - use super::*; - - pub type Info = zextz64!(0x01, false); - - /// # The subscription mode. - /// - /// ```text - /// 7 6 5 4 3 2 1 0 - /// +-+-+-+-+-+-+-+-+ - /// |Z|0_1| ID | - /// +-+-+-+---------+ - /// % reserved |R% - /// +---------------+ - /// - /// - if R==1 then the subscription is reliable, else it is best effort - /// - rsv: Reserved - /// ``` - #[derive(Debug, Clone, Copy, PartialEq, Eq)] - pub struct SubscriberInfo { - pub reliability: Reliability, - } - - impl SubscriberInfo { - pub const R: u64 = 1; - - pub const DEFAULT: Self = Self { - reliability: Reliability::DEFAULT, - }; - - #[cfg(feature = "test")] - pub fn rand() -> Self { - let reliability = Reliability::rand(); - - Self { reliability } - } - } - - impl Default for SubscriberInfo { - fn default() -> Self { - Self::DEFAULT - } - } - - impl From for SubscriberInfo { - fn from(ext: Info) -> Self { - let reliability = if imsg::has_option(ext.value, SubscriberInfo::R) { - Reliability::Reliable - } else { - Reliability::BestEffort - }; - Self { reliability } - } - } - - impl From for Info { - fn from(ext: SubscriberInfo) -> Self { - let mut v: u64 = 0; - if ext.reliability == Reliability::Reliable { - v |= SubscriberInfo::R; - } - Info::new(v) - } - } } impl DeclareSubscriber { @@ -418,13 +351,8 @@ pub mod subscriber { let id: SubscriberId = rng.gen(); let wire_expr = WireExpr::rand(); - let ext_info = ext::SubscriberInfo::rand(); - Self { - id, - wire_expr, - ext_info, - } + Self { id, wire_expr } } } diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs index e3ab70c1c6..813d2963db 100644 --- a/zenoh-ext/src/querying_subscriber.rs +++ b/zenoh-ext/src/querying_subscriber.rs @@ -20,12 +20,14 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; +#[cfg(feature = "unstable")] +use zenoh::pubsub::Reliability; use zenoh::{ handlers::{locked, DefaultHandler, IntoHandler}, internal::zlock, key_expr::KeyExpr, prelude::Wait, - pubsub::{Reliability, Subscriber}, + pubsub::Subscriber, query::{QueryConsolidation, QueryTarget, ReplyKeyExpr, Selector}, sample::{Locality, Sample, SampleBuilder}, session::{SessionDeclarations, SessionRef}, @@ -41,7 +43,6 @@ pub struct QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler> { pub(crate) session: SessionRef<'a>, pub(crate) key_expr: ZResult>, pub(crate) key_space: KeySpace, - pub(crate) reliability: Reliability, pub(crate) origin: Locality, pub(crate) query_selector: Option>>, pub(crate) query_target: QueryTarget, @@ -65,7 +66,6 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle session, key_expr, key_space, - reliability, origin, query_selector, query_target, @@ -78,7 +78,6 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle session, key_expr, key_space, - reliability, origin, query_selector, query_target, @@ -118,7 +117,6 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle session, key_expr, key_space, - reliability, origin, query_selector, query_target, @@ -131,7 +129,6 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle session, key_expr, key_space, - reliability, origin, query_selector, query_target, @@ -145,23 +142,35 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle impl<'a, 'b, Handler> QueryingSubscriberBuilder<'a, 'b, crate::UserSpace, Handler> { /// Change the subscription reliability. - #[inline] + #[cfg(feature = "unstable")] + #[deprecated( + since = "1.0.0", + note = "please use `reliability` on `declare_publisher` or `put`" + )] + #[allow(unused_mut, unused_variables)] pub fn reliability(mut self, reliability: Reliability) -> Self { - self.reliability = reliability; self } /// Change the subscription reliability to Reliable. - #[inline] + #[cfg(feature = "unstable")] + #[deprecated( + since = "1.0.0", + note = "please use `reliability` on `declare_publisher` or `put`" + )] + #[allow(unused_mut)] pub fn reliable(mut self) -> Self { - self.reliability = Reliability::Reliable; self } /// Change the subscription reliability to BestEffort. - #[inline] + #[cfg(feature = "unstable")] + #[deprecated( + since = "1.0.0", + note = "please use `reliability` on `declare_publisher` or `put`" + )] + #[allow(unused_mut)] pub fn best_effort(mut self) -> Self { - self.reliability = Reliability::BestEffort; self } @@ -249,7 +258,6 @@ where session: self.session, key_expr: Ok(key_expr.clone()), key_space: self.key_space, - reliability: self.reliability, origin: self.origin, fetch: |cb| match key_space { crate::KeySpace::User => match query_selector { @@ -365,7 +373,6 @@ pub struct FetchingSubscriberBuilder< pub(crate) session: SessionRef<'a>, pub(crate) key_expr: ZResult>, pub(crate) key_space: KeySpace, - pub(crate) reliability: Reliability, pub(crate) origin: Locality, pub(crate) fetch: Fetch, pub(crate) handler: Handler, @@ -390,7 +397,6 @@ where session: self.session, key_expr: self.key_expr.map(|s| s.into_owned()), key_space: self.key_space, - reliability: self.reliability, origin: self.origin, fetch: self.fetch, handler: self.handler, @@ -422,7 +428,6 @@ where session, key_expr, key_space, - reliability, origin, fetch, handler: _, @@ -432,7 +437,6 @@ where session, key_expr, key_space, - reliability, origin, fetch, handler: callback, @@ -476,7 +480,6 @@ where session, key_expr, key_space, - reliability, origin, fetch, handler: _, @@ -486,7 +489,6 @@ where session, key_expr, key_space, - reliability, origin, fetch, handler, @@ -506,23 +508,35 @@ where TryIntoSample: ExtractSample, { /// Change the subscription reliability. - #[inline] + #[cfg(feature = "unstable")] + #[deprecated( + since = "1.0.0", + note = "please use `reliability` on `declare_publisher` or `put`" + )] + #[allow(unused_mut, unused_variables)] pub fn reliability(mut self, reliability: Reliability) -> Self { - self.reliability = reliability; self } /// Change the subscription reliability to Reliable. - #[inline] + #[cfg(feature = "unstable")] + #[deprecated( + since = "1.0.0", + note = "please use `reliability` on `declare_publisher` or `put`" + )] + #[allow(unused_mut)] pub fn reliable(mut self) -> Self { - self.reliability = Reliability::Reliable; self } /// Change the subscription reliability to BestEffort. - #[inline] + #[cfg(feature = "unstable")] + #[deprecated( + since = "1.0.0", + note = "please use `reliability` on `declare_publisher` or `put`" + )] + #[allow(unused_mut)] pub fn best_effort(mut self) -> Self { - self.reliability = Reliability::BestEffort; self } @@ -698,7 +712,6 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> { .session .declare_subscriber(&key_expr) .callback(sub_callback) - .reliability(conf.reliability) .allowed_origin(conf.origin) .wait()?, crate::KeySpace::Liveliness => conf diff --git a/zenoh-ext/src/subscriber_ext.rs b/zenoh-ext/src/subscriber_ext.rs index a7356f86dc..cfd810185a 100644 --- a/zenoh-ext/src/subscriber_ext.rs +++ b/zenoh-ext/src/subscriber_ext.rs @@ -17,7 +17,7 @@ use flume::r#async::RecvStream; use futures::stream::{Forward, Map}; use zenoh::{ liveliness::LivelinessSubscriberBuilder, - pubsub::{Reliability, Subscriber, SubscriberBuilder}, + pubsub::{Subscriber, SubscriberBuilder}, query::{QueryConsolidation, QueryTarget, ReplyKeyExpr}, sample::{Locality, Sample}, Result as ZResult, @@ -171,7 +171,6 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilde session: self.session, key_expr: self.key_expr, key_space: crate::UserSpace, - reliability: self.reliability, origin: self.origin, fetch, handler: self.handler, @@ -213,7 +212,6 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilde session: self.session, key_expr: self.key_expr, key_space: crate::UserSpace, - reliability: self.reliability, origin: self.origin, query_selector: None, // By default query all matching publication caches and storages @@ -283,7 +281,6 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> session: self.session, key_expr: self.key_expr, key_space: crate::LivelinessSpace, - reliability: Reliability::DEFAULT, origin: Locality::default(), fetch, handler: self.handler, @@ -327,7 +324,6 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> session: self.session, key_expr: self.key_expr, key_space: crate::LivelinessSpace, - reliability: Reliability::DEFAULT, origin: Locality::default(), query_selector: None, query_target: QueryTarget::DEFAULT, diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 72ad150a11..2e059710c9 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -45,9 +45,9 @@ use zenoh_protocol::{ network::{ self, declare::{ - self, common::ext::WireExprType, queryable::ext::QueryableInfoType, - subscriber::ext::SubscriberInfo, Declare, DeclareBody, DeclareKeyExpr, - DeclareQueryable, DeclareSubscriber, UndeclareQueryable, UndeclareSubscriber, + self, common::ext::WireExprType, queryable::ext::QueryableInfoType, Declare, + DeclareBody, DeclareKeyExpr, DeclareQueryable, DeclareSubscriber, UndeclareQueryable, + UndeclareSubscriber, }, interest::{InterestMode, InterestOptions}, request::{self, ext::TargetType}, @@ -377,8 +377,6 @@ impl<'s, 'a> SessionDeclarations<'s, 'a> for SessionRef<'a> { SubscriberBuilder { session: self.clone(), key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), - #[cfg(feature = "unstable")] - reliability: Reliability::DEFAULT, origin: Locality::default(), handler: DefaultHandler::default(), } @@ -1128,7 +1126,6 @@ impl Session { key_expr: &KeyExpr, origin: Locality, callback: Callback<'static, Sample>, - info: &SubscriberInfo, ) -> ZResult> { let mut state = zwrite!(self.state); tracing::trace!("declare_subscriber({:?})", key_expr); @@ -1241,7 +1238,6 @@ impl Session { body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr: key_expr.to_wire(self).to_owned(), - ext_info: *info, }), }); @@ -2055,8 +2051,6 @@ impl<'s> SessionDeclarations<'s, 'static> for Arc { SubscriberBuilder { session: SessionRef::Shared(self.clone()), key_expr: key_expr.try_into().map_err(Into::into), - #[cfg(feature = "unstable")] - reliability: Reliability::DEFAULT, origin: Locality::default(), handler: DefaultHandler::default(), } diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index 0e82a20331..74f7605d8b 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -11,7 +11,6 @@ // Contributors: // ZettaScale Zenoh Team, // - use std::{ fmt, future::{IntoFuture, Ready}, @@ -20,7 +19,6 @@ use std::{ }; use zenoh_core::{Resolvable, Wait}; -use zenoh_protocol::network::declare::subscriber::ext::SubscriberInfo; use zenoh_result::ZResult; #[cfg(feature = "unstable")] use {zenoh_config::wrappers::EntityGlobalId, zenoh_protocol::core::EntityGlobalIdProto}; @@ -200,9 +198,6 @@ pub struct SubscriberBuilder<'a, 'b, Handler> { #[cfg(not(feature = "unstable"))] pub(crate) key_expr: ZResult>, - #[cfg(feature = "unstable")] - pub reliability: Reliability, - #[cfg(feature = "unstable")] pub origin: Locality, #[cfg(not(feature = "unstable"))] @@ -239,16 +234,12 @@ impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { let SubscriberBuilder { session, key_expr, - #[cfg(feature = "unstable")] - reliability, origin, handler: _, } = self; SubscriberBuilder { session, key_expr, - #[cfg(feature = "unstable")] - reliability, origin, handler: callback, } @@ -312,16 +303,12 @@ impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { let SubscriberBuilder { session, key_expr, - #[cfg(feature = "unstable")] - reliability, origin, handler: _, } = self; SubscriberBuilder { session, key_expr, - #[cfg(feature = "unstable")] - reliability, origin, handler, } @@ -330,26 +317,35 @@ impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { impl<'a, 'b, Handler> SubscriberBuilder<'a, 'b, Handler> { /// Change the subscription reliability. - #[inline] - #[zenoh_macros::unstable] + #[cfg(feature = "unstable")] + #[deprecated( + since = "1.0.0", + note = "please use `reliability` on `declare_publisher` or `put`" + )] + #[allow(unused_mut, unused_variables)] pub fn reliability(mut self, reliability: Reliability) -> Self { - self.reliability = reliability; self } - /// Change the subscription reliability to `Reliable`. - #[inline] - #[zenoh_macros::unstable] + /// Change the subscription reliability to `Reliable`. + #[cfg(feature = "unstable")] + #[deprecated( + since = "1.0.0", + note = "please use `reliability` on `declare_publisher` or `put`" + )] + #[allow(unused_mut)] pub fn reliable(mut self) -> Self { - self.reliability = Reliability::Reliable; self } /// Change the subscription reliability to `BestEffort`. - #[inline] - #[zenoh_macros::unstable] + #[cfg(feature = "unstable")] + #[deprecated( + since = "1.0.0", + note = "please use `reliability` on `declare_publisher` or `put`" + )] + #[allow(unused_mut)] pub fn best_effort(mut self) -> Self { - self.reliability = Reliability::BestEffort; self } @@ -382,17 +378,7 @@ where let session = self.session; let (callback, receiver) = self.handler.into_handler(); session - .declare_subscriber_inner( - &key_expr, - self.origin, - callback, - #[cfg(feature = "unstable")] - &SubscriberInfo { - reliability: self.reliability, - }, - #[cfg(not(feature = "unstable"))] - &SubscriberInfo::default(), - ) + .declare_subscriber_inner(&key_expr, self.origin, callback) .map(|sub_state| Subscriber { subscriber: SubscriberInner { session, diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index 0bc14450a7..654f5cf9c3 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -257,7 +257,7 @@ impl Primitives for Face { &mut self.state.clone(), m.id, &m.wire_expr, - &m.ext_info, + &SubscriberInfo, msg.ext_nodeid.node_id, &mut |p, m| declares.push((p.clone(), m)), ); diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index 7334f4f267..b34c77e4ee 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -17,7 +17,7 @@ use zenoh_core::zread; use zenoh_protocol::{ core::{key_expr::keyexpr, Reliability, WhatAmI, WireExpr}, network::{ - declare::{ext, subscriber::ext::SubscriberInfo, SubscriberId}, + declare::{ext, SubscriberId}, Push, }, zenoh::PushBody, @@ -33,6 +33,9 @@ use super::{ use crate::key_expr::KeyExpr; use crate::net::routing::hat::{HatTrait, SendDeclare}; +#[derive(Copy, Clone)] +pub(crate) struct SubscriberInfo; + #[allow(clippy::too_many_arguments)] pub(crate) fn declare_subscription( hat_code: &(dyn HatTrait + Send + Sync), diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index 326ea0f02c..595bdd0a8d 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -23,10 +23,7 @@ use zenoh_config::WhatAmI; use zenoh_protocol::{ core::{key_expr::keyexpr, ExprId, WireExpr}, network::{ - declare::{ - ext, queryable::ext::QueryableInfoType, subscriber::ext::SubscriberInfo, Declare, - DeclareBody, DeclareKeyExpr, - }, + declare::{ext, queryable::ext::QueryableInfoType, Declare, DeclareBody, DeclareKeyExpr}, interest::InterestId, Mapping, RequestId, }, @@ -35,6 +32,7 @@ use zenoh_sync::get_mut_unchecked; use super::{ face::FaceState, + pubsub::SubscriberInfo, tables::{Tables, TablesLock}, }; use crate::net::routing::{dispatcher::face::Face, RoutingContext}; @@ -43,6 +41,7 @@ pub(crate) type NodeId = u16; pub(crate) type Direction = (Arc, WireExpr<'static>, NodeId); pub(crate) type Route = HashMap; + pub(crate) type QueryRoute = HashMap; pub(crate) struct QueryTargetQabl { pub(crate) direction: Direction, diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index 5a19f3549c..cd6338d7b5 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -18,10 +18,10 @@ use std::{ }; use zenoh_protocol::{ - core::{key_expr::OwnedKeyExpr, Reliability, WhatAmI}, + core::{key_expr::OwnedKeyExpr, WhatAmI}, network::declare::{ - common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, - DeclareSubscriber, SubscriberId, UndeclareSubscriber, + common::ext::WireExprType, ext, Declare, DeclareBody, DeclareSubscriber, SubscriberId, + UndeclareSubscriber, }, }; use zenoh_sync::get_mut_unchecked; @@ -32,6 +32,7 @@ use crate::{ net::routing::{ dispatcher::{ face::FaceState, + pubsub::SubscriberInfo, resource::{NodeId, Resource, SessionContext}, tables::{Route, RoutingExpr, Tables}, }, @@ -46,7 +47,7 @@ fn propagate_simple_subscription_to( _tables: &mut Tables, dst_face: &mut Arc, res: &Arc, - sub_info: &SubscriberInfo, + _sub_info: &SubscriberInfo, src_face: &mut Arc, send_declare: &mut SendDeclare, ) { @@ -68,7 +69,6 @@ fn propagate_simple_subscription_to( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr: key_expr, - ext_info: *sub_info, }), }, res.expr(), @@ -155,7 +155,6 @@ fn declare_simple_subscription( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: 0, // @TODO use proper SubscriberId wire_expr: res.expr().into(), - ext_info: *sub_info, }), }, res.expr(), @@ -262,9 +261,7 @@ pub(super) fn pubsub_new_face( face: &mut Arc, send_declare: &mut SendDeclare, ) { - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - }; + let sub_info = SubscriberInfo; for src_face in tables .faces .values() diff --git a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs index f1412ec807..53cacba95c 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs @@ -19,11 +19,11 @@ use std::{ use petgraph::graph::NodeIndex; use zenoh_protocol::{ - core::{key_expr::OwnedKeyExpr, Reliability, WhatAmI, ZenohIdProto}, + core::{key_expr::OwnedKeyExpr, WhatAmI, ZenohIdProto}, network::{ declare::{ - common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, - DeclareSubscriber, SubscriberId, UndeclareSubscriber, + common::ext::WireExprType, ext, Declare, DeclareBody, DeclareSubscriber, SubscriberId, + UndeclareSubscriber, }, interest::{InterestId, InterestMode, InterestOptions}, }, @@ -39,7 +39,7 @@ use crate::key_expr::KeyExpr; use crate::net::routing::{ dispatcher::{ face::FaceState, - pubsub::*, + pubsub::{update_data_routes_from, update_matches_data_routes, SubscriberInfo}, resource::{NodeId, Resource, SessionContext}, tables::{Route, RoutingExpr, Tables}, }, @@ -55,7 +55,7 @@ fn send_sourced_subscription_to_net_children( children: &[NodeIndex], res: &Arc, src_face: Option<&Arc>, - sub_info: &SubscriberInfo, + _sub_info: &SubscriberInfo, routing_context: NodeId, ) { for child in children { @@ -80,7 +80,6 @@ fn send_sourced_subscription_to_net_children( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: 0, // Sourced subscriptions do not use ids wire_expr: key_expr, - ext_info: *sub_info, }), }, res.expr(), @@ -98,7 +97,7 @@ fn propagate_simple_subscription_to( _tables: &mut Tables, dst_face: &mut Arc, res: &Arc, - sub_info: &SubscriberInfo, + _sub_info: &SubscriberInfo, src_face: &mut Arc, send_declare: &mut SendDeclare, ) { @@ -121,7 +120,6 @@ fn propagate_simple_subscription_to( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr: key_expr, - ext_info: *sub_info, }), }, res.expr(), @@ -159,7 +157,6 @@ fn propagate_simple_subscription_to( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr: key_expr, - ext_info: *sub_info, }), }, res.expr(), @@ -654,9 +651,7 @@ pub(super) fn pubsub_tree_change(tables: &mut Tables, new_children: &[Vec, res: &Arc, - sub_info: &SubscriberInfo, + _sub_info: &SubscriberInfo, src_face: &mut Arc, send_declare: &mut SendDeclare, ) { @@ -74,7 +75,6 @@ fn propagate_simple_subscription_to( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr: key_expr, - ext_info: *sub_info, }), }, res.expr(), @@ -112,7 +112,6 @@ fn propagate_simple_subscription_to( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr: key_expr, - ext_info: *sub_info, }), }, res.expr(), @@ -202,7 +201,6 @@ fn declare_simple_subscription( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: 0, // @TODO use proper SubscriberId wire_expr: res.expr().into(), - ext_info: *sub_info, }), }, res.expr(), @@ -378,9 +376,7 @@ pub(super) fn pubsub_new_face( send_declare: &mut SendDeclare, ) { if face.whatami != WhatAmI::Client { - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - }; + let sub_info = SubscriberInfo; for src_face in tables .faces .values() @@ -416,9 +412,6 @@ pub(super) fn declare_sub_interest( ) { if mode.current() && face.whatami == WhatAmI::Client { let interest_id = (!mode.future()).then_some(id); - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - }; if let Some(res) = res.as_ref() { if aggregate { if tables.faces.values().any(|src_face| { @@ -447,7 +440,6 @@ pub(super) fn declare_sub_interest( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr, - ext_info: sub_info, }), }, res.expr(), @@ -482,11 +474,7 @@ pub(super) fn declare_sub_interest( ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareSubscriber( - DeclareSubscriber { - id, - wire_expr, - ext_info: sub_info, - }, + DeclareSubscriber { id, wire_expr }, ), }, sub.expr(), @@ -526,7 +514,6 @@ pub(super) fn declare_sub_interest( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr, - ext_info: sub_info, }), }, sub.expr(), diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index cc0251a07a..69ad2c495f 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -19,11 +19,11 @@ use std::{ use petgraph::graph::NodeIndex; use zenoh_protocol::{ - core::{key_expr::OwnedKeyExpr, Reliability, WhatAmI, ZenohIdProto}, + core::{key_expr::OwnedKeyExpr, WhatAmI, ZenohIdProto}, network::{ declare::{ - common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, - DeclareSubscriber, SubscriberId, UndeclareSubscriber, + common::ext::WireExprType, ext, Declare, DeclareBody, DeclareSubscriber, SubscriberId, + UndeclareSubscriber, }, interest::{InterestId, InterestMode, InterestOptions}, }, @@ -40,7 +40,7 @@ use crate::key_expr::KeyExpr; use crate::net::routing::{ dispatcher::{ face::FaceState, - pubsub::*, + pubsub::{update_data_routes_from, update_matches_data_routes, SubscriberInfo}, resource::{NodeId, Resource, SessionContext}, tables::{Route, RoutingExpr, Tables}, }, @@ -56,7 +56,7 @@ fn send_sourced_subscription_to_net_children( children: &[NodeIndex], res: &Arc, src_face: Option<&Arc>, - sub_info: &SubscriberInfo, + _sub_info: &SubscriberInfo, routing_context: NodeId, ) { for child in children { @@ -81,7 +81,6 @@ fn send_sourced_subscription_to_net_children( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: 0, // Sourced subscriptions do not use ids wire_expr: key_expr, - ext_info: *sub_info, }), }, res.expr(), @@ -99,7 +98,7 @@ fn propagate_simple_subscription_to( tables: &mut Tables, dst_face: &mut Arc, res: &Arc, - sub_info: &SubscriberInfo, + _sub_info: &SubscriberInfo, src_face: &mut Arc, full_peer_net: bool, send_declare: &mut SendDeclare, @@ -144,7 +143,6 @@ fn propagate_simple_subscription_to( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr: key_expr, - ext_info: *sub_info, }), }, res.expr(), @@ -816,9 +814,7 @@ pub(super) fn pubsub_tree_change( }; for sub in subs { if *sub == tree_id { - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - }; + let sub_info = SubscriberInfo; send_sourced_subscription_to_net_children( tables, net, @@ -899,9 +895,6 @@ pub(super) fn pubsub_linkstate_change( .insert(res.clone(), id); let push_declaration = push_declaration_profile(tables, &dst_face); let key_expr = Resource::decl_key(res, &mut dst_face, push_declaration); - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - }; send_declare( &dst_face.primitives, RoutingContext::with_expr( @@ -913,7 +906,6 @@ pub(super) fn pubsub_linkstate_change( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr: key_expr, - ext_info: sub_info, }), }, res.expr(), @@ -938,9 +930,6 @@ pub(crate) fn declare_sub_interest( ) { if mode.current() { let interest_id = (!mode.future()).then_some(id); - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers - }; if let Some(res) = res.as_ref() { if aggregate { if hat!(tables).router_subs.iter().any(|sub| { @@ -970,7 +959,6 @@ pub(crate) fn declare_sub_interest( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr, - ext_info: sub_info, }), }, res.expr(), @@ -1016,7 +1004,6 @@ pub(crate) fn declare_sub_interest( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr, - ext_info: sub_info, }), }, sub.expr(), @@ -1060,7 +1047,6 @@ pub(crate) fn declare_sub_interest( body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id, wire_expr, - ext_info: sub_info, }), }, sub.expr(), diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index dc984c2e92..b29401e23e 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -28,9 +28,7 @@ use zenoh_protocol::core::key_expr::keyexpr; use zenoh_protocol::{ core::{key_expr::OwnedKeyExpr, ExprId, Reliability, WireExpr, EMPTY_EXPR_ID}, network::{ - declare::{ - queryable::ext::QueryableInfoType, subscriber::ext::SubscriberInfo, QueryableId, - }, + declare::{queryable::ext::QueryableInfoType, QueryableId}, ext, Declare, DeclareBody, DeclareQueryable, DeclareSubscriber, Interest, Push, Request, Response, ResponseFinal, }, @@ -335,7 +333,6 @@ impl AdminSpace { body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: runtime.next_id(), wire_expr: [&root_key, "/config/**"].concat().into(), - ext_info: SubscriberInfo::DEFAULT, }), }); } diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index bab638af83..23c0d9c053 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -25,16 +25,17 @@ use zenoh_protocol::{ key_expr::keyexpr, Encoding, ExprId, Reliability, WhatAmI, WireExpr, ZenohIdProto, EMPTY_EXPR_ID, }, - network::{ - declare::subscriber::ext::SubscriberInfo, ext, Declare, DeclareBody, DeclareKeyExpr, Push, - }, + network::{ext, Declare, DeclareBody, DeclareKeyExpr, Push}, zenoh::{PushBody, Put}, }; use crate::net::{ primitives::{DummyPrimitives, EPrimitives, Primitives}, routing::{ - dispatcher::tables::{self, Tables}, + dispatcher::{ + pubsub::SubscriberInfo, + tables::{self, Tables}, + }, router::*, RoutingContext, }, @@ -67,9 +68,7 @@ fn base_test() { &"one/deux/trois".into(), ); - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, - }; + let sub_info = SubscriberInfo; declare_subscription( zlock!(tables.ctrl_lock).as_ref(), @@ -194,9 +193,7 @@ fn multisub_test() { assert!(face0.upgrade().is_some()); // -------------- - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, - }; + let sub_info = SubscriberInfo; declare_subscription( zlock!(tables.ctrl_lock).as_ref(), &tables, @@ -316,9 +313,7 @@ async fn clean_test() { let res1 = optres1.unwrap(); assert!(res1.upgrade().is_some()); - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, - }; + let sub_info = SubscriberInfo; declare_subscription( zlock!(tables.ctrl_lock).as_ref(), @@ -594,9 +589,7 @@ fn client_test() { .unwrap(); let tables = router.tables.clone(); - let sub_info = SubscriberInfo { - reliability: Reliability::Reliable, - }; + let sub_info = SubscriberInfo; let primitives0 = Arc::new(ClientPrimitives::new()); let face0 = Arc::downgrade(&router.new_primitives(primitives0.clone()).state);