Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Declare message can be Push/Request/RequestContinuous/Response #902

Merged
merged 4 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 83 additions & 74 deletions commons/zenoh-codec/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ use zenoh_buffers::{
ZBuf,
};
use zenoh_protocol::{
common::{iext, imsg, ZExtZ64},
common::{
iext,
imsg::{self, HEADER_BITS},
ZExtZ64,
},
core::{ExprId, ExprLen, WireExpr},
network::{
declare::{
self, common, interest, keyexpr, queryable, subscriber, token, Declare, DeclareBody,
Interest,
DeclareMode, Interest,
},
id, Mapping,
},
Expand All @@ -48,8 +52,8 @@ where
DeclareBody::DeclareToken(r) => self.write(&mut *writer, r)?,
DeclareBody::UndeclareToken(r) => self.write(&mut *writer, r)?,
DeclareBody::DeclareInterest(r) => self.write(&mut *writer, r)?,
DeclareBody::FinalInterest(r) => self.write(&mut *writer, r)?,
DeclareBody::UndeclareInterest(r) => self.write(&mut *writer, r)?,
DeclareBody::DeclareFinal(r) => self.write(&mut *writer, r)?,
}

Ok(())
Expand Down Expand Up @@ -77,8 +81,8 @@ where
D_TOKEN => DeclareBody::DeclareToken(codec.read(&mut *reader)?),
U_TOKEN => DeclareBody::UndeclareToken(codec.read(&mut *reader)?),
D_INTEREST => DeclareBody::DeclareInterest(codec.read(&mut *reader)?),
F_INTEREST => DeclareBody::FinalInterest(codec.read(&mut *reader)?),
U_INTEREST => DeclareBody::UndeclareInterest(codec.read(&mut *reader)?),
D_FINAL => DeclareBody::DeclareFinal(codec.read(&mut *reader)?),
_ => return Err(DidntRead),
};

Expand All @@ -95,7 +99,7 @@ where

fn write(self, writer: &mut W, x: &Declare) -> Self::Output {
let Declare {
interest_id,
mode,
ext_qos,
ext_tstamp,
ext_nodeid,
Expand All @@ -104,9 +108,13 @@ where

// Header
let mut header = id::DECLARE;
if x.interest_id.is_some() {
header |= declare::flag::I;
}
header |= match mode {
Mallets marked this conversation as resolved.
Show resolved Hide resolved
DeclareMode::Push => 0b00,
DeclareMode::Request(_) => 0b01,
DeclareMode::RequestContinuous(_) => 0b10,
DeclareMode::Response(_) => 0b11,
} << HEADER_BITS;

let mut n_exts = ((ext_qos != &declare::ext::QoSType::DEFAULT) as u8)
+ (ext_tstamp.is_some() as u8)
+ ((ext_nodeid != &declare::ext::NodeIdType::DEFAULT) as u8);
Expand All @@ -116,8 +124,11 @@ where
self.write(&mut *writer, header)?;

// Body
if let Some(interest_id) = interest_id {
self.write(&mut *writer, interest_id)?;
if let DeclareMode::Request(rid)
| DeclareMode::RequestContinuous(rid)
| DeclareMode::Response(rid) = mode
{
self.write(&mut *writer, rid)?;
}

// Extensions
Expand Down Expand Up @@ -166,10 +177,14 @@ where
return Err(DidntRead);
}

let mut interest_id = None;
if imsg::has_flag(self.header, declare::flag::I) {
interest_id = Some(self.codec.read(&mut *reader)?);
}
// Body
let mode = match (self.header >> HEADER_BITS) & 0b11 {
0b00 => DeclareMode::Push,
0b01 => DeclareMode::Request(self.codec.read(&mut *reader)?),
0b10 => DeclareMode::RequestContinuous(self.codec.read(&mut *reader)?),
0b11 => DeclareMode::Response(self.codec.read(&mut *reader)?),
_ => return Err(DidntRead),
};

// Extensions
let mut ext_qos = declare::ext::QoSType::DEFAULT;
Expand Down Expand Up @@ -206,7 +221,7 @@ where
let body: DeclareBody = self.codec.read(&mut *reader)?;

Ok(Declare {
interest_id,
mode,
ext_qos,
ext_tstamp,
ext_nodeid,
Expand All @@ -215,6 +230,59 @@ where
}
}

// Final
impl<W> WCodec<&common::DeclareFinal, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &common::DeclareFinal) -> Self::Output {
let common::DeclareFinal = x;

// Header
let header = declare::id::D_FINAL;
self.write(&mut *writer, header)?;

Ok(())
}
}

impl<R> RCodec<common::DeclareFinal, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<common::DeclareFinal, Self::Error> {
let header: u8 = self.read(&mut *reader)?;
let codec = Zenoh080Header::new(header);

codec.read(reader)
}
}

impl<R> RCodec<common::DeclareFinal, &mut R> for Zenoh080Header
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<common::DeclareFinal, Self::Error> {
if imsg::mid(self.header) != declare::id::D_FINAL {
return Err(DidntRead);
}

// Extensions
let has_ext = imsg::has_flag(self.header, token::flag::Z);
if has_ext {
extension::skip_all(reader, "Final")?;
}

Ok(common::DeclareFinal)
}
}

// DeclareKeyExpr
impl<W> WCodec<&keyexpr::DeclareKeyExpr, &mut W> for Zenoh080
where
Expand Down Expand Up @@ -976,65 +1044,6 @@ where
}
}

// FinalInterest
impl<W> WCodec<&interest::FinalInterest, &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &interest::FinalInterest) -> Self::Output {
let interest::FinalInterest { id } = x;

// Header
let header = declare::id::F_INTEREST;
self.write(&mut *writer, header)?;

// Body
self.write(&mut *writer, id)?;

Ok(())
}
}

impl<R> RCodec<interest::FinalInterest, &mut R> for Zenoh080
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<interest::FinalInterest, Self::Error> {
let header: u8 = self.read(&mut *reader)?;
let codec = Zenoh080Header::new(header);

codec.read(reader)
}
}

impl<R> RCodec<interest::FinalInterest, &mut R> for Zenoh080Header
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<interest::FinalInterest, Self::Error> {
if imsg::mid(self.header) != declare::id::F_INTEREST {
return Err(DidntRead);
}

// Body
let id: interest::InterestId = self.codec.read(&mut *reader)?;

// Extensions
let has_ext = imsg::has_flag(self.header, token::flag::Z);
if has_ext {
extension::skip_all(reader, "FinalInterest")?;
}

Ok(interest::FinalInterest { id })
}
}

// UndeclareInterest
impl<W> WCodec<&interest::UndeclareInterest, &mut W> for Zenoh080
where
Expand Down
16 changes: 16 additions & 0 deletions commons/zenoh-codec/tests/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,22 @@ use zenoh_protocol::{
zenoh, zextunit, zextz64, zextzbuf,
};

#[test]
fn zbuf_test() {
let mut buffer = vec![0u8; 64];

let zbuf = ZBuf::empty();
let mut writer = buffer.writer();

let codec = Zenoh080::new();
codec.write(&mut writer, &zbuf).unwrap();
println!("Buffer: {:?}", buffer);

let mut reader = buffer.reader();
let ret: ZBuf = codec.read(&mut reader).unwrap();
assert_eq!(ret, zbuf);
}

const NUM_ITER: usize = 100;
const MAX_PAYLOAD_SIZE: usize = 256;

Expand Down
Loading