diff --git a/commons/zenoh-codec/src/zenoh/mod.rs b/commons/zenoh-codec/src/zenoh/mod.rs index e3ed91d21f..d59add9d63 100644 --- a/commons/zenoh-codec/src/zenoh/mod.rs +++ b/commons/zenoh-codec/src/zenoh/mod.rs @@ -123,6 +123,7 @@ where ResponseBody::Reply(b) => self.write(&mut *writer, b), ResponseBody::Ack(b) => self.write(&mut *writer, b), ResponseBody::Err(b) => self.write(&mut *writer, b), + ResponseBody::Put(b) => self.write(&mut *writer, b), } } } @@ -141,6 +142,7 @@ where id::REPLY => ResponseBody::Reply(codec.read(&mut *reader)?), id::ACK => ResponseBody::Ack(codec.read(&mut *reader)?), id::ERR => ResponseBody::Err(codec.read(&mut *reader)?), + id::PUT => ResponseBody::Put(codec.read(&mut *reader)?), _ => return Err(DidntRead), }; diff --git a/commons/zenoh-protocol/src/zenoh/mod.rs b/commons/zenoh-protocol/src/zenoh/mod.rs index 0cc7b46b7e..a23eaa9b21 100644 --- a/commons/zenoh-protocol/src/zenoh/mod.rs +++ b/commons/zenoh-protocol/src/zenoh/mod.rs @@ -129,6 +129,7 @@ pub enum ResponseBody { Reply(Reply), Ack(Ack), Err(Err), + Put(Put), } impl ResponseBody { @@ -137,10 +138,11 @@ impl ResponseBody { use rand::Rng; let mut rng = rand::thread_rng(); - match rng.gen_range(0..3) { + match rng.gen_range(0..4) { 0 => ResponseBody::Reply(Reply::rand()), 1 => ResponseBody::Ack(Ack::rand()), 2 => ResponseBody::Err(Err::rand()), + 3 => ResponseBody::Put(Put::rand()), _ => unreachable!(), } } diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index 9645af0f74..ec85dbee43 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -21,16 +21,16 @@ use async_trait::async_trait; use std::collections::HashMap; use std::sync::{Arc, Weak}; use zenoh_config::WhatAmI; -use zenoh_protocol::core::key_expr::keyexpr; -use zenoh_protocol::network::declare::queryable::ext::QueryableInfo; +use zenoh_protocol::zenoh::reply::ReplyBody; +use zenoh_protocol::zenoh::Put; use zenoh_protocol::{ - core::{Encoding, WireExpr}, + core::{key_expr::keyexpr, Encoding, WireExpr}, network::{ - declare::ext, + declare::{ext, queryable::ext::QueryableInfo}, request::{ext::TargetType, Request, RequestId}, response::{self, ext::ResponderIdType, Response, ResponseFinal}, }, - zenoh::{reply::ext::ConsolidationType, Reply, RequestBody, ResponseBody}, + zenoh::{query::Consolidation, Reply, RequestBody, ResponseBody}, }; use zenoh_sync::get_mut_unchecked; use zenoh_util::Timed; @@ -537,15 +537,17 @@ pub fn route_query( for (wexpr, payload) in local_replies { let payload = ResponseBody::Reply(Reply { - timestamp: None, - encoding: Encoding::default(), - ext_sinfo: None, - ext_consolidation: ConsolidationType::default(), - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_attachment: None, // @TODO: expose it in the API - ext_unknown: vec![], - payload, + consolidation: Consolidation::default(), // @TODO: handle Del case + ext_unknown: vec![], // @TODO: handle unknown extensions + payload: ReplyBody::Put(Put { + // @TODO: handle Del case + timestamp: None, // @TODO: handle timestamp + encoding: Encoding::default(), // @TODO: handle encoding + ext_sinfo: None, // @TODO: handle source info + ext_attachment: None, // @TODO: expose it in the API + ext_unknown: vec![], // @TODO: handle unknown extensions + payload, + }), }); #[cfg(feature = "stats")] if !admin { diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index 9ee73d1641..4e9f4914dd 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -30,11 +30,11 @@ use std::future::Ready; use std::ops::Deref; use std::sync::Arc; use zenoh_core::{AsyncResolve, Resolvable, SyncResolve}; -use zenoh_protocol::core::WireExpr; -use zenoh_protocol::network::{response, Mapping, RequestId, Response, ResponseFinal}; -use zenoh_protocol::zenoh::ext::ValueType; -use zenoh_protocol::zenoh::reply::ext::ConsolidationType; -use zenoh_protocol::zenoh::{self, ResponseBody}; +use zenoh_protocol::{ + core::WireExpr, + network::{response, Mapping, RequestId, Response, ResponseFinal}, + zenoh::{self, ext::ValueType, reply::ReplyBody, Del, Put, ResponseBody}, +}; use zenoh_result::ZResult; pub(crate) struct QueryInner { @@ -206,16 +206,33 @@ impl SyncResolve for ReplyBuilder<'_> { source_id: None, source_sn: None, }; - #[allow(unused_mut)] - let mut ext_attachment = None; - #[cfg(feature = "unstable")] - { - data_info.source_id = source_info.source_id; - data_info.source_sn = source_info.source_sn; - if let Some(attachment) = attachment { - ext_attachment = Some(attachment.into()); - } + + // Use a macro for inferring the proper const extension ID between Put and Del cases + macro_rules! ext_attachment { + () => {{ + #[allow(unused_mut)] + let mut ext_attachment = None; + #[cfg(feature = "unstable")] + { + data_info.source_id = source_info.source_id; + data_info.source_sn = source_info.source_sn; + if let Some(attachment) = attachment { + ext_attachment = Some(attachment.into()); + } + } + ext_attachment + }}; } + + let ext_sinfo = if data_info.source_id.is_some() || data_info.source_sn.is_some() { + Some(zenoh::put::ext::SourceInfoType { + zid: data_info.source_id.unwrap_or_default(), + eid: 0, // @TODO use proper EntityId (#703) + sn: data_info.source_sn.unwrap_or_default() as u32, + }) + } else { + None + }; self.query.inner.primitives.send_response(Response { rid: self.query.inner.qid, wire_expr: WireExpr { @@ -224,24 +241,26 @@ impl SyncResolve for ReplyBuilder<'_> { mapping: Mapping::Sender, }, payload: ResponseBody::Reply(zenoh::Reply { - timestamp: data_info.timestamp, - encoding: data_info.encoding.unwrap_or_default(), - ext_sinfo: if data_info.source_id.is_some() || data_info.source_sn.is_some() - { - Some(zenoh::reply::ext::SourceInfoType { - zid: data_info.source_id.unwrap_or_default(), - eid: 0, // @TODO use proper EntityId (#703) - sn: data_info.source_sn.unwrap_or_default() as u32, - }) - } else { - None - }, - ext_consolidation: ConsolidationType::default(), - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_attachment, + consolidation: zenoh::Consolidation::default(), ext_unknown: vec![], - payload, + payload: match kind { + SampleKind::Put => ReplyBody::Put(Put { + timestamp: data_info.timestamp, + encoding: data_info.encoding.unwrap_or_default(), + ext_sinfo, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment: ext_attachment!(), + ext_unknown: vec![], + payload, + }), + SampleKind::Delete => ReplyBody::Del(Del { + timestamp, + ext_sinfo, + ext_attachment: ext_attachment!(), + ext_unknown: vec![], + }), + }, }), ext_qos: response::ext::QoSType::response_default(), ext_tstamp: None, diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index d52c446d3d..46cfd5e499 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -57,6 +57,9 @@ use zenoh_config::unwrap_or_default; use zenoh_core::{zconfigurable, zread, Resolve, ResolveClosure, ResolveFuture, SyncResolve}; use zenoh_protocol::network::AtomicRequestId; use zenoh_protocol::network::RequestId; +use zenoh_protocol::zenoh::reply::ReplyBody; +use zenoh_protocol::zenoh::Del; +use zenoh_protocol::zenoh::Put; use zenoh_protocol::{ core::{ key_expr::{keyexpr, OwnedKeyExpr}, @@ -73,10 +76,7 @@ use zenoh_protocol::{ Mapping, Push, Response, ResponseFinal, }, zenoh::{ - query::{ - self, - ext::{ConsolidationType, QueryBodyType}, - }, + query::{self, ext::QueryBodyType, Consolidation}, Pull, PushBody, RequestBody, ResponseBody, }, }; @@ -1808,9 +1808,9 @@ impl Session { ext_budget: None, ext_timeout: Some(timeout), payload: RequestBody::Query(zenoh_protocol::zenoh::Query { + consolidation: consolidation.into(), parameters: selector.parameters().to_string(), ext_sinfo: None, - ext_consolidation: consolidation.into(), ext_body: value.as_ref().map(|v| query::ext::QueryBodyType { #[cfg(feature = "shared-memory")] ext_shm: None, @@ -1851,7 +1851,7 @@ impl Session { parameters: &str, qid: RequestId, _target: TargetType, - _consolidation: ConsolidationType, + _consolidation: Consolidation, body: Option, #[cfg(feature = "unstable")] attachment: Option, ) { @@ -2233,7 +2233,7 @@ impl Primitives for Session { &m.parameters, msg.id, msg.ext_target, - m.ext_consolidation, + m.consolidation, m.ext_body, #[cfg(feature = "unstable")] m.ext_attachment.map(Into::into), @@ -2341,19 +2341,63 @@ impl Primitives for Session { } None => key_expr, }; - let info = DataInfo { - kind: SampleKind::Put, - encoding: Some(m.encoding), - timestamp: m.timestamp, - source_id: m.ext_sinfo.as_ref().map(|i| i.zid), - source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), + + struct Ret { + payload: ZBuf, + info: DataInfo, + #[cfg(feature = "unstable")] + attachment: Option, + } + let Ret { + payload, + info, + #[cfg(feature = "unstable")] + attachment, + } = match m.payload { + ReplyBody::Put(Put { + timestamp, + encoding, + ext_sinfo, + ext_attachment: _attachment, + payload, + .. + }) => Ret { + payload, + info: DataInfo { + kind: SampleKind::Put, + encoding: Some(encoding), + timestamp, + source_id: ext_sinfo.as_ref().map(|i| i.zid), + source_sn: ext_sinfo.as_ref().map(|i| i.sn as u64), + }, + #[cfg(feature = "unstable")] + attachment: _attachment.map(Into::into), + }, + ReplyBody::Del(Del { + timestamp, + ext_sinfo, + ext_attachment: _attachment, + .. + }) => Ret { + payload: ZBuf::empty(), + info: DataInfo { + kind: SampleKind::Delete, + encoding: None, + timestamp, + source_id: ext_sinfo.as_ref().map(|i| i.zid), + source_sn: ext_sinfo.as_ref().map(|i| i.sn as u64), + }, + #[cfg(feature = "unstable")] + attachment: _attachment.map(Into::into), + }, }; + #[allow(unused_mut)] let mut sample = - Sample::with_info(key_expr.into_owned(), m.payload, Some(info)); + Sample::with_info(key_expr.into_owned(), payload, Some(info)); #[cfg(feature = "unstable")] { - sample.attachment = m.ext_attachment.map(Into::into); + sample.attachment = attachment; } let new_reply = Reply { sample: Ok(sample),