From cc8d4a1f93f358ef3a951e0ae0fe27c5b3e41171 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Mon, 26 Feb 2024 12:30:09 +0100 Subject: [PATCH] Allow DeclareInterest for any keyexpr (#739) * Allow to DeclareInterest for any keyexpr * Remove forgotten println --- commons/zenoh-codec/src/network/declare.rs | 41 +-- commons/zenoh-protocol/src/network/declare.rs | 283 ++++++++++++++---- zenoh/src/net/routing/mod.rs | 2 +- 3 files changed, 249 insertions(+), 77 deletions(-) diff --git a/commons/zenoh-codec/src/network/declare.rs b/commons/zenoh-codec/src/network/declare.rs index cf92b27c17..6df25a8d2a 100644 --- a/commons/zenoh-codec/src/network/declare.rs +++ b/commons/zenoh-codec/src/network/declare.rs @@ -24,6 +24,7 @@ use zenoh_protocol::{ network::{ declare::{ self, common, interest, keyexpr, queryable, subscriber, token, Declare, DeclareBody, + Interest, }, id, Mapping, }, @@ -845,24 +846,20 @@ where fn write(self, writer: &mut W, x: &interest::DeclareInterest) -> Self::Output { let interest::DeclareInterest { id, + interest: _, wire_expr, - interest, } = x; // Header - let mut header = declare::id::D_INTEREST; - if wire_expr.mapping != Mapping::DEFAULT { - header |= subscriber::flag::M; - } - if wire_expr.has_suffix() { - header |= subscriber::flag::N; - } + let header = declare::id::D_INTEREST | x.flags(); self.write(&mut *writer, header)?; // Body self.write(&mut *writer, id)?; - self.write(&mut *writer, wire_expr)?; - self.write(&mut *writer, interest.as_u8())?; + self.write(&mut *writer, x.options())?; + if let Some(we) = wire_expr.as_ref() { + self.write(&mut *writer, we)?; + } Ok(()) } @@ -894,14 +891,20 @@ where // Body let id: interest::InterestId = self.codec.read(&mut *reader)?; - let ccond = Zenoh080Condition::new(imsg::has_flag(self.header, token::flag::N)); - let mut wire_expr: WireExpr<'static> = ccond.read(&mut *reader)?; - wire_expr.mapping = if imsg::has_flag(self.header, token::flag::M) { - Mapping::Sender - } else { - Mapping::Receiver - }; - let interest: u8 = self.codec.read(&mut *reader)?; + let options: u8 = self.codec.read(&mut *reader)?; + let interest = Interest::from((imsg::flags(self.header), options)); + + let mut wire_expr = None; + if interest.restricted() { + let ccond = Zenoh080Condition::new(interest.named()); + let mut we: WireExpr<'static> = ccond.read(&mut *reader)?; + we.mapping = if interest.mapping() { + Mapping::Sender + } else { + Mapping::Receiver + }; + wire_expr = Some(we); + } // Extensions let has_ext = imsg::has_flag(self.header, token::flag::Z); @@ -911,8 +914,8 @@ where Ok(interest::DeclareInterest { id, + interest, wire_expr, - interest: interest.into(), }) } } diff --git a/commons/zenoh-protocol/src/network/declare.rs b/commons/zenoh-protocol/src/network/declare.rs index 1568029cc6..8164d9440d 100644 --- a/commons/zenoh-protocol/src/network/declare.rs +++ b/commons/zenoh-protocol/src/network/declare.rs @@ -18,7 +18,6 @@ use crate::{ zextz64, zextzbuf, }; use alloc::borrow::Cow; -use core::ops::BitOr; pub use interest::*; pub use keyexpr::*; pub use queryable::*; @@ -703,13 +702,18 @@ pub mod token { } pub mod interest { + use core::{ + fmt::{self, Debug}, + ops::{Add, AddAssign, Sub, SubAssign}, + }; + use super::*; pub type InterestId = u32; pub mod flag { - pub const N: u8 = 1 << 5; // 0x20 Named if N==1 then the key expr has name/suffix - pub const M: u8 = 1 << 6; // 0x40 Mapping if M==1 then key expr mapping is the one declared by the sender, else it is the one declared by the receiver + pub const C: u8 = 1 << 5; // 0x20 Current if C==1 then the interest refers to the current declarations. + pub const F: u8 = 1 << 6; // 0x40 Future if F==1 then the interest refers to the future declarations. pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow } @@ -753,21 +757,23 @@ pub mod interest { /// /// ```text /// Flags: - /// - N: Named If N==1 then the key expr has name/suffix - /// - M: Mapping if M==1 then key expr mapping is the one declared by the sender, else it is the one declared by the receiver + /// - C: Current if C==1 then the interest refers to the current declarations. + /// - F: Future if F==1 then the interest refers to the future declarations. Note that if F==0 then: + /// - Declarations SHOULD NOT be sent after the FinalInterest; + /// - UndeclareInterest SHOULD NOT be sent after the FinalInterest. /// - Z: Extension If Z==1 then at least one extension is present /// /// 7 6 5 4 3 2 1 0 /// +-+-+-+-+-+-+-+-+ - /// |Z|M|N| D_INT | + /// |Z|F|C| D_INT | /// +---------------+ /// ~ intst_id:z32 ~ /// +---------------+ - /// ~ key_scope:z16 ~ + /// |A|M|N|R|T|Q|S|K| (*) /// +---------------+ - /// ~ key_suffix ~ if N==1 -- + /// ~ key_scope:z16 ~ if R==1 /// +---------------+ - /// |A|F|C|X|T|Q|S|K| (*) + /// ~ key_suffix ~ if R==1 && N==1 -- /// +---------------+ /// ~ [decl_exts] ~ if Z==1 /// +---------------+ @@ -776,63 +782,141 @@ pub mod interest { /// - if S==1 then the interest refers to subscribers /// - if Q==1 then the interest refers to queryables /// - if T==1 then the interest refers to tokens - /// - if C==1 then the interest refers to the current declarations. - /// - if F==1 then the interest refers to the future declarations. Note that if F==0 then: - /// - replies SHOULD NOT be sent after the FinalInterest; - /// - UndeclareInterest SHOULD NOT be sent after the FinalInterest. + /// - if R==1 then the interest is restricted to the matching key expression, else it is for all key expressions. + /// - if N==1 then the key expr has name/suffix. If R==0 then N should be set to 0. + /// - if M==1 then key expr mapping is the one declared by the sender, else it is the one declared by the receiver. + /// If R==0 then M should be set to 0. /// - if A==1 then the replies SHOULD be aggregated /// ``` #[derive(Debug, Clone, PartialEq, Eq)] pub struct DeclareInterest { pub id: InterestId, - pub wire_expr: WireExpr<'static>, pub interest: Interest, + pub wire_expr: Option>, } - #[repr(transparent)] - #[derive(Debug, Clone, PartialEq, Eq)] - pub struct Interest(u8); + impl DeclareInterest { + pub fn flags(&self) -> u8 { + let mut interest = self.interest; + if self.interest.current() { + interest += Interest::CURRENT; + } + if self.interest.future() { + interest += Interest::FUTURE; + } + interest.flags + } + + pub fn options(&self) -> u8 { + let mut interest = self.interest; + if let Some(we) = self.wire_expr.as_ref() { + interest += Interest::RESTRICTED; + if we.has_suffix() { + interest += Interest::NAMED; + } + if let Mapping::Sender = we.mapping { + interest += Interest::MAPPING; + } + } + interest.options + } + + #[cfg(feature = "test")] + pub fn rand() -> Self { + use rand::Rng; + let mut rng = rand::thread_rng(); + + let id: InterestId = rng.gen(); + let wire_expr = rng.gen_bool(0.5).then_some(WireExpr::rand()); + let interest = Interest::rand(); + + Self { + id, + wire_expr, + interest, + } + } + } + + #[derive(Clone, Copy)] + pub struct Interest { + flags: u8, + options: u8, + } impl Interest { - pub const KEYEXPRS: Interest = Interest(1); - pub const SUBSCRIBERS: Interest = Interest(1 << 1); - pub const QUERYABLES: Interest = Interest(1 << 2); - pub const TOKENS: Interest = Interest(1 << 3); - // pub const X: Interest = Interest(1 << 4); - pub const CURRENT: Interest = Interest(1 << 5); - pub const FUTURE: Interest = Interest(1 << 6); - pub const AGGREGATE: Interest = Interest(1 << 7); + // Header + pub const CURRENT: Interest = Interest::flags(interest::flag::C); + pub const FUTURE: Interest = Interest::flags(interest::flag::F); + // Flags + pub const KEYEXPRS: Interest = Interest::options(1); + pub const SUBSCRIBERS: Interest = Interest::options(1 << 1); + pub const QUERYABLES: Interest = Interest::options(1 << 2); + pub const TOKENS: Interest = Interest::options(1 << 3); + const RESTRICTED: Interest = Interest::options(1 << 4); + const NAMED: Interest = Interest::options(1 << 5); + const MAPPING: Interest = Interest::options(1 << 6); + pub const AGGREGATE: Interest = Interest::options(1 << 7); + pub const ALL: Interest = Interest::options( + Interest::KEYEXPRS.options + | Interest::SUBSCRIBERS.options + | Interest::QUERYABLES.options + | Interest::TOKENS.options, + ); + + const fn flags(flags: u8) -> Self { + Self { flags, options: 0 } + } + + const fn options(options: u8) -> Self { + Self { flags: 0, options } + } + + pub const fn empty() -> Self { + Self { + flags: 0, + options: 0, + } + } + + pub const fn current(&self) -> bool { + imsg::has_flag(self.flags, Self::CURRENT.flags) + } + + pub const fn future(&self) -> bool { + imsg::has_flag(self.flags, Self::FUTURE.flags) + } pub const fn keyexprs(&self) -> bool { - imsg::has_flag(self.0, Self::KEYEXPRS.0) + imsg::has_flag(self.options, Self::KEYEXPRS.options) } pub const fn subscribers(&self) -> bool { - imsg::has_flag(self.0, Self::SUBSCRIBERS.0) + imsg::has_flag(self.options, Self::SUBSCRIBERS.options) } pub const fn queryables(&self) -> bool { - imsg::has_flag(self.0, Self::QUERYABLES.0) + imsg::has_flag(self.options, Self::QUERYABLES.options) } pub const fn tokens(&self) -> bool { - imsg::has_flag(self.0, Self::TOKENS.0) + imsg::has_flag(self.options, Self::TOKENS.options) } - pub const fn current(&self) -> bool { - imsg::has_flag(self.0, Self::CURRENT.0) + pub const fn restricted(&self) -> bool { + imsg::has_flag(self.options, Self::RESTRICTED.options) } - pub const fn future(&self) -> bool { - imsg::has_flag(self.0, Self::FUTURE.0) + pub const fn named(&self) -> bool { + imsg::has_flag(self.options, Self::NAMED.options) } - pub const fn aggregate(&self) -> bool { - imsg::has_flag(self.0, Self::AGGREGATE.0) + pub const fn mapping(&self) -> bool { + imsg::has_flag(self.options, Self::MAPPING.options) } - pub const fn as_u8(&self) -> u8 { - self.0 + pub const fn aggregate(&self) -> bool { + imsg::has_flag(self.options, Self::AGGREGATE.options) } #[cfg(feature = "test")] @@ -840,44 +924,129 @@ pub mod interest { use rand::Rng; let mut rng = rand::thread_rng(); - let inner: u8 = rng.gen(); + let mut s = Self::empty(); + if rng.gen_bool(0.5) { + s += Interest::CURRENT; + } + if rng.gen_bool(0.5) { + s += Interest::FUTURE; + } + if rng.gen_bool(0.5) { + s += Interest::KEYEXPRS; + } + if rng.gen_bool(0.5) { + s += Interest::SUBSCRIBERS; + } + if rng.gen_bool(0.5) { + s += Interest::TOKENS; + } + if rng.gen_bool(0.5) { + s += Interest::AGGREGATE; + } + s + } + } - Self(inner) + impl PartialEq for Interest { + fn eq(&self, other: &Self) -> bool { + self.current() == other.current() + && self.future() == other.future() + && self.keyexprs() == other.keyexprs() + && self.subscribers() == other.subscribers() + && self.queryables() == other.queryables() + && self.tokens() == other.tokens() + && self.aggregate() == other.aggregate() } } - impl BitOr for Interest { + impl Debug for Interest { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Interest {{ ")?; + if self.current() { + write!(f, "C:Y, ")?; + } else { + write!(f, "C:N, ")?; + } + if self.future() { + write!(f, "F:Y, ")?; + } else { + write!(f, "F:N, ")?; + } + if self.keyexprs() { + write!(f, "K:Y, ")?; + } else { + write!(f, "K:N, ")?; + } + if self.subscribers() { + write!(f, "S:Y, ")?; + } else { + write!(f, "S:N, ")?; + } + if self.queryables() { + write!(f, "Q:Y, ")?; + } else { + write!(f, "Q:N, ")?; + } + if self.tokens() { + write!(f, "T:Y, ")?; + } else { + write!(f, "T:N, ")?; + } + if self.aggregate() { + write!(f, "A:Y")?; + } else { + write!(f, "A:N")?; + } + write!(f, " }}")?; + Ok(()) + } + } + + impl Eq for Interest {} + + impl Add for Interest { type Output = Self; - fn bitor(self, rhs: Self) -> Self::Output { - Self(self.0 | rhs.0) + fn add(self, rhs: Self) -> Self::Output { + Self { + flags: self.flags | rhs.flags, + options: self.options | rhs.options, + } } } - impl From for Interest { - fn from(v: u8) -> Self { - Self(v) + impl AddAssign for Interest { + fn add_assign(&mut self, rhs: Self) { + self.flags |= rhs.flags; + self.options |= rhs.options; } } - impl DeclareInterest { - #[cfg(feature = "test")] - pub fn rand() -> Self { - use rand::Rng; - let mut rng = rand::thread_rng(); - - let id: InterestId = rng.gen(); - let wire_expr = WireExpr::rand(); - let interest = Interest::rand(); + impl Sub for Interest { + type Output = Self; + fn sub(self, rhs: Self) -> Self::Output { Self { - id, - wire_expr, - interest, + flags: self.flags & !rhs.flags, + options: self.options & !rhs.options, } } } + impl SubAssign for Interest { + fn sub_assign(&mut self, rhs: Self) { + self.flags &= !rhs.flags; + self.options &= !rhs.options; + } + } + + impl From<(u8, u8)> for Interest { + fn from(value: (u8, u8)) -> Self { + let (flags, options) = value; + Self { flags, options } + } + } + /// ```text /// Flags: /// - X: Reserved diff --git a/zenoh/src/net/routing/mod.rs b/zenoh/src/net/routing/mod.rs index 0b069c1337..8147cca31c 100644 --- a/zenoh/src/net/routing/mod.rs +++ b/zenoh/src/net/routing/mod.rs @@ -115,7 +115,7 @@ impl RoutingContext { DeclareBody::UndeclareQueryable(m) => Some(&m.ext_wire_expr.wire_expr), DeclareBody::DeclareToken(m) => Some(&m.wire_expr), DeclareBody::UndeclareToken(m) => Some(&m.ext_wire_expr.wire_expr), - DeclareBody::DeclareInterest(m) => Some(&m.wire_expr), + DeclareBody::DeclareInterest(m) => m.wire_expr.as_ref(), DeclareBody::FinalInterest(_) => None, DeclareBody::UndeclareInterest(m) => Some(&m.ext_wire_expr.wire_expr), },