Skip to content

Commit

Permalink
Declare message can be Push/Request/RequestContinuous/Response
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Apr 4, 2024
1 parent 312c03a commit 41f59d3
Show file tree
Hide file tree
Showing 19 changed files with 280 additions and 236 deletions.
157 changes: 83 additions & 74 deletions commons/zenoh-codec/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ use zenoh_buffers::{
ZBuf,
};
use zenoh_protocol::{
common::{iext, imsg, ZExtZ64},
common::{
iext,
imsg::{self, HEADER_BITS},
ZExtZ64,
},
core::{ExprId, ExprLen, WireExpr},
network::{
declare::{
self, common, interest, keyexpr, queryable, subscriber, token, Declare, DeclareBody,
Interest,
DeclareMode, Interest,
},
id, Mapping,
},
Expand All @@ -48,8 +52,8 @@ where
DeclareBody::DeclareToken(r) => self.write(&mut *writer, r)?,
DeclareBody::UndeclareToken(r) => self.write(&mut *writer, r)?,
DeclareBody::DeclareInterest(r) => self.write(&mut *writer, r)?,
DeclareBody::FinalInterest(r) => self.write(&mut *writer, r)?,
DeclareBody::UndeclareInterest(r) => self.write(&mut *writer, r)?,
DeclareBody::DeclareFinal(r) => self.write(&mut *writer, r)?,
}

Ok(())
Expand Down Expand Up @@ -77,8 +81,8 @@ where
D_TOKEN => DeclareBody::DeclareToken(codec.read(&mut *reader)?),
U_TOKEN => DeclareBody::UndeclareToken(codec.read(&mut *reader)?),
D_INTEREST => DeclareBody::DeclareInterest(codec.read(&mut *reader)?),
F_INTEREST => DeclareBody::FinalInterest(codec.read(&mut *reader)?),
U_INTEREST => DeclareBody::UndeclareInterest(codec.read(&mut *reader)?),
D_FINAL => DeclareBody::DeclareFinal(codec.read(&mut *reader)?),
_ => return Err(DidntRead),
};

Expand All @@ -95,7 +99,7 @@ where

fn write(self, writer: &mut W, x: &Declare) -> Self::Output {
let Declare {
interest_id,
mode,
ext_qos,
ext_tstamp,
ext_nodeid,
Expand All @@ -104,9 +108,13 @@ where

// Header
let mut header = id::DECLARE;
if x.interest_id.is_some() {
header |= declare::flag::I;
}
header |= match mode {
DeclareMode::Push => 0b00,
DeclareMode::Request(_) => 0b01,
DeclareMode::RequestContinuous(_) => 0b10,
DeclareMode::Response(_) => 0b11,
} << HEADER_BITS;

let mut n_exts = ((ext_qos != &declare::ext::QoSType::DEFAULT) as u8)
+ (ext_tstamp.is_some() as u8)
+ ((ext_nodeid != &declare::ext::NodeIdType::DEFAULT) as u8);
Expand All @@ -116,8 +124,11 @@ where
self.write(&mut *writer, header)?;

// Body
if let Some(interest_id) = interest_id {
self.write(&mut *writer, interest_id)?;
if let DeclareMode::Request(rid)
| DeclareMode::RequestContinuous(rid)
| DeclareMode::Response(rid) = mode
{
self.write(&mut *writer, rid)?;
}

// Extensions
Expand Down Expand Up @@ -166,10 +177,14 @@ where
return Err(DidntRead);
}

let mut interest_id = None;
if imsg::has_flag(self.header, declare::flag::I) {
interest_id = Some(self.codec.read(&mut *reader)?);
}
// Body
let mode = match (self.header >> HEADER_BITS) & 0b11 {
0b00 => DeclareMode::Push,
0b01 => DeclareMode::Request(self.codec.read(&mut *reader)?),
0b10 => DeclareMode::RequestContinuous(self.codec.read(&mut *reader)?),
0b11 => DeclareMode::Response(self.codec.read(&mut *reader)?),
_ => return Err(DidntRead),
};

// Extensions
let mut ext_qos = declare::ext::QoSType::DEFAULT;
Expand Down Expand Up @@ -206,7 +221,7 @@ where
let body: DeclareBody = self.codec.read(&mut *reader)?;

Ok(Declare {
interest_id,
mode,
ext_qos,
ext_tstamp,
ext_nodeid,
Expand All @@ -215,6 +230,59 @@ where
}
}

// Final
impl<W> WCodec<&common::DeclareFinal, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &common::DeclareFinal) -> Self::Output {
let common::DeclareFinal = x;

// Header
let header = declare::id::D_FINAL;
self.write(&mut *writer, header)?;

Ok(())
}
}

impl<R> RCodec<common::DeclareFinal, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<common::DeclareFinal, Self::Error> {
let header: u8 = self.read(&mut *reader)?;
let codec = Zenoh080Header::new(header);

codec.read(reader)
}
}

impl<R> RCodec<common::DeclareFinal, &mut R> for Zenoh080Header
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<common::DeclareFinal, Self::Error> {
if imsg::mid(self.header) != declare::id::D_FINAL {
return Err(DidntRead);
}

// Extensions
let has_ext = imsg::has_flag(self.header, token::flag::Z);
if has_ext {
extension::skip_all(reader, "Final")?;
}

Ok(common::DeclareFinal)
}
}

// DeclareKeyExpr
impl<W> WCodec<&keyexpr::DeclareKeyExpr, &mut W> for Zenoh080
where
Expand Down Expand Up @@ -976,65 +1044,6 @@ where
}
}

// FinalInterest
impl<W> WCodec<&interest::FinalInterest, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &interest::FinalInterest) -> Self::Output {
let interest::FinalInterest { id } = x;

// Header
let header = declare::id::F_INTEREST;
self.write(&mut *writer, header)?;

// Body
self.write(&mut *writer, id)?;

Ok(())
}
}

impl<R> RCodec<interest::FinalInterest, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<interest::FinalInterest, Self::Error> {
let header: u8 = self.read(&mut *reader)?;
let codec = Zenoh080Header::new(header);

codec.read(reader)
}
}

impl<R> RCodec<interest::FinalInterest, &mut R> for Zenoh080Header
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<interest::FinalInterest, Self::Error> {
if imsg::mid(self.header) != declare::id::F_INTEREST {
return Err(DidntRead);
}

// Body
let id: interest::InterestId = self.codec.read(&mut *reader)?;

// Extensions
let has_ext = imsg::has_flag(self.header, token::flag::Z);
if has_ext {
extension::skip_all(reader, "FinalInterest")?;
}

Ok(interest::FinalInterest { id })
}
}

// UndeclareInterest
impl<W> WCodec<&interest::UndeclareInterest, &mut W> for Zenoh080
where
Expand Down
16 changes: 16 additions & 0 deletions commons/zenoh-codec/tests/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@ use zenoh_protocol::{
zenoh, zextunit, zextz64, zextzbuf,
};

#[test]
fn zbuf_test() {
let mut buffer = vec![0u8; 64];

let zbuf = ZBuf::empty();
let mut writer = buffer.writer();

let codec = Zenoh080::new();
codec.write(&mut writer, &zbuf).unwrap();
println!("Buffer: {:?}", buffer);

let mut reader = buffer.reader();
let ret: ZBuf = codec.read(&mut reader).unwrap();
assert_eq!(ret, zbuf);
}

const NUM_ITER: usize = 100;
const MAX_PAYLOAD_SIZE: usize = 256;

Expand Down
Loading

0 comments on commit 41f59d3

Please sign in to comment.