Skip to content

Commit

Permalink
Simplify Error message (#813)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets authored Mar 13, 2024
1 parent f12f338 commit e06b46d
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 114 deletions.
57 changes: 23 additions & 34 deletions commons/zenoh-codec/src/zenoh/err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header};
use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Bounded, Zenoh080Header};
use alloc::vec::Vec;
use zenoh_buffers::{
reader::{DidntRead, Reader},
writer::{DidntWrite, Writer},
ZBuf,
};
use zenoh_protocol::{
common::{iext, imsg},
core::Encoding,
zenoh::{
err::{ext, flag, Err},
id,
Expand All @@ -33,49 +35,42 @@ where

fn write(self, writer: &mut W, x: &Err) -> Self::Output {
let Err {
code,
is_infrastructure,
timestamp,
encoding,
ext_sinfo,
ext_body,
ext_unknown,
payload,
} = x;

// Header
let mut header = id::ERR;
if timestamp.is_some() {
header |= flag::T;
if encoding != &Encoding::empty() {
header |= flag::E;
}
if *is_infrastructure {
header |= flag::I;
}
let mut n_exts =
(ext_sinfo.is_some() as u8) + (ext_body.is_some() as u8) + (ext_unknown.len() as u8);
let mut n_exts = (ext_sinfo.is_some() as u8) + (ext_unknown.len() as u8);
if n_exts != 0 {
header |= flag::Z;
}
self.write(&mut *writer, header)?;

// Body
self.write(&mut *writer, code)?;
if let Some(ts) = timestamp.as_ref() {
self.write(&mut *writer, ts)?;
if encoding != &Encoding::empty() {
self.write(&mut *writer, encoding)?;
}

// Extensions
if let Some(sinfo) = ext_sinfo.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (sinfo, n_exts != 0))?;
}
if let Some(body) = ext_body.as_ref() {
n_exts -= 1;
self.write(&mut *writer, (body, n_exts != 0))?;
}
for u in ext_unknown.iter() {
n_exts -= 1;
self.write(&mut *writer, (u, n_exts != 0))?;
}

// Payload
let bodec = Zenoh080Bounded::<u32>::new();
bodec.write(&mut *writer, payload)?;

Ok(())
}
}
Expand Down Expand Up @@ -105,16 +100,13 @@ where
}

// Body
let code: u16 = self.codec.read(&mut *reader)?;
let is_infrastructure = imsg::has_flag(self.header, flag::I);
let mut timestamp: Option<uhlc::Timestamp> = None;
if imsg::has_flag(self.header, flag::T) {
timestamp = Some(self.codec.read(&mut *reader)?);
let mut encoding = Encoding::empty();
if imsg::has_flag(self.header, flag::E) {
encoding = self.codec.read(&mut *reader)?;
}

// Extensions
let mut ext_sinfo: Option<ext::SourceInfoType> = None;
let mut ext_body: Option<ext::ErrBodyType> = None;
let mut ext_unknown = Vec::new();

let mut has_ext = imsg::has_flag(self.header, flag::Z);
Expand All @@ -127,11 +119,6 @@ where
ext_sinfo = Some(s);
has_ext = ext;
}
ext::ErrBodyType::VID | ext::ErrBodyType::SID => {
let (s, ext): (ext::ErrBodyType, bool) = eodec.read(&mut *reader)?;
ext_body = Some(s);
has_ext = ext;
}
_ => {
let (u, ext) = extension::read(reader, "Err", ext)?;
ext_unknown.push(u);
Expand All @@ -140,13 +127,15 @@ where
}
}

// Payload
let bodec = Zenoh080Bounded::<u32>::new();
let payload: ZBuf = bodec.read(&mut *reader)?;

Ok(Err {
code,
is_infrastructure,
timestamp,
encoding,
ext_sinfo,
ext_body,
ext_unknown,
payload,
})
}
}
52 changes: 18 additions & 34 deletions commons/zenoh-protocol/src/zenoh/err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,41 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::common::ZExtUnknown;
use crate::{common::ZExtUnknown, core::Encoding};
use alloc::vec::Vec;
use uhlc::Timestamp;
use zenoh_buffers::ZBuf;

/// # Err message
///
/// ```text
/// Flags:
/// - T: Timestamp If T==1 then the timestamp if present
/// - I: Infrastructure If I==1 then the error is related to the infrastructure else to the user
/// - X: Reserved
/// - E: Encoding If E==1 then the encoding is present
/// - Z: Extension If Z==1 then at least one extension is present
///
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// |Z|I|T| ERR |
/// |Z|E|X| ERR |
/// +-+-+-+---------+
/// % code:z16 %
/// +---------------+
/// ~ ts: <u8;z16> ~ if T==1
/// ~ encoding ~ if E==1
/// +---------------+
/// ~ [err_exts] ~ if Z==1
/// +---------------+
/// ~ pl: <u8;z32> ~ -- Payload
/// +---------------+
/// ```
pub mod flag {
pub const T: u8 = 1 << 5; // 0x20 Timestamp if T==0 then the timestamp if present
pub const I: u8 = 1 << 6; // 0x40 Infrastructure if I==1 then the error is related to the infrastructure else to the user
// pub const X: u8 = 1 << 5; // 0x20 Reserved
pub const E: u8 = 1 << 6; // 0x40 Encoding if E==1 then the encoding is present
pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Err {
pub code: u16,
pub is_infrastructure: bool,
pub timestamp: Option<Timestamp>,
pub encoding: Encoding,
pub ext_sinfo: Option<ext::SourceInfoType>,
pub ext_body: Option<ext::ErrBodyType>,
pub ext_unknown: Vec<ZExtUnknown>,
pub payload: ZBuf,
}

pub mod ext {
Expand All @@ -57,45 +55,31 @@ pub mod ext {
/// Used to carry additional information about the source of data
pub type SourceInfo = zextzbuf!(0x1, false);
pub type SourceInfoType = crate::zenoh::ext::SourceInfoType<{ SourceInfo::ID }>;

/// # ErrBody extension
/// Used to carry a body attached to the query
/// Shared Memory extension is automatically defined by ValueType extension if
/// #[cfg(feature = "shared-memory")] is defined.
pub type ErrBodyType = crate::zenoh::ext::ValueType<{ ZExtZBuf::<0x02>::id(false) }, 0x03>;
}

impl Err {
#[cfg(feature = "test")]
pub fn rand() -> Self {
use crate::{common::iext, core::ZenohId};
use crate::common::iext;
use rand::Rng;
let mut rng = rand::thread_rng();

let code: u16 = rng.gen();
let is_infrastructure = rng.gen_bool(0.5);
let timestamp = rng.gen_bool(0.5).then_some({
let time = uhlc::NTP64(rng.gen());
let id = uhlc::ID::try_from(ZenohId::rand().to_le_bytes()).unwrap();
Timestamp::new(time, id)
});
let encoding = Encoding::rand();
let ext_sinfo = rng.gen_bool(0.5).then_some(ext::SourceInfoType::rand());
let ext_body = rng.gen_bool(0.5).then_some(ext::ErrBodyType::rand());
let mut ext_unknown = Vec::new();
for _ in 0..rng.gen_range(0..4) {
ext_unknown.push(ZExtUnknown::rand2(
iext::mid(ext::ErrBodyType::SID) + 1,
iext::mid(ext::SourceInfo::ID) + 1,
false,
));
}
let payload = ZBuf::rand(rng.gen_range(0..=64));

Self {
code,
is_infrastructure,
timestamp,
encoding,
ext_sinfo,
ext_body,
ext_unknown,
payload,
}
}
}
28 changes: 4 additions & 24 deletions io/zenoh-transport/src/shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use zenoh_core::{zasyncread, zasyncwrite, zerror};
use zenoh_protocol::{
network::{NetworkBody, NetworkMessage, Push, Request, Response},
zenoh::{
err::{ext::ErrBodyType, Err},
err::Err,
ext::ShmType,
query::{ext::QueryBodyType, Query},
reply::ReplyBody,
Expand Down Expand Up @@ -123,31 +123,11 @@ impl MapShm for Reply {
// Impl - Err
impl MapShm for Err {
fn map_to_shminfo(&mut self) -> ZResult<bool> {
if let Self {
ext_body: Some(ErrBodyType {
payload, ext_shm, ..
}),
..
} = self
{
map_to_shminfo!(payload, ext_shm)
} else {
Ok(false)
}
Ok(false)
}

fn map_to_shmbuf(&mut self, shmr: &RwLock<SharedMemoryReader>) -> ZResult<bool> {
if let Self {
ext_body: Some(ErrBodyType {
payload, ext_shm, ..
}),
..
} = self
{
map_to_shmbuf!(payload, ext_shm, shmr)
} else {
Ok(false)
}
fn map_to_shmbuf(&mut self, _shmr: &RwLock<SharedMemoryReader>) -> ZResult<bool> {
Ok(false)
}
}

Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/dispatcher/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ macro_rules! inc_res_stats {
ResponseBody::Err(e) => {
stats.[<$txrx _z_reply_msgs>].[<inc_ $space>](1);
stats.[<$txrx _z_reply_pl_bytes>].[<inc_ $space>](
e.ext_body.as_ref().map(|b| b.payload.len()).unwrap_or(0),
e.payload.len()
);
}
}
Expand Down
19 changes: 7 additions & 12 deletions zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ use std::ops::Deref;
use std::sync::Arc;
use uhlc::Timestamp;
use zenoh_core::{AsyncResolve, Resolvable, SyncResolve};
use zenoh_protocol::core::{EntityId, WireExpr};
use zenoh_protocol::network::{response, Mapping, RequestId, Response, ResponseFinal};
use zenoh_protocol::zenoh::{self, ext::ValueType, reply::ReplyBody, Del, Put, ResponseBody};
use zenoh_protocol::{
core::{EntityId, WireExpr},
network::{response, Mapping, RequestId, Response, ResponseFinal},
zenoh::{self, reply::ReplyBody, Del, Put, ResponseBody},
};
use zenoh_result::ZResult;

pub(crate) struct QueryInner {
Expand Down Expand Up @@ -380,17 +382,10 @@ impl SyncResolve for ReplyErrBuilder<'_> {
mapping: Mapping::Sender,
},
payload: ResponseBody::Err(zenoh::Err {
timestamp: None,
is_infrastructure: false,
encoding: self.value.encoding.into(),
ext_sinfo: None,
ext_unknown: vec![],
ext_body: Some(ValueType {
#[cfg(feature = "shared-memory")]
ext_shm: None,
payload: self.value.payload.into(),
encoding: self.value.encoding.into(),
}),
code: 0, // TODO
payload: self.value.payload.into(),
}),
ext_qos: response::ext::QoSType::RESPONSE,
ext_tstamp: None,
Expand Down
12 changes: 3 additions & 9 deletions zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2128,15 +2128,9 @@ impl Primitives for Session {
Some(query) => {
let callback = query.callback.clone();
std::mem::drop(state);
let value = match e.ext_body {
Some(body) => Value {
payload: body.payload.into(),
encoding: body.encoding.into(),
},
None => Value {
payload: Payload::empty(),
encoding: Encoding::default(),
},
let value = Value {
payload: e.payload.into(),
encoding: e.encoding.into(),
};
let replier_id = match e.ext_sinfo {
Some(info) => info.id.zid,
Expand Down

0 comments on commit e06b46d

Please sign in to comment.