From 839853547475a45f2ce7bfb6a0f0ef7270ac83de Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 9 Feb 2024 10:35:27 +0100 Subject: [PATCH] Introduce EntityGlobalId type --- commons/zenoh-codec/src/network/mod.rs | 18 +++++++------- commons/zenoh-codec/src/zenoh/mod.rs | 24 ++++++++++++------- commons/zenoh-protocol/src/core/mod.rs | 17 +++++++++++++ commons/zenoh-protocol/src/network/mod.rs | 4 ++-- .../zenoh-protocol/src/network/response.rs | 2 +- commons/zenoh-protocol/src/zenoh/mod.rs | 10 ++++---- zenoh/src/prelude.rs | 6 +++++ zenoh/src/queryable.rs | 8 ++----- zenoh/src/sample.rs | 14 ++++------- zenoh/src/session.rs | 11 ++++----- 10 files changed, 64 insertions(+), 50 deletions(-) diff --git a/commons/zenoh-codec/src/network/mod.rs b/commons/zenoh-codec/src/network/mod.rs index 23956d64fd..d06f9bec2e 100644 --- a/commons/zenoh-codec/src/network/mod.rs +++ b/commons/zenoh-codec/src/network/mod.rs @@ -27,7 +27,7 @@ use zenoh_buffers::{ use zenoh_protocol::{ common::{imsg, ZExtZ64, ZExtZBufHeader}, core::{EntityId, Reliability, ZenohId}, - network::{ext::EntityIdType, *}, + network::{ext::EntityGlobalIdType, *}, }; // NetworkMessage @@ -218,21 +218,21 @@ where } // Extension: EntityId -impl LCodec<&ext::EntityIdType<{ ID }>> for Zenoh080 { - fn w_len(self, x: &ext::EntityIdType<{ ID }>) -> usize { - let EntityIdType { zid, eid } = x; +impl LCodec<&ext::EntityGlobalIdType<{ ID }>> for Zenoh080 { + fn w_len(self, x: &ext::EntityGlobalIdType<{ ID }>) -> usize { + let EntityGlobalIdType { zid, eid } = x; 1 + self.w_len(zid) + self.w_len(*eid) } } -impl WCodec<(&ext::EntityIdType<{ ID }>, bool), &mut W> for Zenoh080 +impl WCodec<(&ext::EntityGlobalIdType<{ ID }>, bool), &mut W> for Zenoh080 where W: Writer, { type Output = Result<(), DidntWrite>; - fn write(self, writer: &mut W, x: (&ext::EntityIdType<{ ID }>, bool)) -> Self::Output { + fn write(self, writer: &mut W, x: (&ext::EntityGlobalIdType<{ ID }>, bool)) -> Self::Output { let (x, more) = x; let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x)); self.write(&mut *writer, (&header, more))?; @@ -248,13 +248,13 @@ where } } -impl RCodec<(ext::EntityIdType<{ ID }>, bool), &mut R> for Zenoh080Header +impl RCodec<(ext::EntityGlobalIdType<{ ID }>, bool), &mut R> for Zenoh080Header where R: Reader, { type Error = DidntRead; - fn read(self, reader: &mut R) -> Result<(ext::EntityIdType<{ ID }>, bool), Self::Error> { + fn read(self, reader: &mut R) -> Result<(ext::EntityGlobalIdType<{ ID }>, bool), Self::Error> { let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?; let flags: u8 = self.codec.read(&mut *reader)?; @@ -265,6 +265,6 @@ where let eid: EntityId = self.codec.read(&mut *reader)?; - Ok((ext::EntityIdType { zid, eid }, more)) + Ok((ext::EntityGlobalIdType { zid, eid }, more)) } } diff --git a/commons/zenoh-codec/src/zenoh/mod.rs b/commons/zenoh-codec/src/zenoh/mod.rs index aafff8021e..7f686b009c 100644 --- a/commons/zenoh-codec/src/zenoh/mod.rs +++ b/commons/zenoh-codec/src/zenoh/mod.rs @@ -33,7 +33,7 @@ use zenoh_buffers::{ use zenoh_protocol::common::{iext, ZExtUnit}; use zenoh_protocol::{ common::{imsg, ZExtZBufHeader}, - core::{Encoding, EntityId, ZenohId}, + core::{Encoding, EntityGlobalId, EntityId, ZenohId}, zenoh::{ext, id, PushBody, RequestBody, ResponseBody}, }; @@ -153,9 +153,9 @@ where // Extension: SourceInfo impl LCodec<&ext::SourceInfoType<{ ID }>> for Zenoh080 { fn w_len(self, x: &ext::SourceInfoType<{ ID }>) -> usize { - let ext::SourceInfoType { zid, eid, sn } = x; + let ext::SourceInfoType { id, sn } = x; - 1 + self.w_len(zid) + self.w_len(*eid) + self.w_len(*sn) + 1 + self.w_len(&id.zid) + self.w_len(id.eid) + self.w_len(*sn) } } @@ -167,18 +167,18 @@ where fn write(self, writer: &mut W, x: (&ext::SourceInfoType<{ ID }>, bool)) -> Self::Output { let (x, more) = x; - let ext::SourceInfoType { zid, eid, sn } = x; + let ext::SourceInfoType { id, sn } = x; let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x)); self.write(&mut *writer, (&header, more))?; - let flags: u8 = (zid.size() as u8 - 1) << 4; + let flags: u8 = (id.zid.size() as u8 - 1) << 4; self.write(&mut *writer, flags)?; - let lodec = Zenoh080Length::new(zid.size()); - lodec.write(&mut *writer, zid)?; + let lodec = Zenoh080Length::new(id.zid.size()); + lodec.write(&mut *writer, &id.zid)?; - self.write(&mut *writer, eid)?; + self.write(&mut *writer, id.eid)?; self.write(&mut *writer, sn)?; Ok(()) } @@ -202,7 +202,13 @@ where let eid: EntityId = self.codec.read(&mut *reader)?; let sn: u32 = self.codec.read(&mut *reader)?; - Ok((ext::SourceInfoType { zid, eid, sn }, more)) + Ok(( + ext::SourceInfoType { + id: EntityGlobalId { zid, eid }, + sn, + }, + more, + )) } } diff --git a/commons/zenoh-protocol/src/core/mod.rs b/commons/zenoh-protocol/src/core/mod.rs index 1440ccaf11..0b92a3b445 100644 --- a/commons/zenoh-protocol/src/core/mod.rs +++ b/commons/zenoh-protocol/src/core/mod.rs @@ -301,6 +301,23 @@ impl<'de> serde::Deserialize<'de> for ZenohId { pub type EntityId = u32; +#[derive(Debug, Default, Clone, Eq, Hash, PartialEq)] +pub struct EntityGlobalId { + pub zid: ZenohId, + pub eid: EntityId, +} + +impl EntityGlobalId { + #[cfg(feature = "test")] + pub fn rand() -> Self { + use rand::Rng; + Self { + zid: ZenohId::rand(), + eid: rand::thread_rng().gen(), + } + } +} + #[repr(u8)] #[derive(Debug, Default, Copy, Clone, Eq, Hash, PartialEq)] pub enum Priority { diff --git a/commons/zenoh-protocol/src/network/mod.rs b/commons/zenoh-protocol/src/network/mod.rs index 1d5b85abae..e39cc59d8f 100644 --- a/commons/zenoh-protocol/src/network/mod.rs +++ b/commons/zenoh-protocol/src/network/mod.rs @@ -410,12 +410,12 @@ pub mod ext { /// % eid % /// +---------------+ #[derive(Debug, Clone, PartialEq, Eq)] - pub struct EntityIdType { + pub struct EntityGlobalIdType { pub zid: ZenohId, pub eid: EntityId, } - impl EntityIdType<{ ID }> { + impl EntityGlobalIdType<{ ID }> { #[cfg(feature = "test")] pub fn rand() -> Self { use rand::Rng; diff --git a/commons/zenoh-protocol/src/network/response.rs b/commons/zenoh-protocol/src/network/response.rs index 9ef2c26a10..6f0925429b 100644 --- a/commons/zenoh-protocol/src/network/response.rs +++ b/commons/zenoh-protocol/src/network/response.rs @@ -67,7 +67,7 @@ pub mod ext { pub type TimestampType = crate::network::ext::TimestampType<{ Timestamp::ID }>; pub type ResponderId = zextzbuf!(0x3, false); - pub type ResponderIdType = crate::network::ext::EntityIdType<{ ResponderId::ID }>; + pub type ResponderIdType = crate::network::ext::EntityGlobalIdType<{ ResponderId::ID }>; } impl Response { diff --git a/commons/zenoh-protocol/src/zenoh/mod.rs b/commons/zenoh-protocol/src/zenoh/mod.rs index e2c22b6509..ed106fd76a 100644 --- a/commons/zenoh-protocol/src/zenoh/mod.rs +++ b/commons/zenoh-protocol/src/zenoh/mod.rs @@ -169,7 +169,7 @@ impl From for ResponseBody { pub mod ext { use zenoh_buffers::ZBuf; - use crate::core::{Encoding, EntityId, ZenohId}; + use crate::core::{Encoding, EntityGlobalId}; /// 7 6 5 4 3 2 1 0 /// +-+-+-+-+-+-+-+-+ @@ -183,8 +183,7 @@ pub mod ext { /// +---------------+ #[derive(Debug, Clone, PartialEq, Eq)] pub struct SourceInfoType { - pub zid: ZenohId, - pub eid: EntityId, + pub id: EntityGlobalId, pub sn: u32, } @@ -194,10 +193,9 @@ pub mod ext { use rand::Rng; let mut rng = rand::thread_rng(); - let zid = ZenohId::rand(); - let eid: EntityId = rng.gen(); + let id = EntityGlobalId::rand(); let sn: u32 = rng.gen(); - Self { zid, eid, sn } + Self { id, sn } } } diff --git a/zenoh/src/prelude.rs b/zenoh/src/prelude.rs index 0b74d23cd0..3182ba1c56 100644 --- a/zenoh/src/prelude.rs +++ b/zenoh/src/prelude.rs @@ -61,6 +61,12 @@ pub(crate) mod common { /// A [`Locator`] contains a choice of protocol, an address and port, as well as optional additional properties to work with. pub use zenoh_protocol::core::EndPoint; + /// The global unique id of a zenoh entity. + #[zenoh_macros::unstable] + pub use zenoh_protocol::core::EntityGlobalId; + /// The unique id of a zenoh entity inside it's parent [`Session`]. + #[zenoh_macros::unstable] + pub use zenoh_protocol::core::EntityId; /// A [`Locator`] contains a choice of protocol, an address and port, as well as optional additional properties to work with. pub use zenoh_protocol::core::Locator; /// The global unique id of a zenoh peer. diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index faec334e46..01a237af46 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -210,13 +210,9 @@ impl SyncResolve for ReplyBuilder<'_> { let mut ext_sinfo = None; #[cfg(feature = "unstable")] { - if source_info.source_id.is_some() - || source_info.source_eid.is_some() - || source_info.source_sn.is_some() - { + if source_info.source_id.is_some() || source_info.source_sn.is_some() { ext_sinfo = Some(zenoh::reply::ext::SourceInfoType { - zid: source_info.source_id.unwrap_or_default(), - eid: source_info.source_eid.unwrap_or_default(), + id: source_info.source_id.unwrap_or_default(), sn: source_info.source_sn.unwrap_or_default() as u32, }) } diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index 73005348b9..6c1dafad66 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -14,14 +14,13 @@ //! Sample primitives use crate::buffers::ZBuf; -use crate::prelude::ZenohId; use crate::prelude::{KeyExpr, SampleKind, Value}; use crate::query::Reply; use crate::time::{new_reception_timestamp, Timestamp}; #[zenoh_macros::unstable] use serde::Serialize; use std::convert::{TryFrom, TryInto}; -use zenoh_protocol::core::{Encoding, EntityId}; +use zenoh_protocol::core::{Encoding, EntityGlobalId}; pub type SourceSn = u64; @@ -48,8 +47,7 @@ pub(crate) struct DataInfo { pub kind: SampleKind, pub encoding: Option, pub timestamp: Option, - pub source_id: Option, - pub source_eid: Option, + pub source_id: Option, pub source_sn: Option, } @@ -57,10 +55,8 @@ pub(crate) struct DataInfo { #[zenoh_macros::unstable] #[derive(Debug, Clone)] pub struct SourceInfo { - /// The [`ZenohId`] of the zenoh instance that published the concerned [`Sample`]. - pub source_id: Option, - /// The [`EntityId`] of the zenoh entity that published the concerned [`Sample`]. - pub source_eid: Option, + /// The [`EntityGlobalId`] of the zenoh entity that published the concerned [`Sample`]. + pub source_id: Option, /// The sequence number of the [`Sample`] from the source. pub source_sn: Option, } @@ -84,7 +80,6 @@ impl SourceInfo { pub(crate) fn empty() -> Self { SourceInfo { source_id: None, - source_eid: None, source_sn: None, } } @@ -95,7 +90,6 @@ impl From for SourceInfo { fn from(data_info: DataInfo) -> Self { SourceInfo { source_id: data_info.source_id, - source_eid: data_info.source_eid, source_sn: data_info.source_sn, } } diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 8c579a969b..40277215b6 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -2059,8 +2059,7 @@ impl Primitives for Session { kind: SampleKind::Put, encoding: Some(m.encoding), timestamp: m.timestamp, - source_id: m.ext_sinfo.as_ref().map(|i| i.zid), - source_eid: m.ext_sinfo.as_ref().map(|i| i.eid), + source_id: m.ext_sinfo.as_ref().map(|i| i.id.clone()), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; self.handle_data( @@ -2077,8 +2076,7 @@ impl Primitives for Session { kind: SampleKind::Delete, encoding: None, timestamp: m.timestamp, - source_id: m.ext_sinfo.as_ref().map(|i| i.zid), - source_eid: m.ext_sinfo.as_ref().map(|i| i.eid), + source_id: m.ext_sinfo.as_ref().map(|i| i.id.clone()), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; self.handle_data( @@ -2143,7 +2141,7 @@ impl Primitives for Session { }, }; let replier_id = match e.ext_sinfo { - Some(info) => info.zid, + Some(info) => info.id.zid, None => ZenohId::rand(), }; let new_reply = Reply { @@ -2214,8 +2212,7 @@ impl Primitives for Session { kind: SampleKind::Put, encoding: Some(m.encoding), timestamp: m.timestamp, - source_id: m.ext_sinfo.as_ref().map(|i| i.zid), - source_eid: m.ext_sinfo.as_ref().map(|i| i.eid), + source_id: m.ext_sinfo.as_ref().map(|i| i.id.clone()), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; #[allow(unused_mut)]