From 2da0aeb0c59a5634b1975fad1200fb92256ec733 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 5 Apr 2024 11:06:54 +0200 Subject: [PATCH] Declare message can be Push/Request/RequestContinuous/Response (#902) * Declare message can be Push/Request/RequestContinuous/Response * Address review comments * Remove F: Future flag from DeclareInterest * cargo fmt --all --- commons/zenoh-codec/src/network/declare.rs | 236 +++++++----------- commons/zenoh-codec/tests/codec.rs | 16 ++ commons/zenoh-protocol/src/network/declare.rs | 215 ++++++++-------- commons/zenoh-protocol/src/network/mod.rs | 6 +- zenoh/src/key_expr.rs | 4 +- zenoh/src/net/routing/dispatcher/face.rs | 3 +- zenoh/src/net/routing/dispatcher/resource.rs | 4 +- zenoh/src/net/routing/hat/client/pubsub.rs | 10 +- zenoh/src/net/routing/hat/client/queries.rs | 8 +- .../net/routing/hat/linkstate_peer/pubsub.rs | 14 +- .../net/routing/hat/linkstate_peer/queries.rs | 14 +- zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 10 +- zenoh/src/net/routing/hat/p2p_peer/queries.rs | 8 +- zenoh/src/net/routing/hat/router/pubsub.rs | 22 +- zenoh/src/net/routing/hat/router/queries.rs | 22 +- zenoh/src/net/routing/mod.rs | 3 +- zenoh/src/net/runtime/adminspace.rs | 8 +- zenoh/src/net/tests/tables.rs | 12 +- zenoh/src/session.rs | 19 +- 19 files changed, 283 insertions(+), 351 deletions(-) diff --git a/commons/zenoh-codec/src/network/declare.rs b/commons/zenoh-codec/src/network/declare.rs index d7a25ea0a9..173fbe5e4a 100644 --- a/commons/zenoh-codec/src/network/declare.rs +++ b/commons/zenoh-codec/src/network/declare.rs @@ -19,12 +19,16 @@ use zenoh_buffers::{ ZBuf, }; use zenoh_protocol::{ - common::{iext, imsg, ZExtZ64}, + common::{ + iext, + imsg::{self, HEADER_BITS}, + ZExtZ64, + }, core::{ExprId, ExprLen, WireExpr}, network::{ declare::{ self, common, interest, keyexpr, queryable, subscriber, token, Declare, DeclareBody, - Interest, + DeclareMode, Interest, }, id, Mapping, }, @@ -48,8 +52,7 @@ where DeclareBody::DeclareToken(r) => self.write(&mut *writer, r)?, DeclareBody::UndeclareToken(r) => self.write(&mut *writer, r)?, DeclareBody::DeclareInterest(r) => self.write(&mut *writer, r)?, - DeclareBody::FinalInterest(r) => self.write(&mut *writer, r)?, - DeclareBody::UndeclareInterest(r) => self.write(&mut *writer, r)?, + DeclareBody::DeclareFinal(r) => self.write(&mut *writer, r)?, } Ok(()) @@ -77,8 +80,7 @@ where D_TOKEN => DeclareBody::DeclareToken(codec.read(&mut *reader)?), U_TOKEN => DeclareBody::UndeclareToken(codec.read(&mut *reader)?), D_INTEREST => DeclareBody::DeclareInterest(codec.read(&mut *reader)?), - F_INTEREST => DeclareBody::FinalInterest(codec.read(&mut *reader)?), - U_INTEREST => DeclareBody::UndeclareInterest(codec.read(&mut *reader)?), + D_FINAL => DeclareBody::DeclareFinal(codec.read(&mut *reader)?), _ => return Err(DidntRead), }; @@ -95,7 +97,7 @@ where fn write(self, writer: &mut W, x: &Declare) -> Self::Output { let Declare { - interest_id, + mode, ext_qos, ext_tstamp, ext_nodeid, @@ -104,9 +106,13 @@ where // Header let mut header = id::DECLARE; - if x.interest_id.is_some() { - header |= declare::flag::I; - } + header |= match mode { + DeclareMode::Push => 0b00, + DeclareMode::Response(_) => 0b01, + DeclareMode::Request(_) => 0b10, + DeclareMode::RequestContinuous(_) => 0b11, + } << HEADER_BITS; + let mut n_exts = ((ext_qos != &declare::ext::QoSType::DEFAULT) as u8) + (ext_tstamp.is_some() as u8) + ((ext_nodeid != &declare::ext::NodeIdType::DEFAULT) as u8); @@ -116,8 +122,11 @@ where self.write(&mut *writer, header)?; // Body - if let Some(interest_id) = interest_id { - self.write(&mut *writer, interest_id)?; + if let DeclareMode::Request(rid) + | DeclareMode::RequestContinuous(rid) + | DeclareMode::Response(rid) = mode + { + self.write(&mut *writer, rid)?; } // Extensions @@ -166,10 +175,14 @@ where return Err(DidntRead); } - let mut interest_id = None; - if imsg::has_flag(self.header, declare::flag::I) { - interest_id = Some(self.codec.read(&mut *reader)?); - } + // Body + let mode = match (self.header >> HEADER_BITS) & 0b11 { + 0b00 => DeclareMode::Push, + 0b01 => DeclareMode::Response(self.codec.read(&mut *reader)?), + 0b10 => DeclareMode::Request(self.codec.read(&mut *reader)?), + 0b11 => DeclareMode::RequestContinuous(self.codec.read(&mut *reader)?), + _ => return Err(DidntRead), + }; // Extensions let mut ext_qos = declare::ext::QoSType::DEFAULT; @@ -206,7 +219,7 @@ where let body: DeclareBody = self.codec.read(&mut *reader)?; Ok(Declare { - interest_id, + mode, ext_qos, ext_tstamp, ext_nodeid, @@ -215,6 +228,59 @@ where } } +// Final +impl WCodec<&common::DeclareFinal, &mut W> for Zenoh080 +where + W: Writer, +{ + type Output = Result<(), DidntWrite>; + + fn write(self, writer: &mut W, x: &common::DeclareFinal) -> Self::Output { + let common::DeclareFinal = x; + + // Header + let header = declare::id::D_FINAL; + self.write(&mut *writer, header)?; + + Ok(()) + } +} + +impl RCodec for Zenoh080 +where + R: Reader, +{ + type Error = DidntRead; + + fn read(self, reader: &mut R) -> Result { + let header: u8 = self.read(&mut *reader)?; + let codec = Zenoh080Header::new(header); + + codec.read(reader) + } +} + +impl RCodec for Zenoh080Header +where + R: Reader, +{ + type Error = DidntRead; + + fn read(self, reader: &mut R) -> Result { + if imsg::mid(self.header) != declare::id::D_FINAL { + return Err(DidntRead); + } + + // Extensions + let has_ext = imsg::has_flag(self.header, token::flag::Z); + if has_ext { + extension::skip_all(reader, "Final")?; + } + + Ok(common::DeclareFinal) + } +} + // DeclareKeyExpr impl WCodec<&keyexpr::DeclareKeyExpr, &mut W> for Zenoh080 where @@ -907,7 +973,7 @@ where } = x; // Header - let header = declare::id::D_INTEREST | x.flags(); + let header = declare::id::D_INTEREST; self.write(&mut *writer, header)?; // Body @@ -976,140 +1042,6 @@ where } } -// FinalInterest -impl WCodec<&interest::FinalInterest, &mut W> for Zenoh080 -where - W: Writer, -{ - type Output = Result<(), DidntWrite>; - - fn write(self, writer: &mut W, x: &interest::FinalInterest) -> Self::Output { - let interest::FinalInterest { id } = x; - - // Header - let header = declare::id::F_INTEREST; - self.write(&mut *writer, header)?; - - // Body - self.write(&mut *writer, id)?; - - Ok(()) - } -} - -impl RCodec for Zenoh080 -where - R: Reader, -{ - type Error = DidntRead; - - fn read(self, reader: &mut R) -> Result { - let header: u8 = self.read(&mut *reader)?; - let codec = Zenoh080Header::new(header); - - codec.read(reader) - } -} - -impl RCodec for Zenoh080Header -where - R: Reader, -{ - type Error = DidntRead; - - fn read(self, reader: &mut R) -> Result { - if imsg::mid(self.header) != declare::id::F_INTEREST { - return Err(DidntRead); - } - - // Body - let id: interest::InterestId = self.codec.read(&mut *reader)?; - - // Extensions - let has_ext = imsg::has_flag(self.header, token::flag::Z); - if has_ext { - extension::skip_all(reader, "FinalInterest")?; - } - - Ok(interest::FinalInterest { id }) - } -} - -// UndeclareInterest -impl WCodec<&interest::UndeclareInterest, &mut W> for Zenoh080 -where - W: Writer, -{ - type Output = Result<(), DidntWrite>; - - fn write(self, writer: &mut W, x: &interest::UndeclareInterest) -> Self::Output { - let interest::UndeclareInterest { id, ext_wire_expr } = x; - - // Header - let header = declare::id::U_INTEREST | interest::flag::Z; - self.write(&mut *writer, header)?; - - // Body - self.write(&mut *writer, id)?; - - // Extension - self.write(&mut *writer, (ext_wire_expr, false))?; - - Ok(()) - } -} - -impl RCodec for Zenoh080 -where - R: Reader, -{ - type Error = DidntRead; - - fn read(self, reader: &mut R) -> Result { - let header: u8 = self.read(&mut *reader)?; - let codec = Zenoh080Header::new(header); - - codec.read(reader) - } -} - -impl RCodec for Zenoh080Header -where - R: Reader, -{ - type Error = DidntRead; - - fn read(self, reader: &mut R) -> Result { - if imsg::mid(self.header) != declare::id::U_INTEREST { - return Err(DidntRead); - } - - // Body - let id: interest::InterestId = self.codec.read(&mut *reader)?; - - // Extensions - let mut ext_wire_expr = common::ext::WireExprType::null(); - - let mut has_ext = imsg::has_flag(self.header, interest::flag::Z); - while has_ext { - let ext: u8 = self.codec.read(&mut *reader)?; - let eodec = Zenoh080Header::new(ext); - match iext::eid(ext) { - common::ext::WireExprExt::ID => { - let (we, ext): (common::ext::WireExprType, bool) = eodec.read(&mut *reader)?; - ext_wire_expr = we; - has_ext = ext; - } - _ => { - has_ext = extension::skip(reader, "UndeclareInterest", ext)?; - } - } - } - - Ok(interest::UndeclareInterest { id, ext_wire_expr }) - } -} - // WARNING: this is a temporary extension used for undeclarations impl WCodec<(&common::ext::WireExprType, bool), &mut W> for Zenoh080 where diff --git a/commons/zenoh-codec/tests/codec.rs b/commons/zenoh-codec/tests/codec.rs index 2f0e870c4f..d28ba9a4d3 100644 --- a/commons/zenoh-codec/tests/codec.rs +++ b/commons/zenoh-codec/tests/codec.rs @@ -31,6 +31,22 @@ use zenoh_protocol::{ zenoh, zextunit, zextz64, zextzbuf, }; +#[test] +fn zbuf_test() { + let mut buffer = vec![0u8; 64]; + + let zbuf = ZBuf::empty(); + let mut writer = buffer.writer(); + + let codec = Zenoh080::new(); + codec.write(&mut writer, &zbuf).unwrap(); + println!("Buffer: {:?}", buffer); + + let mut reader = buffer.reader(); + let ret: ZBuf = codec.read(&mut reader).unwrap(); + assert_eq!(ret, zbuf); +} + const NUM_ITER: usize = 100; const MAX_PAYLOAD_SIZE: usize = 256; diff --git a/commons/zenoh-protocol/src/network/declare.rs b/commons/zenoh-protocol/src/network/declare.rs index 10027259c2..996e7768ee 100644 --- a/commons/zenoh-protocol/src/network/declare.rs +++ b/commons/zenoh-protocol/src/network/declare.rs @@ -18,6 +18,8 @@ use crate::{ zextz64, zextzbuf, }; use alloc::borrow::Cow; +pub use common::*; +use core::sync::atomic::AtomicU32; pub use interest::*; pub use keyexpr::*; pub use queryable::*; @@ -31,24 +33,59 @@ pub mod flag { } /// Flags: -/// - I: Interest If I==1 then the declare is in a response to an Interest with future==false -/// - X: Reserved +/// - |: Mode The mode of the the declaration* +/// -/ /// - Z: Extension If Z==1 then at least one extension is present /// /// 7 6 5 4 3 2 1 0 /// +-+-+-+-+-+-+-+-+ -/// |Z|X|I| DECLARE | +/// |Z|Mod| DECLARE | /// +-+-+-+---------+ -/// ~interest_id:z32~ if I==1 +/// ~ rid:z32 ~ if Mode != Push /// +---------------+ /// ~ [decl_exts] ~ if Z==1 /// +---------------+ /// ~ declaration ~ /// +---------------+ /// +/// *Mode of declaration: +/// - Mode 0b00: Push +/// - Mode 0b01: Response +/// - Mode 0b10: Request +/// - Mode 0b11: RequestContinuous + +/// The resolution of a RequestId +pub type DeclareRequestId = u32; +pub type AtomicDeclareRequestId = AtomicU32; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum DeclareMode { + Push, + Request(DeclareRequestId), + RequestContinuous(DeclareRequestId), + Response(DeclareRequestId), +} + +impl DeclareMode { + #[cfg(feature = "test")] + pub fn rand() -> Self { + use rand::Rng; + + let mut rng = rand::thread_rng(); + + match rng.gen_range(0..4) { + 0 => DeclareMode::Push, + 1 => DeclareMode::Request(rng.gen()), + 2 => DeclareMode::RequestContinuous(rng.gen()), + 3 => DeclareMode::Response(rng.gen()), + _ => unreachable!(), + } + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct Declare { - pub interest_id: Option, + pub mode: DeclareMode, pub ext_qos: ext::QoSType, pub ext_tstamp: Option, pub ext_nodeid: ext::NodeIdType, @@ -85,8 +122,8 @@ pub mod id { pub const U_TOKEN: u8 = 0x07; pub const D_INTEREST: u8 = 0x08; - pub const F_INTEREST: u8 = 0x09; - pub const U_INTEREST: u8 = 0x0A; + + pub const D_FINAL: u8 = 0x1A; } #[derive(Debug, Clone, PartialEq, Eq)] @@ -100,8 +137,7 @@ pub enum DeclareBody { DeclareToken(DeclareToken), UndeclareToken(UndeclareToken), DeclareInterest(DeclareInterest), - FinalInterest(FinalInterest), - UndeclareInterest(UndeclareInterest), + DeclareFinal(DeclareFinal), } impl DeclareBody { @@ -111,7 +147,7 @@ impl DeclareBody { let mut rng = rand::thread_rng(); - match rng.gen_range(0..11) { + match rng.gen_range(0..10) { 0 => DeclareBody::DeclareKeyExpr(DeclareKeyExpr::rand()), 1 => DeclareBody::UndeclareKeyExpr(UndeclareKeyExpr::rand()), 2 => DeclareBody::DeclareSubscriber(DeclareSubscriber::rand()), @@ -121,8 +157,7 @@ impl DeclareBody { 6 => DeclareBody::DeclareToken(DeclareToken::rand()), 7 => DeclareBody::UndeclareToken(UndeclareToken::rand()), 8 => DeclareBody::DeclareInterest(DeclareInterest::rand()), - 9 => DeclareBody::FinalInterest(FinalInterest::rand()), - 10 => DeclareBody::UndeclareInterest(UndeclareInterest::rand()), + 9 => DeclareBody::DeclareFinal(DeclareFinal::rand()), _ => unreachable!(), } } @@ -135,14 +170,14 @@ impl Declare { let mut rng = rand::thread_rng(); - let interest_id = rng.gen_bool(0.5).then_some(rng.gen::()); + let mode = DeclareMode::rand(); let ext_qos = ext::QoSType::rand(); let ext_tstamp = rng.gen_bool(0.5).then(ext::TimestampType::rand); let ext_nodeid = ext::NodeIdType::rand(); let body = DeclareBody::rand(); Self { - interest_id, + mode, ext_qos, ext_tstamp, ext_nodeid, @@ -154,6 +189,29 @@ impl Declare { pub mod common { use super::*; + /// ```text + /// Flags: + /// - X: Reserved + /// - X: Reserved + /// - Z: Extension If Z==1 then at least one extension is present + /// + /// 7 6 5 4 3 2 1 0 + /// +-+-+-+-+-+-+-+-+ + /// |Z|x|x| D_FINAL | + /// +---------------+ + /// ~ [final_exts] ~ if Z==1 + /// +---------------+ + /// ``` + #[derive(Debug, Clone, PartialEq, Eq)] + pub struct DeclareFinal; + + impl DeclareFinal { + #[cfg(feature = "test")] + pub fn rand() -> Self { + Self + } + } + pub mod ext { use super::*; @@ -545,7 +603,7 @@ pub mod queryable { /// /// 7 6 5 4 3 2 1 0 /// +-+-+-+-+-+-+-+-+ - /// |Z|0_2| U_QBL | + /// |Z|X|X| U_QBL | /// +---------------+ /// ~ qbls_id:z32 ~ /// +---------------+ @@ -668,44 +726,51 @@ pub mod interest { pub type InterestId = u32; pub mod flag { - pub const C: u8 = 1 << 5; // 0x20 Current if C==1 then the interest refers to the current declarations. - pub const F: u8 = 1 << 6; // 0x40 Future if F==1 then the interest refers to the future declarations. + // pub const X: u8 = 1 << 5; // 0x20 Reserved + // pub const X: u8 = 1 << 6; // 0x40 Reserved pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow } /// # DeclareInterest message /// - /// The DECLARE INTEREST message is sent to request the transmission of existing and future - /// declarations of a given kind matching a target keyexpr. E.g., a declare interest could be sent to - /// request the transmisison of all existing subscriptions matching `a/*`. A FINAL INTEREST is used to - /// mark the end of the transmission of exisiting matching declarations. + /// The DECLARE INTEREST message is sent to request the transmission of current and/or future + /// declarations of a given kind matching a target keyexpr. E.g., a declare interest could be + /// sent to request the transmisison of all current subscriptions matching `a/*`. + /// + /// The behaviour of a DECLARE INTEREST depends on the DECLARE MODE in the DECLARE MESSAGE: + /// - Push: only future declarations + /// - Request: only current declarations + /// - RequestContinous: current and future declarations + /// - Response: invalid /// - /// E.g., the [`DeclareInterest`]/[`FinalInterest`]/[`UndeclareInterest`] message flow is the following: + /// E.g., the [`DeclareInterest`] message flow is the following: /// /// ```text /// A B /// | DECL INTEREST | - /// |------------------>| -- This is a DeclareInterest e.g. for subscriber declarations/undeclarations. + /// |------------------>| -- Sent in Declare::RequestContinuous. + /// | | This is a DeclareInterest e.g. for subscriber declarations/undeclarations. /// | | /// | DECL SUBSCRIBER | - /// |<------------------| + /// |<------------------| -- Sent in Declare::Response /// | DECL SUBSCRIBER | - /// |<------------------| + /// |<------------------| -- Sent in Declare::Response /// | DECL SUBSCRIBER | - /// |<------------------| + /// |<------------------| -- Sent in Declare::Response /// | | - /// | FINAL INTEREST | - /// |<------------------| -- The FinalInterest signals that all known subscribers have been transmitted. + /// | FINAL | + /// |<------------------| -- Sent in Declare::Response /// | | /// | DECL SUBSCRIBER | - /// |<------------------| -- This is a new subscriber declaration. + /// |<------------------| -- Sent in Declare::Push. This is a new subscriber declaration. /// | UNDECL SUBSCRIBER | - /// |<------------------| -- This is a new subscriber undeclaration. + /// |<------------------| -- Sent in Declare::Push. This is a new subscriber undeclaration. /// | | /// | ... | /// | | - /// | UNDECL INTEREST | - /// |------------------>| -- This is an UndeclareInterest to stop receiving subscriber declarations/undeclarations. + /// | FINAL | + /// |------------------>| -- Sent in Declare::RequestContinuous. + /// | | This stops the transmission of subscriber declarations/undeclarations. /// | | /// ``` /// @@ -713,15 +778,13 @@ pub mod interest { /// /// ```text /// Flags: - /// - C: Current if C==1 then the interest refers to the current declarations. - /// - F: Future if F==1 then the interest refers to the future declarations. Note that if F==0 then: - /// - Declarations SHOULD NOT be sent after the FinalInterest; - /// - UndeclareInterest SHOULD NOT be sent after the FinalInterest. + /// - X: Reserved + /// - X: Reserved /// - Z: Extension If Z==1 then at least one extension is present /// /// 7 6 5 4 3 2 1 0 /// +-+-+-+-+-+-+-+-+ - /// |Z|F|C| D_INT | + /// |Z|F|X| D_INT | /// +---------------+ /// ~ intst_id:z32 ~ /// +---------------+ @@ -752,17 +815,6 @@ pub mod interest { } impl DeclareInterest { - pub fn flags(&self) -> u8 { - let mut interest = self.interest; - if self.interest.current() { - interest += Interest::CURRENT; - } - if self.interest.future() { - interest += Interest::FUTURE; - } - interest.flags - } - pub fn options(&self) -> u8 { let mut interest = self.interest; if let Some(we) = self.wire_expr.as_ref() { @@ -801,9 +853,6 @@ pub mod interest { } impl Interest { - // Header - pub const CURRENT: Interest = Interest::flags(interest::flag::C); - pub const FUTURE: Interest = Interest::flags(interest::flag::F); // Flags pub const KEYEXPRS: Interest = Interest::options(1); pub const SUBSCRIBERS: Interest = Interest::options(1 << 1); @@ -820,10 +869,6 @@ pub mod interest { | Interest::TOKENS.options, ); - const fn flags(flags: u8) -> Self { - Self { flags, options: 0 } - } - const fn options(options: u8) -> Self { Self { flags: 0, options } } @@ -835,14 +880,6 @@ pub mod interest { } } - pub const fn current(&self) -> bool { - imsg::has_flag(self.flags, Self::CURRENT.flags) - } - - pub const fn future(&self) -> bool { - imsg::has_flag(self.flags, Self::FUTURE.flags) - } - pub const fn keyexprs(&self) -> bool { imsg::has_flag(self.options, Self::KEYEXPRS.options) } @@ -881,12 +918,6 @@ pub mod interest { let mut rng = rand::thread_rng(); let mut s = Self::empty(); - if rng.gen_bool(0.5) { - s += Interest::CURRENT; - } - if rng.gen_bool(0.5) { - s += Interest::FUTURE; - } if rng.gen_bool(0.5) { s += Interest::KEYEXPRS; } @@ -905,9 +936,7 @@ pub mod interest { impl PartialEq for Interest { fn eq(&self, other: &Self) -> bool { - self.current() == other.current() - && self.future() == other.future() - && self.keyexprs() == other.keyexprs() + self.keyexprs() == other.keyexprs() && self.subscribers() == other.subscribers() && self.queryables() == other.queryables() && self.tokens() == other.tokens() @@ -918,16 +947,6 @@ pub mod interest { impl Debug for Interest { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Interest {{ ")?; - if self.current() { - write!(f, "C:Y, ")?; - } else { - write!(f, "C:N, ")?; - } - if self.future() { - write!(f, "F:Y, ")?; - } else { - write!(f, "F:N, ")?; - } if self.keyexprs() { write!(f, "K:Y, ")?; } else { @@ -1003,38 +1022,6 @@ pub mod interest { } } - /// ```text - /// Flags: - /// - X: Reserved - /// - X: Reserved - /// - Z: Extension If Z==1 then at least one extension is present - /// - /// 7 6 5 4 3 2 1 0 - /// +-+-+-+-+-+-+-+-+ - /// |Z|X|X| F_INT | - /// +---------------+ - /// ~ intst_id:z32 ~ - /// +---------------+ - /// ~ [decl_exts] ~ if Z==1 - /// +---------------+ - /// ``` - #[derive(Debug, Clone, PartialEq, Eq)] - pub struct FinalInterest { - pub id: InterestId, - } - - impl FinalInterest { - #[cfg(feature = "test")] - pub fn rand() -> Self { - use rand::Rng; - let mut rng = rand::thread_rng(); - - let id: InterestId = rng.gen(); - - Self { id } - } - } - /// ```text /// Flags: /// - X: Reserved diff --git a/commons/zenoh-protocol/src/network/mod.rs b/commons/zenoh-protocol/src/network/mod.rs index 0e198ddf0f..cbf9894aef 100644 --- a/commons/zenoh-protocol/src/network/mod.rs +++ b/commons/zenoh-protocol/src/network/mod.rs @@ -20,9 +20,9 @@ pub mod response; use core::fmt; pub use declare::{ - Declare, DeclareBody, DeclareInterest, DeclareKeyExpr, DeclareQueryable, DeclareSubscriber, - DeclareToken, UndeclareInterest, UndeclareKeyExpr, UndeclareQueryable, UndeclareSubscriber, - UndeclareToken, + Declare, DeclareBody, DeclareFinal, DeclareInterest, DeclareKeyExpr, DeclareMode, + DeclareQueryable, DeclareSubscriber, DeclareToken, UndeclareInterest, UndeclareKeyExpr, + UndeclareQueryable, UndeclareSubscriber, UndeclareToken, }; pub use oam::Oam; pub use push::Push; diff --git a/zenoh/src/key_expr.rs b/zenoh/src/key_expr.rs index aaa1d13724..17aa0425b6 100644 --- a/zenoh/src/key_expr.rs +++ b/zenoh/src/key_expr.rs @@ -53,7 +53,7 @@ pub use zenoh_keyexpr::*; pub use zenoh_macros::{kedefine, keformat, kewrite}; use zenoh_protocol::{ core::{key_expr::canon::Canonizable, ExprId, WireExpr}, - network::{declare, DeclareBody, Mapping, UndeclareKeyExpr}, + network::{declare, DeclareBody, DeclareMode, Mapping, UndeclareKeyExpr}, }; use zenoh_result::ZResult; @@ -664,7 +664,7 @@ impl SyncResolve for KeyExprUndeclaration<'_> { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(zenoh_protocol::network::Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index cb565053c9..3531dd2d88 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -211,8 +211,7 @@ impl Primitives for Face { zenoh_protocol::network::DeclareBody::DeclareToken(_m) => todo!(), zenoh_protocol::network::DeclareBody::UndeclareToken(_m) => todo!(), zenoh_protocol::network::DeclareBody::DeclareInterest(_m) => todo!(), - zenoh_protocol::network::DeclareBody::FinalInterest(_m) => todo!(), - zenoh_protocol::network::DeclareBody::UndeclareInterest(_m) => todo!(), + zenoh_protocol::network::DeclareBody::DeclareFinal(_m) => todo!(), } drop(ctrl_lock); } diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index 194b97fca8..941b37f916 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -27,7 +27,7 @@ use zenoh_protocol::{ network::{ declare::{ ext, queryable::ext::QueryableInfoType, subscriber::ext::SubscriberInfo, Declare, - DeclareBody, DeclareKeyExpr, + DeclareBody, DeclareKeyExpr, DeclareMode, }, Mapping, }, @@ -452,7 +452,7 @@ impl Resource { .insert(expr_id, nonwild_prefix.clone()); face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index e85bb77bf9..6c689d3336 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -30,7 +30,7 @@ use zenoh_protocol::{ core::{Reliability, WhatAmI}, network::declare::{ common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, - DeclareSubscriber, UndeclareSubscriber, + DeclareMode, DeclareSubscriber, UndeclareSubscriber, }, }; use zenoh_sync::get_mut_unchecked; @@ -53,7 +53,7 @@ fn propagate_simple_subscription_to( let key_expr = Resource::decl_key(res, dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -137,7 +137,7 @@ fn declare_client_subscription( .primitives .send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -171,7 +171,7 @@ fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -206,7 +206,7 @@ pub(super) fn undeclare_client_subscription( if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index 5c0bc5349b..28e1d75460 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -33,7 +33,7 @@ use zenoh_protocol::{ core::{WhatAmI, WireExpr}, network::declare::{ common::ext::WireExprType, ext, queryable::ext::QueryableInfoType, Declare, DeclareBody, - DeclareQueryable, UndeclareQueryable, + DeclareMode, DeclareQueryable, UndeclareQueryable, }, }; use zenoh_sync::get_mut_unchecked; @@ -93,7 +93,7 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -165,7 +165,7 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -418,7 +418,7 @@ pub(super) fn undeclare_client_subscription( if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -460,7 +460,7 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(sub, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index 150c12a632..356793e3a3 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -36,7 +36,7 @@ use zenoh_protocol::{ core::{WhatAmI, WireExpr, ZenohId}, network::declare::{ common::ext::WireExprType, ext, queryable::ext::QueryableInfoType, Declare, DeclareBody, - DeclareQueryable, UndeclareQueryable, + DeclareMode, DeclareQueryable, UndeclareQueryable, }, }; use zenoh_sync::get_mut_unchecked; @@ -126,7 +126,7 @@ fn send_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { @@ -170,7 +170,7 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -339,7 +339,7 @@ fn send_forget_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { @@ -365,7 +365,7 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { let key_expr = Resource::decl_key(qabl, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index b495248788..5ac0b22846 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -30,7 +30,7 @@ use zenoh_protocol::{ core::{Reliability, WhatAmI}, network::declare::{ common::ext::WireExprType, ext, subscriber::ext::SubscriberInfo, Declare, DeclareBody, - DeclareSubscriber, UndeclareSubscriber, + DeclareMode, DeclareSubscriber, UndeclareSubscriber, }, }; use zenoh_sync::get_mut_unchecked; @@ -53,7 +53,7 @@ fn propagate_simple_subscription_to( let key_expr = Resource::decl_key(res, dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -137,7 +137,7 @@ fn declare_client_subscription( .primitives .send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -171,7 +171,7 @@ fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -206,7 +206,7 @@ pub(super) fn undeclare_client_subscription( if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index 72c32b9217..c2d62c7658 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -33,7 +33,7 @@ use zenoh_protocol::{ core::{WhatAmI, WireExpr}, network::declare::{ common::ext::WireExprType, ext, queryable::ext::QueryableInfoType, Declare, DeclareBody, - DeclareQueryable, UndeclareQueryable, + DeclareMode, DeclareQueryable, UndeclareQueryable, }, }; use zenoh_sync::get_mut_unchecked; @@ -93,7 +93,7 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -165,7 +165,7 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -412,7 +412,7 @@ fn propagate_forget_simple_subscription_to_peers(tables: &mut Tables, res: &Arc< if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -564,7 +564,7 @@ pub(super) fn undeclare_client_subscription( if let Some(id) = face_hat_mut!(face).local_subs.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -606,7 +606,7 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(sub, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -635,7 +635,7 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(sub, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -774,7 +774,7 @@ pub(super) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: if forget { dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -800,7 +800,7 @@ pub(super) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: }; dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 99e787beb5..e647cf2dc7 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -36,7 +36,7 @@ use zenoh_protocol::{ core::{WhatAmI, WireExpr, ZenohId}, network::declare::{ common::ext::WireExprType, ext, queryable::ext::QueryableInfoType, Declare, DeclareBody, - DeclareQueryable, UndeclareQueryable, + DeclareMode, DeclareQueryable, UndeclareQueryable, }, }; use zenoh_sync::get_mut_unchecked; @@ -194,7 +194,7 @@ fn send_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { @@ -248,7 +248,7 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -473,7 +473,7 @@ fn send_forget_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { @@ -499,7 +499,7 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { let key_expr = Resource::decl_key(qabl, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -775,7 +775,7 @@ pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(qabl, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -874,7 +874,7 @@ pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links if forget { dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -900,7 +900,7 @@ pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links let key_expr = Resource::decl_key(res, dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/routing/mod.rs b/zenoh/src/net/routing/mod.rs index 0ddf12b82f..77f51c16b3 100644 --- a/zenoh/src/net/routing/mod.rs +++ b/zenoh/src/net/routing/mod.rs @@ -117,8 +117,7 @@ impl RoutingContext { DeclareBody::DeclareToken(m) => Some(&m.wire_expr), DeclareBody::UndeclareToken(m) => Some(&m.ext_wire_expr.wire_expr), DeclareBody::DeclareInterest(m) => m.wire_expr.as_ref(), - DeclareBody::FinalInterest(_) => None, - DeclareBody::UndeclareInterest(m) => Some(&m.ext_wire_expr.wire_expr), + DeclareBody::DeclareFinal(_) => None, }, NetworkBody::OAM(_) => None, } diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index d460ee3f1c..a5739d830c 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -39,8 +39,8 @@ use zenoh_protocol::{ }, network::{ declare::{queryable::ext::QueryableInfoType, subscriber::ext::SubscriberInfo}, - ext, Declare, DeclareBody, DeclareQueryable, DeclareSubscriber, Push, Request, Response, - ResponseFinal, + ext, Declare, DeclareBody, DeclareMode, DeclareQueryable, DeclareSubscriber, Push, Request, + Response, ResponseFinal, }, zenoh::{PushBody, RequestBody}, }; @@ -276,7 +276,7 @@ impl AdminSpace { zlock!(admin.primitives).replace(primitives.clone()); primitives.send_declare(Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, @@ -289,7 +289,7 @@ impl AdminSpace { }); primitives.send_declare(Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index 4067f2ad8f..55ff9f0a4d 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -26,7 +26,7 @@ use zenoh_protocol::core::{ key_expr::keyexpr, ExprId, Reliability, WhatAmI, WireExpr, ZenohId, EMPTY_EXPR_ID, }; use zenoh_protocol::network::declare::subscriber::ext::SubscriberInfo; -use zenoh_protocol::network::{ext, Declare, DeclareBody, DeclareKeyExpr}; +use zenoh_protocol::network::{ext, Declare, DeclareBody, DeclareKeyExpr, DeclareMode}; use zenoh_protocol::zenoh::{PushBody, Put}; #[test] @@ -579,7 +579,7 @@ fn client_test() { Primitives::send_declare( primitives0.as_ref(), Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -607,7 +607,7 @@ fn client_test() { Primitives::send_declare( primitives0.as_ref(), Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -629,7 +629,7 @@ fn client_test() { Primitives::send_declare( primitives1.as_ref(), Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -657,7 +657,7 @@ fn client_test() { Primitives::send_declare( primitives1.as_ref(), Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -679,7 +679,7 @@ fn client_test() { Primitives::send_declare( primitives2.as_ref(), Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index addb757807..9bc6c9c331 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -71,7 +71,7 @@ use zenoh_protocol::{ network::{ declare::{ self, common::ext::WireExprType, queryable::ext::QueryableInfoType, - subscriber::ext::SubscriberInfo, Declare, DeclareBody, DeclareKeyExpr, + subscriber::ext::SubscriberInfo, Declare, DeclareBody, DeclareKeyExpr, DeclareMode, DeclareQueryable, DeclareSubscriber, UndeclareQueryable, UndeclareSubscriber, }, ext, @@ -872,7 +872,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1085,7 +1085,7 @@ impl Session { // }; primitives.send_declare(Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1142,7 +1142,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1194,7 +1194,7 @@ impl Session { distance: 0, }; primitives.send_declare(Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1216,7 +1216,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1252,7 +1252,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: declare::ext::NodeIdType::DEFAULT, @@ -1277,7 +1277,7 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { - interest_id: None, + mode: DeclareMode::Push, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -2047,8 +2047,7 @@ impl Primitives for Session { DeclareBody::DeclareToken(_) => todo!(), DeclareBody::UndeclareToken(_) => todo!(), DeclareBody::DeclareInterest(_) => todo!(), - DeclareBody::FinalInterest(_) => todo!(), - DeclareBody::UndeclareInterest(_) => todo!(), + DeclareBody::DeclareFinal(_) => todo!(), } }