Skip to content

Commit

Permalink
Declare message can be Push/Request/RequestContinuous/Response 2 (#906)
Browse files Browse the repository at this point in the history
* Declare message can be Push/Request/RequestContinuous/Response

* Address review comments

* Remove F: Future flag from DeclareInterest

* cargo fmt --all

* Remove unused Interest flags field

* Update doc

* Remove unneeded interest_id field

* Update commons/zenoh-protocol/src/network/declare.rs

---------

Co-authored-by: Luca Cominardi <[email protected]>
  • Loading branch information
OlivierHecart and Mallets authored Apr 5, 2024
1 parent 2da0aeb commit 71a9423
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 30 deletions.
6 changes: 1 addition & 5 deletions commons/zenoh-codec/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,6 @@ where

fn write(self, writer: &mut W, x: &interest::DeclareInterest) -> Self::Output {
let interest::DeclareInterest {
id,
interest: _,
wire_expr,
} = x;
Expand All @@ -977,7 +976,6 @@ where
self.write(&mut *writer, header)?;

// Body
self.write(&mut *writer, id)?;
self.write(&mut *writer, x.options())?;
if let Some(we) = wire_expr.as_ref() {
self.write(&mut *writer, we)?;
Expand Down Expand Up @@ -1012,9 +1010,8 @@ where
}

// Body
let id: interest::InterestId = self.codec.read(&mut *reader)?;
let options: u8 = self.codec.read(&mut *reader)?;
let interest = Interest::from((imsg::flags(self.header), options));
let interest = Interest::from(options);

let mut wire_expr = None;
if interest.restricted() {
Expand All @@ -1035,7 +1032,6 @@ where
}

Ok(interest::DeclareInterest {
id,
interest,
wire_expr,
})
Expand Down
58 changes: 33 additions & 25 deletions commons/zenoh-protocol/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,23 +733,23 @@ pub mod interest {

/// # DeclareInterest message
///
/// The DECLARE INTEREST message is sent to request the transmission of current and/or future
/// The DECLARE INTEREST message is sent to request the transmission of current and optionally future
/// declarations of a given kind matching a target keyexpr. E.g., a declare interest could be
/// sent to request the transmisison of all current subscriptions matching `a/*`.
///
/// The behaviour of a DECLARE INTEREST depends on the DECLARE MODE in the DECLARE MESSAGE:
/// - Push: only future declarations
/// - Push: invalid
/// - Request: only current declarations
/// - RequestContinous: current and future declarations
/// - Response: invalid
///
/// E.g., the [`DeclareInterest`] message flow is the following:
/// E.g., the [`DeclareInterest`] message flow is the following for a Request:
///
/// ```text
/// A B
/// | DECL INTEREST |
/// |------------------>| -- Sent in Declare::RequestContinuous.
/// | | This is a DeclareInterest e.g. for subscriber declarations/undeclarations.
/// |------------------>| -- Sent in Declare::Request.
/// | | This is a DeclareInterest e.g. for subscriber declarations.
/// | |
/// | DECL SUBSCRIBER |
/// |<------------------| -- Sent in Declare::Response
Expand All @@ -760,6 +760,26 @@ pub mod interest {
/// | |
/// | FINAL |
/// |<------------------| -- Sent in Declare::Response
/// ```
///
///
/// And the [`DeclareInterest`] message flow is the following for a RequestContinuous:
///
/// ```text
/// A B
/// | DECL INTEREST |
/// |------------------>| -- Sent in Declare::RequestContinuous.
/// | | This is a DeclareInterest e.g. for subscriber declarations/undeclarations.
/// | |
/// | DECL SUBSCRIBER |
/// |<------------------| -- Sent in Declare::Push
/// | DECL SUBSCRIBER |
/// |<------------------| -- Sent in Declare::Push
/// | DECL SUBSCRIBER |
/// |<------------------| -- Sent in Declare::Push
/// | |
/// | FINAL |
/// |<------------------| -- Sent in Declare::Response
/// | |
/// | DECL SUBSCRIBER |
/// |<------------------| -- Sent in Declare::Push. This is a new subscriber declaration.
Expand All @@ -784,9 +804,7 @@ pub mod interest {
///
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// |Z|F|X| D_INT |
/// +---------------+
/// ~ intst_id:z32 ~
/// |Z|X|X| D_INT |
/// +---------------+
/// |A|M|N|R|T|Q|S|K| (*)
/// +---------------+
Expand All @@ -809,7 +827,6 @@ pub mod interest {
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeclareInterest {
pub id: InterestId,
pub interest: Interest,
pub wire_expr: Option<WireExpr<'static>>,
}
Expand All @@ -834,12 +851,10 @@ pub mod interest {
use rand::Rng;
let mut rng = rand::thread_rng();

let id: InterestId = rng.gen();
let wire_expr = rng.gen_bool(0.5).then_some(WireExpr::rand());
let interest = Interest::rand();

Self {
id,
wire_expr,
interest,
}
Expand All @@ -848,7 +863,6 @@ pub mod interest {

#[derive(Clone, Copy)]
pub struct Interest {
flags: u8,
options: u8,
}

Expand All @@ -870,14 +884,11 @@ pub mod interest {
);

const fn options(options: u8) -> Self {
Self { flags: 0, options }
Self { options }
}

pub const fn empty() -> Self {
Self {
flags: 0,
options: 0,
}
Self { options: 0 }
}

pub const fn keyexprs(&self) -> bool {
Expand Down Expand Up @@ -982,17 +993,17 @@ pub mod interest {
impl Add for Interest {
type Output = Self;

#[allow(clippy::suspicious_arithmetic_impl)] // Allows to implement Add & Sub for Interest
fn add(self, rhs: Self) -> Self::Output {
Self {
flags: self.flags | rhs.flags,
options: self.options | rhs.options,
}
}
}

impl AddAssign for Interest {
#[allow(clippy::suspicious_op_assign_impl)] // Allows to implement Add & Sub for Interest
fn add_assign(&mut self, rhs: Self) {
self.flags |= rhs.flags;
self.options |= rhs.options;
}
}
Expand All @@ -1002,23 +1013,20 @@ pub mod interest {

fn sub(self, rhs: Self) -> Self::Output {
Self {
flags: self.flags & !rhs.flags,
options: self.options & !rhs.options,
}
}
}

impl SubAssign for Interest {
fn sub_assign(&mut self, rhs: Self) {
self.flags &= !rhs.flags;
self.options &= !rhs.options;
}
}

impl From<(u8, u8)> for Interest {
fn from(value: (u8, u8)) -> Self {
let (flags, options) = value;
Self { flags, options }
impl From<u8> for Interest {
fn from(options: u8) -> Self {
Self { options }
}
}

Expand Down

0 comments on commit 71a9423

Please sign in to comment.