From efe1135d2841d219a88a7f3c97e399d741bf1137 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 9 Feb 2024 16:55:30 +0100 Subject: [PATCH] Update Reply protocol definition and codec (#717) * Update Reply protocol definition and codec * Make consolidation a flag in Query/Reply * Fix wrong Consolidation cast in codec * Apply Reply changes to routing * Fix shared-memory feature * Fix stats * Bump Zenoh Protocol Version * Add query/reply ok(put|del)/err() tests --- commons/zenoh-codec/src/zenoh/mod.rs | 4 +- commons/zenoh-codec/src/zenoh/query.rs | 70 +++++----- commons/zenoh-codec/src/zenoh/reply.rs | 141 +++----------------- commons/zenoh-codec/tests/codec.rs | 2 +- commons/zenoh-protocol/src/lib.rs | 2 +- commons/zenoh-protocol/src/zenoh/mod.rs | 10 +- commons/zenoh-protocol/src/zenoh/query.rs | 25 ++-- commons/zenoh-protocol/src/zenoh/reply.rs | 90 +++---------- io/zenoh-transport/src/common/stats.rs | 8 ++ io/zenoh-transport/src/shm.rs | 17 +-- zenoh/src/net/routing/dispatcher/pubsub.rs | 13 +- zenoh/src/net/routing/dispatcher/queries.rs | 54 +++++--- zenoh/src/queryable.rs | 81 ++++++----- zenoh/src/session.rs | 74 +++++++--- zenoh/tests/session.rs | 71 +++++++++- 15 files changed, 329 insertions(+), 333 deletions(-) diff --git a/commons/zenoh-codec/src/zenoh/mod.rs b/commons/zenoh-codec/src/zenoh/mod.rs index 2e3ea48be7..d59add9d63 100644 --- a/commons/zenoh-codec/src/zenoh/mod.rs +++ b/commons/zenoh-codec/src/zenoh/mod.rs @@ -121,8 +121,8 @@ where fn write(self, writer: &mut W, x: &ResponseBody) -> Self::Output { match x { ResponseBody::Reply(b) => self.write(&mut *writer, b), - ResponseBody::Err(b) => self.write(&mut *writer, b), ResponseBody::Ack(b) => self.write(&mut *writer, b), + ResponseBody::Err(b) => self.write(&mut *writer, b), ResponseBody::Put(b) => self.write(&mut *writer, b), } } @@ -140,8 +140,8 @@ where let codec = Zenoh080Header::new(header); let body = match imsg::mid(codec.header) { id::REPLY => ResponseBody::Reply(codec.read(&mut *reader)?), - id::ERR => ResponseBody::Err(codec.read(&mut *reader)?), id::ACK => ResponseBody::Ack(codec.read(&mut *reader)?), + id::ERR => ResponseBody::Err(codec.read(&mut *reader)?), id::PUT => ResponseBody::Put(codec.read(&mut *reader)?), _ => return Err(DidntRead), }; diff --git a/commons/zenoh-codec/src/zenoh/query.rs b/commons/zenoh-codec/src/zenoh/query.rs index 09b01b2266..cb0506e474 100644 --- a/commons/zenoh-codec/src/zenoh/query.rs +++ b/commons/zenoh-codec/src/zenoh/query.rs @@ -22,48 +22,46 @@ use zenoh_protocol::{ common::{iext, imsg}, zenoh::{ id, - query::{ext, flag, Query}, + query::{ext, flag, Consolidation, Query}, }, }; -// Extension Consolidation -impl WCodec<(ext::ConsolidationType, bool), &mut W> for Zenoh080 +// Consolidation +impl WCodec for Zenoh080 where W: Writer, { type Output = Result<(), DidntWrite>; - fn write(self, writer: &mut W, x: (ext::ConsolidationType, bool)) -> Self::Output { - let (x, more) = x; + fn write(self, writer: &mut W, x: Consolidation) -> Self::Output { let v: u64 = match x { - ext::ConsolidationType::Auto => 0, - ext::ConsolidationType::None => 1, - ext::ConsolidationType::Monotonic => 2, - ext::ConsolidationType::Latest => 3, - ext::ConsolidationType::Unique => 4, + Consolidation::Auto => 0, + Consolidation::None => 1, + Consolidation::Monotonic => 2, + Consolidation::Latest => 3, + Consolidation::Unique => 4, }; - let v = ext::Consolidation::new(v); - self.write(&mut *writer, (&v, more)) + self.write(&mut *writer, v) } } -impl RCodec<(ext::ConsolidationType, bool), &mut R> for Zenoh080Header +impl RCodec for Zenoh080 where R: Reader, { type Error = DidntRead; - fn read(self, reader: &mut R) -> Result<(ext::ConsolidationType, bool), Self::Error> { - let (ext, more): (ext::Consolidation, bool) = self.read(&mut *reader)?; - let c = match ext.value { - 0 => ext::ConsolidationType::Auto, - 1 => ext::ConsolidationType::None, - 2 => ext::ConsolidationType::Monotonic, - 3 => ext::ConsolidationType::Latest, - 4 => ext::ConsolidationType::Unique, - _ => return Err(DidntRead), + fn read(self, reader: &mut R) -> Result { + let v: u64 = self.read(&mut *reader)?; + let c = match v { + 0 => Consolidation::Auto, + 1 => Consolidation::None, + 2 => Consolidation::Monotonic, + 3 => Consolidation::Latest, + 4 => Consolidation::Unique, + _ => Consolidation::Auto, // Fallback on Auto if Consolidation is unknown }; - Ok((c, more)) + Ok(c) } } @@ -75,9 +73,9 @@ where fn write(self, writer: &mut W, x: &Query) -> Self::Output { let Query { + consolidation, parameters, ext_sinfo, - ext_consolidation, ext_body, ext_attachment, ext_unknown, @@ -85,11 +83,13 @@ where // Header let mut header = id::QUERY; + if consolidation != &Consolidation::default() { + header |= flag::C; + } if !parameters.is_empty() { header |= flag::P; } let mut n_exts = (ext_sinfo.is_some() as u8) - + ((ext_consolidation != &ext::ConsolidationType::default()) as u8) + (ext_body.is_some() as u8) + (ext_attachment.is_some() as u8) + (ext_unknown.len() as u8); @@ -99,6 +99,9 @@ where self.write(&mut *writer, header)?; // Body + if consolidation != &Consolidation::default() { + self.write(&mut *writer, *consolidation)?; + } if !parameters.is_empty() { self.write(&mut *writer, parameters)?; } @@ -108,10 +111,6 @@ where n_exts -= 1; self.write(&mut *writer, (sinfo, n_exts != 0))?; } - if ext_consolidation != &ext::ConsolidationType::default() { - n_exts -= 1; - self.write(&mut *writer, (*ext_consolidation, n_exts != 0))?; - } if let Some(body) = ext_body.as_ref() { n_exts -= 1; self.write(&mut *writer, (body, n_exts != 0))?; @@ -154,6 +153,11 @@ where } // Body + let mut consolidation = Consolidation::default(); + if imsg::has_flag(self.header, flag::C) { + consolidation = self.codec.read(&mut *reader)?; + } + let mut parameters = String::new(); if imsg::has_flag(self.header, flag::P) { parameters = self.codec.read(&mut *reader)?; @@ -161,7 +165,6 @@ where // Extensions let mut ext_sinfo: Option = None; - let mut ext_consolidation = ext::ConsolidationType::default(); let mut ext_body: Option = None; let mut ext_attachment: Option = None; let mut ext_unknown = Vec::new(); @@ -176,11 +179,6 @@ where ext_sinfo = Some(s); has_ext = ext; } - ext::Consolidation::ID => { - let (c, ext): (ext::ConsolidationType, bool) = eodec.read(&mut *reader)?; - ext_consolidation = c; - has_ext = ext; - } ext::QueryBodyType::SID | ext::QueryBodyType::VID => { let (s, ext): (ext::QueryBodyType, bool) = eodec.read(&mut *reader)?; ext_body = Some(s); @@ -200,9 +198,9 @@ where } Ok(Query { + consolidation, parameters, ext_sinfo, - ext_consolidation, ext_body, ext_attachment, ext_unknown, diff --git a/commons/zenoh-codec/src/zenoh/reply.rs b/commons/zenoh-codec/src/zenoh/reply.rs index d98c72b341..d54e98cc5e 100644 --- a/commons/zenoh-codec/src/zenoh/reply.rs +++ b/commons/zenoh-codec/src/zenoh/reply.rs @@ -11,23 +11,18 @@ // Contributors: // ZettaScale Zenoh Team, // -#[cfg(not(feature = "shared-memory"))] -use crate::Zenoh080Bounded; -#[cfg(feature = "shared-memory")] -use crate::Zenoh080Sliced; use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header}; use alloc::vec::Vec; use zenoh_buffers::{ reader::{DidntRead, Reader}, writer::{DidntWrite, Writer}, - ZBuf, }; use zenoh_protocol::{ - common::{iext, imsg}, - core::Encoding, + common::imsg, zenoh::{ id, - reply::{ext, flag, Reply}, + query::Consolidation, + reply::{flag, Reply, ReplyBody}, }, }; @@ -39,81 +34,35 @@ where fn write(self, writer: &mut W, x: &Reply) -> Self::Output { let Reply { - timestamp, - encoding, - ext_sinfo, - ext_consolidation, - #[cfg(feature = "shared-memory")] - ext_shm, - ext_attachment, + consolidation, ext_unknown, payload, } = x; // Header let mut header = id::REPLY; - if timestamp.is_some() { - header |= flag::T; - } - if encoding != &Encoding::default() { - header |= flag::E; - } - let mut n_exts = (ext_sinfo.is_some()) as u8 - + ((ext_consolidation != &ext::ConsolidationType::default()) as u8) - + (ext_attachment.is_some()) as u8 - + (ext_unknown.len() as u8); - #[cfg(feature = "shared-memory")] - { - n_exts += ext_shm.is_some() as u8; + if consolidation != &Consolidation::default() { + header |= flag::C; } + let mut n_exts = ext_unknown.len() as u8; if n_exts != 0 { header |= flag::Z; } self.write(&mut *writer, header)?; // Body - if let Some(ts) = timestamp.as_ref() { - self.write(&mut *writer, ts)?; - } - if encoding != &Encoding::default() { - self.write(&mut *writer, encoding)?; + if consolidation != &Consolidation::default() { + self.write(&mut *writer, *consolidation)?; } // Extensions - if let Some(sinfo) = ext_sinfo.as_ref() { - n_exts -= 1; - self.write(&mut *writer, (sinfo, n_exts != 0))?; - } - if ext_consolidation != &ext::ConsolidationType::default() { - n_exts -= 1; - self.write(&mut *writer, (*ext_consolidation, n_exts != 0))?; - } - #[cfg(feature = "shared-memory")] - if let Some(eshm) = ext_shm.as_ref() { - n_exts -= 1; - self.write(&mut *writer, (eshm, n_exts != 0))?; - } - if let Some(att) = ext_attachment.as_ref() { - n_exts -= 1; - self.write(&mut *writer, (att, n_exts != 0))?; - } for u in ext_unknown.iter() { n_exts -= 1; self.write(&mut *writer, (u, n_exts != 0))?; } // Payload - #[cfg(feature = "shared-memory")] - { - let codec = Zenoh080Sliced::::new(ext_shm.is_some()); - codec.write(&mut *writer, payload)?; - } - - #[cfg(not(feature = "shared-memory"))] - { - let bodec = Zenoh080Bounded::::new(); - bodec.write(&mut *writer, payload)?; - } + self.write(&mut *writer, payload)?; Ok(()) } @@ -144,81 +93,27 @@ where } // Body - let mut timestamp: Option = None; - if imsg::has_flag(self.header, flag::T) { - timestamp = Some(self.codec.read(&mut *reader)?); - } - - let mut encoding = Encoding::default(); - if imsg::has_flag(self.header, flag::E) { - encoding = self.codec.read(&mut *reader)?; + let mut consolidation = Consolidation::default(); + if imsg::has_flag(self.header, flag::C) { + consolidation = self.codec.read(&mut *reader)?; } // Extensions - let mut ext_sinfo: Option = None; - let mut ext_consolidation = ext::ConsolidationType::default(); - #[cfg(feature = "shared-memory")] - let mut ext_shm: Option = None; - let mut ext_attachment: Option = None; let mut ext_unknown = Vec::new(); let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { let ext: u8 = self.codec.read(&mut *reader)?; - let eodec = Zenoh080Header::new(ext); - match iext::eid(ext) { - ext::SourceInfo::ID => { - let (s, ext): (ext::SourceInfoType, bool) = eodec.read(&mut *reader)?; - ext_sinfo = Some(s); - has_ext = ext; - } - ext::Consolidation::ID => { - let (c, ext): (ext::ConsolidationType, bool) = eodec.read(&mut *reader)?; - ext_consolidation = c; - has_ext = ext; - } - #[cfg(feature = "shared-memory")] - ext::Shm::ID => { - let (s, ext): (ext::ShmType, bool) = eodec.read(&mut *reader)?; - ext_shm = Some(s); - has_ext = ext; - } - ext::Attachment::ID => { - let (a, ext): (ext::AttachmentType, bool) = eodec.read(&mut *reader)?; - ext_attachment = Some(a); - has_ext = ext; - } - _ => { - let (u, ext) = extension::read(reader, "Reply", ext)?; - ext_unknown.push(u); - has_ext = ext; - } - } + let (u, ext) = extension::read(reader, "Reply", ext)?; + ext_unknown.push(u); + has_ext = ext; } // Payload - let payload: ZBuf = { - #[cfg(feature = "shared-memory")] - { - let codec = Zenoh080Sliced::::new(ext_shm.is_some()); - codec.read(&mut *reader)? - } - - #[cfg(not(feature = "shared-memory"))] - { - let bodec = Zenoh080Bounded::::new(); - bodec.read(&mut *reader)? - } - }; + let payload: ReplyBody = self.codec.read(&mut *reader)?; Ok(Reply { - timestamp, - encoding, - ext_sinfo, - ext_consolidation, - #[cfg(feature = "shared-memory")] - ext_shm, - ext_attachment, + consolidation, ext_unknown, payload, }) diff --git a/commons/zenoh-codec/tests/codec.rs b/commons/zenoh-codec/tests/codec.rs index 3fdb95e1b5..28201c1977 100644 --- a/commons/zenoh-codec/tests/codec.rs +++ b/commons/zenoh-codec/tests/codec.rs @@ -556,7 +556,7 @@ fn codec_network() { run!(NetworkMessage, NetworkMessage::rand()); } -// Zenoh new +// Zenoh #[test] fn codec_put() { run!(zenoh::Put, zenoh::Put::rand()); diff --git a/commons/zenoh-protocol/src/lib.rs b/commons/zenoh-protocol/src/lib.rs index 2e1a2fa7cf..8d26f52ed9 100644 --- a/commons/zenoh-protocol/src/lib.rs +++ b/commons/zenoh-protocol/src/lib.rs @@ -28,7 +28,7 @@ pub mod transport; pub mod zenoh; // Zenoh version -pub const VERSION: u8 = 0x08; +pub const VERSION: u8 = 0x09; // Zenoh protocol uses the following conventions for message definition and representation. // diff --git a/commons/zenoh-protocol/src/zenoh/mod.rs b/commons/zenoh-protocol/src/zenoh/mod.rs index e67576e673..a23eaa9b21 100644 --- a/commons/zenoh-protocol/src/zenoh/mod.rs +++ b/commons/zenoh-protocol/src/zenoh/mod.rs @@ -95,10 +95,11 @@ impl RequestBody { let mut rng = rand::thread_rng(); - match rng.gen_range(0..3) { + match rng.gen_range(0..4) { 0 => RequestBody::Query(Query::rand()), 1 => RequestBody::Put(Put::rand()), 2 => RequestBody::Del(Del::rand()), + 3 => RequestBody::Pull(Pull::rand()), _ => unreachable!(), } } @@ -126,8 +127,8 @@ impl From for RequestBody { #[derive(Debug, Clone, PartialEq, Eq)] pub enum ResponseBody { Reply(Reply), - Err(Err), Ack(Ack), + Err(Err), Put(Put), } @@ -135,13 +136,12 @@ impl ResponseBody { #[cfg(feature = "test")] pub fn rand() -> Self { use rand::Rng; - let mut rng = rand::thread_rng(); match rng.gen_range(0..4) { 0 => ResponseBody::Reply(Reply::rand()), - 1 => ResponseBody::Err(Err::rand()), - 2 => ResponseBody::Ack(Ack::rand()), + 1 => ResponseBody::Ack(Ack::rand()), + 2 => ResponseBody::Err(Err::rand()), 3 => ResponseBody::Put(Put::rand()), _ => unreachable!(), } diff --git a/commons/zenoh-protocol/src/zenoh/query.rs b/commons/zenoh-protocol/src/zenoh/query.rs index 7432840492..17dfa23df8 100644 --- a/commons/zenoh-protocol/src/zenoh/query.rs +++ b/commons/zenoh-protocol/src/zenoh/query.rs @@ -69,50 +69,45 @@ impl From for Consolidation { /// /// ```text /// Flags: +/// - C: Consolidation if C==1 then consolidation is present /// - P: Parameters If P==1 then the parameters are present -/// - X: Reserved /// - Z: Extension If Z==1 then at least one extension is present /// /// 7 6 5 4 3 2 1 0 /// +-+-+-+-+-+-+-+-+ -/// |Z|X|P| QUERY | +/// |Z|P|C| QUERY | /// +-+-+-+---------+ +/// % consolidation % if C==1 +/// +---------------+ /// ~ ps: ~ if P==1 /// +---------------+ /// ~ [qry_exts] ~ if Z==1 /// +---------------+ /// ``` pub mod flag { - pub const P: u8 = 1 << 5; // 0x20 Parameters if P==1 then the parameters are present - // pub const X: u8 = 1 << 6; // 0x40 Reserved + pub const C: u8 = 1 << 5; // 0x20 Consolidation if C==1 then consolidation is present + pub const P: u8 = 1 << 6; // 0x40 Parameters if P==1 then the parameters are present pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow } #[derive(Debug, Clone, PartialEq, Eq)] pub struct Query { + pub consolidation: Consolidation, pub parameters: String, pub ext_sinfo: Option, - pub ext_consolidation: Consolidation, pub ext_body: Option, pub ext_attachment: Option, pub ext_unknown: Vec, } pub mod ext { - use crate::{ - common::{ZExtZ64, ZExtZBuf}, - zextz64, zextzbuf, - }; + use crate::{common::ZExtZBuf, zextzbuf}; /// # SourceInfo extension /// Used to carry additional information about the source of data pub type SourceInfo = zextzbuf!(0x1, false); pub type SourceInfoType = crate::zenoh::ext::SourceInfoType<{ SourceInfo::ID }>; - /// # Consolidation extension - pub type Consolidation = zextz64!(0x2, true); - pub type ConsolidationType = crate::zenoh::query::Consolidation; - /// # QueryBody extension /// Used to carry a body attached to the query /// Shared Memory extension is automatically defined by ValueType extension if @@ -137,6 +132,7 @@ impl Query { const MIN: usize = 2; const MAX: usize = 16; + let consolidation = Consolidation::rand(); let parameters: String = if rng.gen_bool(0.5) { let len = rng.gen_range(MIN..MAX); Alphanumeric.sample_string(&mut rng, len) @@ -144,7 +140,6 @@ impl Query { String::new() }; let ext_sinfo = rng.gen_bool(0.5).then_some(ext::SourceInfoType::rand()); - let ext_consolidation = Consolidation::rand(); let ext_body = rng.gen_bool(0.5).then_some(ext::QueryBodyType::rand()); let ext_attachment = rng.gen_bool(0.5).then_some(ext::AttachmentType::rand()); let mut ext_unknown = Vec::new(); @@ -156,9 +151,9 @@ impl Query { } Self { + consolidation, parameters, ext_sinfo, - ext_consolidation, ext_body, ext_attachment, ext_unknown, diff --git a/commons/zenoh-protocol/src/zenoh/reply.rs b/commons/zenoh-protocol/src/zenoh/reply.rs index 2395e1e9b2..7cbab4ca0a 100644 --- a/commons/zenoh-protocol/src/zenoh/reply.rs +++ b/commons/zenoh-protocol/src/zenoh/reply.rs @@ -11,115 +11,61 @@ // Contributors: // ZettaScale Zenoh Team, // -use crate::{common::ZExtUnknown, core::Encoding}; +use crate::{ + common::ZExtUnknown, + zenoh::{query::Consolidation, PushBody}, +}; use alloc::vec::Vec; -use uhlc::Timestamp; -use zenoh_buffers::ZBuf; /// # Reply message /// /// ```text /// Flags: -/// - T: Timestamp If T==1 then the timestamp if present -/// - E: Encoding If E==1 then the encoding is present +/// - C: Consolidation if C==1 then consolidation is present +/// - X: Reserved /// - Z: Extension If Z==1 then at least one extension is present /// /// 7 6 5 4 3 2 1 0 /// +-+-+-+-+-+-+-+-+ -/// |Z|E|T| REPLY | +/// |Z|X|C| REPLY | /// +-+-+-+---------+ -/// ~ ts: ~ if T==1 -/// +---------------+ -/// ~ encoding ~ if E==1 +/// % consolidation % if C==1 /// +---------------+ /// ~ [repl_exts] ~ if Z==1 /// +---------------+ -/// ~ pl: ~ -- Payload +/// ~ ReplyBody ~ -- Payload /// +---------------+ /// ``` pub mod flag { - pub const T: u8 = 1 << 5; // 0x20 Timestamp if T==0 then the timestamp if present - pub const E: u8 = 1 << 6; // 0x40 Encoding if E==1 then the encoding is present + pub const C: u8 = 1 << 5; // 0x20 Consolidation if C==1 then consolidation is present + // pub const X: u8 = 1 << 6; // 0x40 Reserved pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow } #[derive(Debug, Clone, PartialEq, Eq)] pub struct Reply { - pub timestamp: Option, - pub encoding: Encoding, - pub ext_sinfo: Option, - pub ext_consolidation: ext::ConsolidationType, - #[cfg(feature = "shared-memory")] - pub ext_shm: Option, - pub ext_attachment: Option, + pub consolidation: Consolidation, pub ext_unknown: Vec, - pub payload: ZBuf, + pub payload: ReplyBody, } -pub mod ext { - #[cfg(feature = "shared-memory")] - use crate::{common::ZExtUnit, zextunit}; - use crate::{ - common::{ZExtZ64, ZExtZBuf}, - zextz64, zextzbuf, - }; - - /// # SourceInfo extension - /// Used to carry additional information about the source of data - pub type SourceInfo = zextzbuf!(0x1, false); - pub type SourceInfoType = crate::zenoh::ext::SourceInfoType<{ SourceInfo::ID }>; - - /// # Consolidation extension - pub type Consolidation = zextz64!(0x2, true); - pub type ConsolidationType = crate::zenoh::query::ext::ConsolidationType; - - /// # Shared Memory extension - /// Used to carry additional information about the shared-memory layour of data - #[cfg(feature = "shared-memory")] - pub type Shm = zextunit!(0x3, true); - #[cfg(feature = "shared-memory")] - pub type ShmType = crate::zenoh::ext::ShmType<{ Shm::ID }>; - - /// # User attachment - pub type Attachment = zextzbuf!(0x4, false); - pub type AttachmentType = crate::zenoh::ext::AttachmentType<{ Attachment::ID }>; -} +pub type ReplyBody = PushBody; impl Reply { #[cfg(feature = "test")] pub fn rand() -> Self { - use crate::{common::iext, core::ZenohId, zenoh::Consolidation}; use rand::Rng; let mut rng = rand::thread_rng(); - let timestamp = rng.gen_bool(0.5).then_some({ - let time = uhlc::NTP64(rng.gen()); - let id = uhlc::ID::try_from(ZenohId::rand().to_le_bytes()).unwrap(); - Timestamp::new(time, id) - }); - let encoding = Encoding::rand(); - let ext_sinfo = rng.gen_bool(0.5).then_some(ext::SourceInfoType::rand()); - let ext_consolidation = Consolidation::rand(); - #[cfg(feature = "shared-memory")] - let ext_shm = rng.gen_bool(0.5).then_some(ext::ShmType::rand()); - let ext_attachment = rng.gen_bool(0.5).then_some(ext::AttachmentType::rand()); + let payload = ReplyBody::rand(); + let consolidation = Consolidation::rand(); let mut ext_unknown = Vec::new(); for _ in 0..rng.gen_range(0..4) { - ext_unknown.push(ZExtUnknown::rand2( - iext::mid(ext::Attachment::ID) + 1, - false, - )); + ext_unknown.push(ZExtUnknown::rand2(1, false)); } - let payload = ZBuf::rand(rng.gen_range(1..=64)); Self { - timestamp, - encoding, - ext_sinfo, - ext_consolidation, - #[cfg(feature = "shared-memory")] - ext_shm, - ext_attachment, + consolidation, ext_unknown, payload, } diff --git a/io/zenoh-transport/src/common/stats.rs b/io/zenoh-transport/src/common/stats.rs index f095a58273..aaf39641c0 100644 --- a/io/zenoh-transport/src/common/stats.rs +++ b/io/zenoh-transport/src/common/stats.rs @@ -208,6 +208,10 @@ stats_struct! { # TYPE "counter" pub tx_z_del_msgs DiscriminatedStats, + # HELP "Counter of received bytes in zenoh del message attachments." + # TYPE "counter" + pub tx_z_del_pl_bytes DiscriminatedStats, + # HELP "Counter of sent zenoh query messages." # TYPE "counter" pub tx_z_query_msgs DiscriminatedStats, @@ -252,6 +256,10 @@ stats_struct! { # TYPE "counter" pub rx_z_del_msgs DiscriminatedStats, + # HELP "Counter of received bytes in zenoh del message attachments." + # TYPE "counter" + pub rx_z_del_pl_bytes DiscriminatedStats, + # HELP "Counter of received zenoh query messages." # TYPE "counter" pub rx_z_query_msgs DiscriminatedStats, diff --git a/io/zenoh-transport/src/shm.rs b/io/zenoh-transport/src/shm.rs index 04a8f502c4..8b0e93f494 100644 --- a/io/zenoh-transport/src/shm.rs +++ b/io/zenoh-transport/src/shm.rs @@ -21,6 +21,7 @@ use zenoh_protocol::{ err::{ext::ErrBodyType, Err}, ext::ShmType, query::{ext::QueryBodyType, Query}, + reply::ReplyBody, PushBody, Put, Reply, RequestBody, ResponseBody, }, }; @@ -105,17 +106,17 @@ impl MapShm for Query { // Impl - Reply impl MapShm for Reply { fn map_to_shminfo(&mut self) -> ZResult { - let Self { - payload, ext_shm, .. - } = self; - map_to_shminfo!(payload, ext_shm) + match &mut self.payload { + ReplyBody::Put(b) => b.map_to_shminfo(), + _ => Ok(false), + } } fn map_to_shmbuf(&mut self, shmr: &RwLock) -> ZResult { - let Self { - payload, ext_shm, .. - } = self; - map_to_shmbuf!(payload, ext_shm, shmr) + match &mut self.payload { + ReplyBody::Put(b) => b.map_to_shmbuf(shmr), + _ => Ok(false), + } } } diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index da6ae0c371..ffe2d3ccca 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -413,10 +413,19 @@ macro_rules! inc_stats { match &$body { PushBody::Put(p) => { stats.[<$txrx _z_put_msgs>].[](1); - stats.[<$txrx _z_put_pl_bytes>].[](p.payload.len()); + let mut n = p.payload.len(); + if let Some(a) = p.ext_attachment.as_ref() { + n += a.buffer.len(); + } + stats.[<$txrx _z_put_pl_bytes>].[](n); } - PushBody::Del(_) => { + PushBody::Del(d) => { stats.[<$txrx _z_del_msgs>].[](1); + let mut n = 0; + if let Some(a) = d.ext_attachment.as_ref() { + n += a.buffer.len(); + } + stats.[<$txrx _z_del_pl_bytes>].[](n); } } } diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index 9645af0f74..a6748650ab 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -21,16 +21,16 @@ use async_trait::async_trait; use std::collections::HashMap; use std::sync::{Arc, Weak}; use zenoh_config::WhatAmI; -use zenoh_protocol::core::key_expr::keyexpr; -use zenoh_protocol::network::declare::queryable::ext::QueryableInfo; +use zenoh_protocol::zenoh::reply::ReplyBody; +use zenoh_protocol::zenoh::Put; use zenoh_protocol::{ - core::{Encoding, WireExpr}, + core::{key_expr::keyexpr, Encoding, WireExpr}, network::{ - declare::ext, + declare::{ext, queryable::ext::QueryableInfo}, request::{ext::TargetType, Request, RequestId}, response::{self, ext::ResponderIdType, Response, ResponseFinal}, }, - zenoh::{reply::ext::ConsolidationType, Reply, RequestBody, ResponseBody}, + zenoh::{query::Consolidation, Reply, RequestBody, ResponseBody}, }; use zenoh_sync::get_mut_unchecked; use zenoh_util::Timed; @@ -464,11 +464,29 @@ macro_rules! inc_res_stats { match &$body { ResponseBody::Put(p) => { stats.[<$txrx _z_put_msgs>].[](1); - stats.[<$txrx _z_put_pl_bytes>].[](p.payload.len()); + let mut n = p.payload.len(); + if let Some(a) = p.ext_attachment.as_ref() { + n += a.buffer.len(); + } + stats.[<$txrx _z_put_pl_bytes>].[](n); } ResponseBody::Reply(r) => { stats.[<$txrx _z_reply_msgs>].[](1); - stats.[<$txrx _z_reply_pl_bytes>].[](r.payload.len()); + let mut n = 0; + match &r.payload { + ReplyBody::Put(p) => { + if let Some(a) = p.ext_attachment.as_ref() { + n += a.buffer.len(); + } + n += p.payload.len(); + } + ReplyBody::Del(d) => { + if let Some(a) = d.ext_attachment.as_ref() { + n += a.buffer.len(); + } + } + } + stats.[<$txrx _z_reply_pl_bytes>].[](n); } ResponseBody::Err(e) => { stats.[<$txrx _z_reply_msgs>].[](1); @@ -537,15 +555,19 @@ pub fn route_query( for (wexpr, payload) in local_replies { let payload = ResponseBody::Reply(Reply { - timestamp: None, - encoding: Encoding::default(), - ext_sinfo: None, - ext_consolidation: ConsolidationType::default(), - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_attachment: None, // @TODO: expose it in the API - ext_unknown: vec![], - payload, + consolidation: Consolidation::default(), // @TODO: handle Del case + ext_unknown: vec![], // @TODO: handle unknown extensions + payload: ReplyBody::Put(Put { + // @TODO: handle Del case + timestamp: None, // @TODO: handle timestamp + encoding: Encoding::default(), // @TODO: handle encoding + ext_sinfo: None, // @TODO: handle source info + ext_attachment: None, // @TODO: expose it in the API + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_unknown: vec![], // @TODO: handle unknown extensions + payload, + }), }); #[cfg(feature = "stats")] if !admin { diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index 9ee73d1641..4e9f4914dd 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -30,11 +30,11 @@ use std::future::Ready; use std::ops::Deref; use std::sync::Arc; use zenoh_core::{AsyncResolve, Resolvable, SyncResolve}; -use zenoh_protocol::core::WireExpr; -use zenoh_protocol::network::{response, Mapping, RequestId, Response, ResponseFinal}; -use zenoh_protocol::zenoh::ext::ValueType; -use zenoh_protocol::zenoh::reply::ext::ConsolidationType; -use zenoh_protocol::zenoh::{self, ResponseBody}; +use zenoh_protocol::{ + core::WireExpr, + network::{response, Mapping, RequestId, Response, ResponseFinal}, + zenoh::{self, ext::ValueType, reply::ReplyBody, Del, Put, ResponseBody}, +}; use zenoh_result::ZResult; pub(crate) struct QueryInner { @@ -206,16 +206,33 @@ impl SyncResolve for ReplyBuilder<'_> { source_id: None, source_sn: None, }; - #[allow(unused_mut)] - let mut ext_attachment = None; - #[cfg(feature = "unstable")] - { - data_info.source_id = source_info.source_id; - data_info.source_sn = source_info.source_sn; - if let Some(attachment) = attachment { - ext_attachment = Some(attachment.into()); - } + + // Use a macro for inferring the proper const extension ID between Put and Del cases + macro_rules! ext_attachment { + () => {{ + #[allow(unused_mut)] + let mut ext_attachment = None; + #[cfg(feature = "unstable")] + { + data_info.source_id = source_info.source_id; + data_info.source_sn = source_info.source_sn; + if let Some(attachment) = attachment { + ext_attachment = Some(attachment.into()); + } + } + ext_attachment + }}; } + + let ext_sinfo = if data_info.source_id.is_some() || data_info.source_sn.is_some() { + Some(zenoh::put::ext::SourceInfoType { + zid: data_info.source_id.unwrap_or_default(), + eid: 0, // @TODO use proper EntityId (#703) + sn: data_info.source_sn.unwrap_or_default() as u32, + }) + } else { + None + }; self.query.inner.primitives.send_response(Response { rid: self.query.inner.qid, wire_expr: WireExpr { @@ -224,24 +241,26 @@ impl SyncResolve for ReplyBuilder<'_> { mapping: Mapping::Sender, }, payload: ResponseBody::Reply(zenoh::Reply { - timestamp: data_info.timestamp, - encoding: data_info.encoding.unwrap_or_default(), - ext_sinfo: if data_info.source_id.is_some() || data_info.source_sn.is_some() - { - Some(zenoh::reply::ext::SourceInfoType { - zid: data_info.source_id.unwrap_or_default(), - eid: 0, // @TODO use proper EntityId (#703) - sn: data_info.source_sn.unwrap_or_default() as u32, - }) - } else { - None - }, - ext_consolidation: ConsolidationType::default(), - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_attachment, + consolidation: zenoh::Consolidation::default(), ext_unknown: vec![], - payload, + payload: match kind { + SampleKind::Put => ReplyBody::Put(Put { + timestamp: data_info.timestamp, + encoding: data_info.encoding.unwrap_or_default(), + ext_sinfo, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment: ext_attachment!(), + ext_unknown: vec![], + payload, + }), + SampleKind::Delete => ReplyBody::Del(Del { + timestamp, + ext_sinfo, + ext_attachment: ext_attachment!(), + ext_unknown: vec![], + }), + }, }), ext_qos: response::ext::QoSType::response_default(), ext_tstamp: None, diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index d52c446d3d..46cfd5e499 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -57,6 +57,9 @@ use zenoh_config::unwrap_or_default; use zenoh_core::{zconfigurable, zread, Resolve, ResolveClosure, ResolveFuture, SyncResolve}; use zenoh_protocol::network::AtomicRequestId; use zenoh_protocol::network::RequestId; +use zenoh_protocol::zenoh::reply::ReplyBody; +use zenoh_protocol::zenoh::Del; +use zenoh_protocol::zenoh::Put; use zenoh_protocol::{ core::{ key_expr::{keyexpr, OwnedKeyExpr}, @@ -73,10 +76,7 @@ use zenoh_protocol::{ Mapping, Push, Response, ResponseFinal, }, zenoh::{ - query::{ - self, - ext::{ConsolidationType, QueryBodyType}, - }, + query::{self, ext::QueryBodyType, Consolidation}, Pull, PushBody, RequestBody, ResponseBody, }, }; @@ -1808,9 +1808,9 @@ impl Session { ext_budget: None, ext_timeout: Some(timeout), payload: RequestBody::Query(zenoh_protocol::zenoh::Query { + consolidation: consolidation.into(), parameters: selector.parameters().to_string(), ext_sinfo: None, - ext_consolidation: consolidation.into(), ext_body: value.as_ref().map(|v| query::ext::QueryBodyType { #[cfg(feature = "shared-memory")] ext_shm: None, @@ -1851,7 +1851,7 @@ impl Session { parameters: &str, qid: RequestId, _target: TargetType, - _consolidation: ConsolidationType, + _consolidation: Consolidation, body: Option, #[cfg(feature = "unstable")] attachment: Option, ) { @@ -2233,7 +2233,7 @@ impl Primitives for Session { &m.parameters, msg.id, msg.ext_target, - m.ext_consolidation, + m.consolidation, m.ext_body, #[cfg(feature = "unstable")] m.ext_attachment.map(Into::into), @@ -2341,19 +2341,63 @@ impl Primitives for Session { } None => key_expr, }; - let info = DataInfo { - kind: SampleKind::Put, - encoding: Some(m.encoding), - timestamp: m.timestamp, - source_id: m.ext_sinfo.as_ref().map(|i| i.zid), - source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), + + struct Ret { + payload: ZBuf, + info: DataInfo, + #[cfg(feature = "unstable")] + attachment: Option, + } + let Ret { + payload, + info, + #[cfg(feature = "unstable")] + attachment, + } = match m.payload { + ReplyBody::Put(Put { + timestamp, + encoding, + ext_sinfo, + ext_attachment: _attachment, + payload, + .. + }) => Ret { + payload, + info: DataInfo { + kind: SampleKind::Put, + encoding: Some(encoding), + timestamp, + source_id: ext_sinfo.as_ref().map(|i| i.zid), + source_sn: ext_sinfo.as_ref().map(|i| i.sn as u64), + }, + #[cfg(feature = "unstable")] + attachment: _attachment.map(Into::into), + }, + ReplyBody::Del(Del { + timestamp, + ext_sinfo, + ext_attachment: _attachment, + .. + }) => Ret { + payload: ZBuf::empty(), + info: DataInfo { + kind: SampleKind::Delete, + encoding: None, + timestamp, + source_id: ext_sinfo.as_ref().map(|i| i.zid), + source_sn: ext_sinfo.as_ref().map(|i| i.sn as u64), + }, + #[cfg(feature = "unstable")] + attachment: _attachment.map(Into::into), + }, }; + #[allow(unused_mut)] let mut sample = - Sample::with_info(key_expr.into_owned(), m.payload, Some(info)); + Sample::with_info(key_expr.into_owned(), payload, Some(info)); #[cfg(feature = "unstable")] { - sample.attachment = m.ext_attachment.map(Into::into); + sample.attachment = attachment; } let new_reply = Reply { sample: Ok(sample), diff --git a/zenoh/tests/session.rs b/zenoh/tests/session.rs index c2cec7c627..f727ad60c3 100644 --- a/zenoh/tests/session.rs +++ b/zenoh/tests/session.rs @@ -153,10 +153,31 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re let c_msgs = msgs.clone(); let qbl = ztimeout!(peer01 .declare_queryable(key_expr) - .callback(move |sample| { + .callback(move |query| { c_msgs.fetch_add(1, Ordering::Relaxed); - let rep = Sample::try_from(key_expr, vec![0u8; size]).unwrap(); - task::block_on(async { ztimeout!(sample.reply(Ok(rep)).res_async()).unwrap() }); + match query.parameters() { + "ok_put" => { + let mut rep = Sample::try_from(key_expr, vec![0u8; size]).unwrap(); + rep.kind = SampleKind::Put; + task::block_on(async { + ztimeout!(query.reply(Ok(rep)).res_async()).unwrap() + }); + } + "ok_del" => { + let mut rep = Sample::try_from(key_expr, vec![0u8; size]).unwrap(); + rep.kind = SampleKind::Delete; + task::block_on(async { + ztimeout!(query.reply(Ok(rep)).res_async()).unwrap() + }); + } + "err" => { + let rep = Value::from(vec![0u8; size]); + task::block_on(async { + ztimeout!(query.reply(Err(rep)).res_async()).unwrap() + }); + } + _ => panic!("Unknown query parameter"), + } }) .res_async()) .unwrap(); @@ -165,12 +186,15 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re task::sleep(SLEEP).await; // Get data - println!("[QR][02c] Getting on peer02 session. {msg_count} msgs."); + println!("[QR][02c] Getting Ok(Put) on peer02 session. {msg_count} msgs."); let mut cnt = 0; for _ in 0..msg_count { - let rs = ztimeout!(peer02.get(key_expr).res_async()).unwrap(); + let selector = format!("{}?ok_put", key_expr); + let rs = ztimeout!(peer02.get(selector).res_async()).unwrap(); while let Ok(s) = ztimeout!(rs.recv_async()) { - assert_eq!(s.sample.unwrap().value.payload.len(), size); + let s = s.sample.unwrap(); + assert_eq!(s.kind, SampleKind::Put); + assert_eq!(s.value.payload.len(), size); cnt += 1; } } @@ -178,6 +202,41 @@ async fn test_session_qryrep(peer01: &Session, peer02: &Session, reliability: Re assert_eq!(msgs.load(Ordering::Relaxed), msg_count); assert_eq!(cnt, msg_count); + msgs.store(0, Ordering::Relaxed); + + println!("[QR][03c] Getting Ok(Delete) on peer02 session. {msg_count} msgs."); + let mut cnt = 0; + for _ in 0..msg_count { + let selector = format!("{}?ok_del", key_expr); + let rs = ztimeout!(peer02.get(selector).res_async()).unwrap(); + while let Ok(s) = ztimeout!(rs.recv_async()) { + let s = s.sample.unwrap(); + assert_eq!(s.kind, SampleKind::Delete); + assert_eq!(s.value.payload.len(), 0); + cnt += 1; + } + } + println!("[QR][03c] Got on peer02 session. {cnt}/{msg_count} msgs."); + assert_eq!(msgs.load(Ordering::Relaxed), msg_count); + assert_eq!(cnt, msg_count); + + msgs.store(0, Ordering::Relaxed); + + println!("[QR][04c] Getting Err() on peer02 session. {msg_count} msgs."); + let mut cnt = 0; + for _ in 0..msg_count { + let selector = format!("{}?err", key_expr); + let rs = ztimeout!(peer02.get(selector).res_async()).unwrap(); + while let Ok(s) = ztimeout!(rs.recv_async()) { + let e = s.sample.unwrap_err(); + assert_eq!(e.payload.len(), size); + cnt += 1; + } + } + println!("[QR][04c] Got on peer02 session. {cnt}/{msg_count} msgs."); + assert_eq!(msgs.load(Ordering::Relaxed), msg_count); + assert_eq!(cnt, msg_count); + println!("[PS][03c] Unqueryable on peer01 session"); ztimeout!(qbl.undeclare().res_async()).unwrap();