Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interest network message #915

Merged
merged 20 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 14 additions & 115 deletions commons/zenoh-codec/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,10 @@ use zenoh_buffers::{
ZBuf,
};
use zenoh_protocol::{
common::{
iext,
imsg::{self, HEADER_BITS},
ZExtZ64,
},
common::{iext, imsg, ZExtZ64},
core::{ExprId, ExprLen, WireExpr},
network::{
declare::{
self, common, interest, keyexpr, queryable, subscriber, token, Declare, DeclareBody,
DeclareMode, Interest,
},
declare::{self, common, keyexpr, queryable, subscriber, token, Declare, DeclareBody},
id, Mapping,
},
};
Expand All @@ -51,7 +44,6 @@ where
DeclareBody::UndeclareQueryable(r) => self.write(&mut *writer, r)?,
DeclareBody::DeclareToken(r) => self.write(&mut *writer, r)?,
DeclareBody::UndeclareToken(r) => self.write(&mut *writer, r)?,
DeclareBody::DeclareInterest(r) => self.write(&mut *writer, r)?,
DeclareBody::DeclareFinal(r) => self.write(&mut *writer, r)?,
}

Expand Down Expand Up @@ -79,7 +71,6 @@ where
U_QUERYABLE => DeclareBody::UndeclareQueryable(codec.read(&mut *reader)?),
D_TOKEN => DeclareBody::DeclareToken(codec.read(&mut *reader)?),
U_TOKEN => DeclareBody::UndeclareToken(codec.read(&mut *reader)?),
D_INTEREST => DeclareBody::DeclareInterest(codec.read(&mut *reader)?),
D_FINAL => DeclareBody::DeclareFinal(codec.read(&mut *reader)?),
_ => return Err(DidntRead),
};
Expand All @@ -97,7 +88,7 @@ where

fn write(self, writer: &mut W, x: &Declare) -> Self::Output {
let Declare {
mode,
interest_id,
ext_qos,
ext_tstamp,
ext_nodeid,
Expand All @@ -106,13 +97,9 @@ where

// Header
let mut header = id::DECLARE;
header |= match mode {
DeclareMode::Push => 0b00,
DeclareMode::Response(_) => 0b01,
DeclareMode::Request(_) => 0b10,
DeclareMode::RequestContinuous(_) => 0b11,
} << HEADER_BITS;

if x.interest_id.is_some() {
header |= declare::flag::I;
}
let mut n_exts = ((ext_qos != &declare::ext::QoSType::DEFAULT) as u8)
+ (ext_tstamp.is_some() as u8)
+ ((ext_nodeid != &declare::ext::NodeIdType::DEFAULT) as u8);
Expand All @@ -121,12 +108,8 @@ where
}
self.write(&mut *writer, header)?;

// Body
if let DeclareMode::Request(rid)
| DeclareMode::RequestContinuous(rid)
| DeclareMode::Response(rid) = mode
{
self.write(&mut *writer, rid)?;
if let Some(interest_id) = interest_id {
self.write(&mut *writer, interest_id)?;
}

// Extensions
Expand Down Expand Up @@ -175,14 +158,10 @@ where
return Err(DidntRead);
}

// Body
let mode = match (self.header >> HEADER_BITS) & 0b11 {
0b00 => DeclareMode::Push,
0b01 => DeclareMode::Response(self.codec.read(&mut *reader)?),
0b10 => DeclareMode::Request(self.codec.read(&mut *reader)?),
0b11 => DeclareMode::RequestContinuous(self.codec.read(&mut *reader)?),
_ => return Err(DidntRead),
};
let mut interest_id = None;
if imsg::has_flag(self.header, declare::flag::I) {
interest_id = Some(self.codec.read(&mut *reader)?);
}

// Extensions
let mut ext_qos = declare::ext::QoSType::DEFAULT;
Expand Down Expand Up @@ -219,7 +198,7 @@ where
let body: DeclareBody = self.codec.read(&mut *reader)?;

Ok(Declare {
mode,
interest_id,
ext_qos,
ext_tstamp,
ext_nodeid,
Expand Down Expand Up @@ -938,7 +917,7 @@ where
// Extensions
let mut ext_wire_expr = common::ext::WireExprType::null();

let mut has_ext = imsg::has_flag(self.header, interest::flag::Z);
let mut has_ext = imsg::has_flag(self.header, token::flag::Z);
while has_ext {
let ext: u8 = self.codec.read(&mut *reader)?;
let eodec = Zenoh080Header::new(ext);
Expand All @@ -958,86 +937,6 @@ where
}
}

// DeclareInterest
impl<W> WCodec<&interest::DeclareInterest, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &interest::DeclareInterest) -> Self::Output {
let interest::DeclareInterest {
interest: _,
wire_expr,
} = x;

// Header
let header = declare::id::D_INTEREST;
self.write(&mut *writer, header)?;

// Body
self.write(&mut *writer, x.options())?;
if let Some(we) = wire_expr.as_ref() {
self.write(&mut *writer, we)?;
}

Ok(())
}
}

impl<R> RCodec<interest::DeclareInterest, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<interest::DeclareInterest, Self::Error> {
let header: u8 = self.read(&mut *reader)?;
let codec = Zenoh080Header::new(header);
codec.read(reader)
}
}

impl<R> RCodec<interest::DeclareInterest, &mut R> for Zenoh080Header
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<interest::DeclareInterest, Self::Error> {
if imsg::mid(self.header) != declare::id::D_INTEREST {
return Err(DidntRead);
}

// Body
let options: u8 = self.codec.read(&mut *reader)?;
let interest = Interest::from(options);

let mut wire_expr = None;
if interest.restricted() {
let ccond = Zenoh080Condition::new(interest.named());
let mut we: WireExpr<'static> = ccond.read(&mut *reader)?;
we.mapping = if interest.mapping() {
Mapping::Sender
} else {
Mapping::Receiver
};
wire_expr = Some(we);
}

// Extensions
let has_ext = imsg::has_flag(self.header, token::flag::Z);
if has_ext {
extension::skip_all(reader, "DeclareInterest")?;
}

Ok(interest::DeclareInterest {
interest,
wire_expr,
})
}
}

// WARNING: this is a temporary extension used for undeclarations
impl<W> WCodec<(&common::ext::WireExprType, bool), &mut W> for Zenoh080
where
Expand Down
186 changes: 186 additions & 0 deletions commons/zenoh-codec/src/network/interest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Condition, Zenoh080Header};
use zenoh_buffers::{
reader::{DidntRead, Reader},
writer::{DidntWrite, Writer},
};
use zenoh_protocol::{
common::{
iext,
imsg::{self, HEADER_BITS},
},
core::WireExpr,
network::{
declare, id,
interest::{self, Interest, InterestMode, InterestOptions},
Mapping,
},
};

// Interest
impl<W> WCodec<&Interest, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &Interest) -> Self::Output {
let Interest {
id,
mode,
options: _, // Compute the options on-the-fly according to Interest fields
wire_expr,
ext_qos,
ext_tstamp,
ext_nodeid,
} = x;

// Header
let mut header = id::INTEREST;
header |= match mode {
InterestMode::Final => 0b00,
InterestMode::Current => 0b01,
InterestMode::Future => 0b10,
InterestMode::CurrentFuture => 0b11,
} << HEADER_BITS;
let mut n_exts = ((ext_qos != &declare::ext::QoSType::DEFAULT) as u8)
+ (ext_tstamp.is_some() as u8)
+ ((ext_nodeid != &declare::ext::NodeIdType::DEFAULT) as u8);
if n_exts != 0 {
header |= declare::flag::Z;
}
self.write(&mut *writer, header)?;

self.write(&mut *writer, id)?;

if *mode != InterestMode::Final {
self.write(&mut *writer, x.options())?;
if let Some(we) = wire_expr.as_ref() {
self.write(&mut *writer, we)?;
}
}

// Extensions
if ext_qos != &declare::ext::QoSType::DEFAULT {
n_exts -= 1;
self.write(&mut *writer, (*ext_qos, n_exts != 0))?;
}
if let Some(ts) = ext_tstamp.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (ts, n_exts != 0))?;
}
if ext_nodeid != &declare::ext::NodeIdType::DEFAULT {
n_exts -= 1;
self.write(&mut *writer, (*ext_nodeid, n_exts != 0))?;
}

Ok(())
}
}

impl<R> RCodec<Interest, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<Interest, Self::Error> {
let header: u8 = self.read(&mut *reader)?;
let codec = Zenoh080Header::new(header);

codec.read(reader)
}
}

impl<R> RCodec<Interest, &mut R> for Zenoh080Header
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<Interest, Self::Error> {
if imsg::mid(self.header) != id::INTEREST {
return Err(DidntRead);
}

let id = self.codec.read(&mut *reader)?;
let mode = match (self.header >> HEADER_BITS) & 0b11 {
0b00 => InterestMode::Final,
0b01 => InterestMode::Current,
0b10 => InterestMode::Future,
0b11 => InterestMode::CurrentFuture,
_ => return Err(DidntRead),
};

let mut options = InterestOptions::empty();
let mut wire_expr = None;
if mode != InterestMode::Final {
let options_byte: u8 = self.codec.read(&mut *reader)?;
options = InterestOptions::from(options_byte);
if options.restricted() {
let ccond = Zenoh080Condition::new(options.named());
let mut we: WireExpr<'static> = ccond.read(&mut *reader)?;
we.mapping = if options.mapping() {
Mapping::Sender
} else {
Mapping::Receiver
};
wire_expr = Some(we);
}
}

// Extensions
let mut ext_qos = declare::ext::QoSType::DEFAULT;
let mut ext_tstamp = None;
let mut ext_nodeid = declare::ext::NodeIdType::DEFAULT;

let mut has_ext = imsg::has_flag(self.header, declare::flag::Z);
while has_ext {
let ext: u8 = self.codec.read(&mut *reader)?;
let eodec = Zenoh080Header::new(ext);
match iext::eid(ext) {
declare::ext::QoS::ID => {
let (q, ext): (interest::ext::QoSType, bool) = eodec.read(&mut *reader)?;
ext_qos = q;
has_ext = ext;
}
declare::ext::Timestamp::ID => {
let (t, ext): (interest::ext::TimestampType, bool) =
eodec.read(&mut *reader)?;
ext_tstamp = Some(t);
has_ext = ext;
}
declare::ext::NodeId::ID => {
let (nid, ext): (interest::ext::NodeIdType, bool) = eodec.read(&mut *reader)?;
ext_nodeid = nid;
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "Declare", ext)?;
}
}
}

Ok(Interest {
id,
mode,
options,
wire_expr,
ext_qos,
ext_tstamp,
ext_nodeid,
})
}
}
Loading
Loading