Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Declare message can be Push/Request/RequestContinuous/Response 2 #906

Merged
merged 9 commits into from
Apr 5, 2024
6 changes: 1 addition & 5 deletions commons/zenoh-codec/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,6 @@ where

fn write(self, writer: &mut W, x: &interest::DeclareInterest) -> Self::Output {
let interest::DeclareInterest {
id,
interest: _,
wire_expr,
} = x;
Expand All @@ -977,7 +976,6 @@ where
self.write(&mut *writer, header)?;

// Body
self.write(&mut *writer, id)?;
self.write(&mut *writer, x.options())?;
if let Some(we) = wire_expr.as_ref() {
self.write(&mut *writer, we)?;
Expand Down Expand Up @@ -1012,9 +1010,8 @@ where
}

// Body
let id: interest::InterestId = self.codec.read(&mut *reader)?;
let options: u8 = self.codec.read(&mut *reader)?;
let interest = Interest::from((imsg::flags(self.header), options));
let interest = Interest::from(options);

let mut wire_expr = None;
if interest.restricted() {
Expand All @@ -1035,7 +1032,6 @@ where
}

Ok(interest::DeclareInterest {
id,
interest,
wire_expr,
})
Expand Down
58 changes: 33 additions & 25 deletions commons/zenoh-protocol/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,23 +733,23 @@ pub mod interest {

/// # DeclareInterest message
///
/// The DECLARE INTEREST message is sent to request the transmission of current and/or future
/// The DECLARE INTEREST message is sent to request the transmission of current and optionally future
/// declarations of a given kind matching a target keyexpr. E.g., a declare interest could be
/// sent to request the transmisison of all current subscriptions matching `a/*`.
///
/// The behaviour of a DECLARE INTEREST depends on the DECLARE MODE in the DECLARE MESSAGE:
/// - Push: only future declarations
/// - Push: invalid
/// - Request: only current declarations
/// - RequestContinous: current and future declarations
/// - Response: invalid
///
/// E.g., the [`DeclareInterest`] message flow is the following:
/// E.g., the [`DeclareInterest`] message flow is the following for a Request:
///
/// ```text
/// A B
/// | DECL INTEREST |
/// |------------------>| -- Sent in Declare::RequestContinuous.
/// | | This is a DeclareInterest e.g. for subscriber declarations/undeclarations.
/// |------------------>| -- Sent in Declare::Request.
/// | | This is a DeclareInterest e.g. for subscriber declarations.
/// | |
/// | DECL SUBSCRIBER |
/// |<------------------| -- Sent in Declare::Response
Expand All @@ -760,6 +760,26 @@ pub mod interest {
/// | |
/// | FINAL |
/// |<------------------| -- Sent in Declare::Response
/// ```
///
///
/// And the [`DeclareInterest`] message flow is the following for a ContinuousRequest:
Mallets marked this conversation as resolved.
Show resolved Hide resolved
///
/// ```text
/// A B
/// | DECL INTEREST |
/// |------------------>| -- Sent in Declare::RequestContinuous.
/// | | This is a DeclareInterest e.g. for subscriber declarations/undeclarations.
/// | |
/// | DECL SUBSCRIBER |
/// |<------------------| -- Sent in Declare::Push
/// | DECL SUBSCRIBER |
/// |<------------------| -- Sent in Declare::Push
/// | DECL SUBSCRIBER |
/// |<------------------| -- Sent in Declare::Push
/// | |
/// | FINAL |
/// |<------------------| -- Sent in Declare::Response
/// | |
/// | DECL SUBSCRIBER |
/// |<------------------| -- Sent in Declare::Push. This is a new subscriber declaration.
Expand All @@ -784,9 +804,7 @@ pub mod interest {
///
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// |Z|F|X| D_INT |
/// +---------------+
/// ~ intst_id:z32 ~
/// |Z|X|X| D_INT |
/// +---------------+
/// |A|M|N|R|T|Q|S|K| (*)
/// +---------------+
Expand All @@ -809,7 +827,6 @@ pub mod interest {
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeclareInterest {
pub id: InterestId,
pub interest: Interest,
pub wire_expr: Option<WireExpr<'static>>,
}
Expand All @@ -834,12 +851,10 @@ pub mod interest {
use rand::Rng;
let mut rng = rand::thread_rng();

let id: InterestId = rng.gen();
let wire_expr = rng.gen_bool(0.5).then_some(WireExpr::rand());
let interest = Interest::rand();

Self {
id,
wire_expr,
interest,
}
Expand All @@ -848,7 +863,6 @@ pub mod interest {

#[derive(Clone, Copy)]
pub struct Interest {
flags: u8,
options: u8,
}

Expand All @@ -870,14 +884,11 @@ pub mod interest {
);

const fn options(options: u8) -> Self {
Self { flags: 0, options }
Self { options }
}

pub const fn empty() -> Self {
Self {
flags: 0,
options: 0,
}
Self { options: 0 }
}

pub const fn keyexprs(&self) -> bool {
Expand Down Expand Up @@ -982,17 +993,17 @@ pub mod interest {
impl Add for Interest {
type Output = Self;

#[allow(clippy::suspicious_arithmetic_impl)] // Allows to implement Add & Sub for Interest
fn add(self, rhs: Self) -> Self::Output {
Self {
flags: self.flags | rhs.flags,
options: self.options | rhs.options,
}
}
}

impl AddAssign for Interest {
#[allow(clippy::suspicious_op_assign_impl)] // Allows to implement Add & Sub for Interest
fn add_assign(&mut self, rhs: Self) {
self.flags |= rhs.flags;
self.options |= rhs.options;
}
}
Expand All @@ -1002,23 +1013,20 @@ pub mod interest {

fn sub(self, rhs: Self) -> Self::Output {
Self {
flags: self.flags & !rhs.flags,
options: self.options & !rhs.options,
}
}
}

impl SubAssign for Interest {
fn sub_assign(&mut self, rhs: Self) {
self.flags &= !rhs.flags;
self.options &= !rhs.options;
}
}

impl From<(u8, u8)> for Interest {
fn from(value: (u8, u8)) -> Self {
let (flags, options) = value;
Self { flags, options }
impl From<u8> for Interest {
fn from(options: u8) -> Self {
Self { options }
}
}

Expand Down
Loading