From 053565be70bff93d860588254252a79cf053be9a Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 7 Feb 2024 18:10:58 +0100 Subject: [PATCH] Add source_eid to SourceInfo --- zenoh/src/queryable.rs | 43 +++++++++++++++++++----------------------- zenoh/src/sample.rs | 7 ++++++- zenoh/src/session.rs | 3 +++ 3 files changed, 28 insertions(+), 25 deletions(-) diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index 9b0595146a..faec334e46 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -21,7 +21,6 @@ use crate::prelude::*; use crate::query::ReplyKeyExpr; #[zenoh_macros::unstable] use crate::sample::Attachment; -use crate::sample::DataInfo; use crate::SessionRef; use crate::Undeclarable; @@ -192,7 +191,7 @@ impl SyncResolve for ReplyBuilder<'_> { let Sample { key_expr, value: Value { payload, encoding }, - kind, + kind: _, timestamp, #[cfg(feature = "unstable")] source_info, @@ -200,23 +199,28 @@ impl SyncResolve for ReplyBuilder<'_> { attachment, } = sample; #[allow(unused_mut)] - let mut data_info = DataInfo { - kind, - encoding: Some(encoding), - timestamp, - source_id: None, - source_sn: None, - }; - #[allow(unused_mut)] let mut ext_attachment = None; #[cfg(feature = "unstable")] { - data_info.source_id = source_info.source_id; - data_info.source_sn = source_info.source_sn; if let Some(attachment) = attachment { ext_attachment = Some(attachment.into()); } } + #[allow(unused_mut)] + let mut ext_sinfo = None; + #[cfg(feature = "unstable")] + { + if source_info.source_id.is_some() + || source_info.source_eid.is_some() + || source_info.source_sn.is_some() + { + ext_sinfo = Some(zenoh::reply::ext::SourceInfoType { + zid: source_info.source_id.unwrap_or_default(), + eid: source_info.source_eid.unwrap_or_default(), + sn: source_info.source_sn.unwrap_or_default() as u32, + }) + } + } self.query.inner.primitives.send_response(Response { rid: self.query.inner.qid, wire_expr: WireExpr { @@ -225,18 +229,9 @@ impl SyncResolve for ReplyBuilder<'_> { mapping: Mapping::Sender, }, payload: ResponseBody::Reply(zenoh::Reply { - timestamp: data_info.timestamp, - encoding: data_info.encoding.unwrap_or_default(), - ext_sinfo: if data_info.source_id.is_some() || data_info.source_sn.is_some() - { - Some(zenoh::reply::ext::SourceInfoType { - zid: data_info.source_id.unwrap_or_default(), - eid: 0, // @TODO use proper EntityId (#703) - sn: data_info.source_sn.unwrap_or_default() as u32, - }) - } else { - None - }, + timestamp, + encoding, + ext_sinfo, ext_consolidation: ConsolidationType::default(), #[cfg(feature = "shared-memory")] ext_shm: None, diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index 5d707e5936..9949a7aae8 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -21,7 +21,7 @@ use crate::time::{new_reception_timestamp, Timestamp}; #[zenoh_macros::unstable] use serde::Serialize; use std::convert::{TryFrom, TryInto}; -use zenoh_protocol::core::Encoding; +use zenoh_protocol::core::{Encoding, EntityId}; pub type SourceSn = u64; @@ -49,6 +49,7 @@ pub(crate) struct DataInfo { pub encoding: Option, pub timestamp: Option, pub source_id: Option, + pub source_eid: Option, pub source_sn: Option, } @@ -58,6 +59,8 @@ pub(crate) struct DataInfo { pub struct SourceInfo { /// The [`ZenohId`] of the zenoh instance that published the concerned [`Sample`]. pub source_id: Option, + /// The [`EntityId`] of the zenoh entity that published the concerned [`Sample`]. + pub source_eid: Option, /// The sequence number of the [`Sample`] from the source. pub source_sn: Option, } @@ -73,6 +76,7 @@ impl SourceInfo { pub(crate) fn empty() -> Self { SourceInfo { source_id: None, + source_eid: None, source_sn: None, } } @@ -83,6 +87,7 @@ impl From for SourceInfo { fn from(data_info: DataInfo) -> Self { SourceInfo { source_id: data_info.source_id, + source_eid: data_info.source_eid, source_sn: data_info.source_sn, } } diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index c128852ed2..8c579a969b 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -2060,6 +2060,7 @@ impl Primitives for Session { encoding: Some(m.encoding), timestamp: m.timestamp, source_id: m.ext_sinfo.as_ref().map(|i| i.zid), + source_eid: m.ext_sinfo.as_ref().map(|i| i.eid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; self.handle_data( @@ -2077,6 +2078,7 @@ impl Primitives for Session { encoding: None, timestamp: m.timestamp, source_id: m.ext_sinfo.as_ref().map(|i| i.zid), + source_eid: m.ext_sinfo.as_ref().map(|i| i.eid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; self.handle_data( @@ -2213,6 +2215,7 @@ impl Primitives for Session { encoding: Some(m.encoding), timestamp: m.timestamp, source_id: m.ext_sinfo.as_ref().map(|i| i.zid), + source_eid: m.ext_sinfo.as_ref().map(|i| i.eid), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; #[allow(unused_mut)]