Skip to content

Commit

Permalink
Introduce EntityGlobalId type
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Feb 9, 2024
1 parent 18af62b commit 8398535
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 50 deletions.
18 changes: 9 additions & 9 deletions commons/zenoh-codec/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use zenoh_buffers::{
use zenoh_protocol::{
common::{imsg, ZExtZ64, ZExtZBufHeader},
core::{EntityId, Reliability, ZenohId},
network::{ext::EntityIdType, *},
network::{ext::EntityGlobalIdType, *},
};

// NetworkMessage
Expand Down Expand Up @@ -218,21 +218,21 @@ where
}

// Extension: EntityId
impl<const ID: u8> LCodec<&ext::EntityIdType<{ ID }>> for Zenoh080 {
fn w_len(self, x: &ext::EntityIdType<{ ID }>) -> usize {
let EntityIdType { zid, eid } = x;
impl<const ID: u8> LCodec<&ext::EntityGlobalIdType<{ ID }>> for Zenoh080 {
fn w_len(self, x: &ext::EntityGlobalIdType<{ ID }>) -> usize {
let EntityGlobalIdType { zid, eid } = x;

1 + self.w_len(zid) + self.w_len(*eid)
}
}

impl<W, const ID: u8> WCodec<(&ext::EntityIdType<{ ID }>, bool), &mut W> for Zenoh080
impl<W, const ID: u8> WCodec<(&ext::EntityGlobalIdType<{ ID }>, bool), &mut W> for Zenoh080
where
W: Writer,
{
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: (&ext::EntityIdType<{ ID }>, bool)) -> Self::Output {
fn write(self, writer: &mut W, x: (&ext::EntityGlobalIdType<{ ID }>, bool)) -> Self::Output {
let (x, more) = x;
let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x));
self.write(&mut *writer, (&header, more))?;
Expand All @@ -248,13 +248,13 @@ where
}
}

impl<R, const ID: u8> RCodec<(ext::EntityIdType<{ ID }>, bool), &mut R> for Zenoh080Header
impl<R, const ID: u8> RCodec<(ext::EntityGlobalIdType<{ ID }>, bool), &mut R> for Zenoh080Header
where
R: Reader,
{
type Error = DidntRead;

fn read(self, reader: &mut R) -> Result<(ext::EntityIdType<{ ID }>, bool), Self::Error> {
fn read(self, reader: &mut R) -> Result<(ext::EntityGlobalIdType<{ ID }>, bool), Self::Error> {
let (_, more): (ZExtZBufHeader<{ ID }>, bool) = self.read(&mut *reader)?;

let flags: u8 = self.codec.read(&mut *reader)?;
Expand All @@ -265,6 +265,6 @@ where

let eid: EntityId = self.codec.read(&mut *reader)?;

Ok((ext::EntityIdType { zid, eid }, more))
Ok((ext::EntityGlobalIdType { zid, eid }, more))
}
}
24 changes: 15 additions & 9 deletions commons/zenoh-codec/src/zenoh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use zenoh_buffers::{
use zenoh_protocol::common::{iext, ZExtUnit};
use zenoh_protocol::{
common::{imsg, ZExtZBufHeader},
core::{Encoding, EntityId, ZenohId},
core::{Encoding, EntityGlobalId, EntityId, ZenohId},
zenoh::{ext, id, PushBody, RequestBody, ResponseBody},
};

Expand Down Expand Up @@ -153,9 +153,9 @@ where
// Extension: SourceInfo
impl<const ID: u8> LCodec<&ext::SourceInfoType<{ ID }>> for Zenoh080 {
fn w_len(self, x: &ext::SourceInfoType<{ ID }>) -> usize {
let ext::SourceInfoType { zid, eid, sn } = x;
let ext::SourceInfoType { id, sn } = x;

1 + self.w_len(zid) + self.w_len(*eid) + self.w_len(*sn)
1 + self.w_len(&id.zid) + self.w_len(id.eid) + self.w_len(*sn)
}
}

Expand All @@ -167,18 +167,18 @@ where

fn write(self, writer: &mut W, x: (&ext::SourceInfoType<{ ID }>, bool)) -> Self::Output {
let (x, more) = x;
let ext::SourceInfoType { zid, eid, sn } = x;
let ext::SourceInfoType { id, sn } = x;

let header: ZExtZBufHeader<{ ID }> = ZExtZBufHeader::new(self.w_len(x));
self.write(&mut *writer, (&header, more))?;

let flags: u8 = (zid.size() as u8 - 1) << 4;
let flags: u8 = (id.zid.size() as u8 - 1) << 4;
self.write(&mut *writer, flags)?;

let lodec = Zenoh080Length::new(zid.size());
lodec.write(&mut *writer, zid)?;
let lodec = Zenoh080Length::new(id.zid.size());
lodec.write(&mut *writer, &id.zid)?;

self.write(&mut *writer, eid)?;
self.write(&mut *writer, id.eid)?;
self.write(&mut *writer, sn)?;
Ok(())
}
Expand All @@ -202,7 +202,13 @@ where
let eid: EntityId = self.codec.read(&mut *reader)?;
let sn: u32 = self.codec.read(&mut *reader)?;

Ok((ext::SourceInfoType { zid, eid, sn }, more))
Ok((
ext::SourceInfoType {
id: EntityGlobalId { zid, eid },
sn,
},
more,
))
}
}

Expand Down
17 changes: 17 additions & 0 deletions commons/zenoh-protocol/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,23 @@ impl<'de> serde::Deserialize<'de> for ZenohId {

pub type EntityId = u32;

#[derive(Debug, Default, Clone, Eq, Hash, PartialEq)]
pub struct EntityGlobalId {
pub zid: ZenohId,
pub eid: EntityId,
}

impl EntityGlobalId {
#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::Rng;
Self {
zid: ZenohId::rand(),
eid: rand::thread_rng().gen(),
}
}
}

#[repr(u8)]
#[derive(Debug, Default, Copy, Clone, Eq, Hash, PartialEq)]
pub enum Priority {
Expand Down
4 changes: 2 additions & 2 deletions commons/zenoh-protocol/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,12 +410,12 @@ pub mod ext {
/// % eid %
/// +---------------+
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EntityIdType<const ID: u8> {
pub struct EntityGlobalIdType<const ID: u8> {
pub zid: ZenohId,
pub eid: EntityId,
}

impl<const ID: u8> EntityIdType<{ ID }> {
impl<const ID: u8> EntityGlobalIdType<{ ID }> {
#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::Rng;
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-protocol/src/network/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub mod ext {
pub type TimestampType = crate::network::ext::TimestampType<{ Timestamp::ID }>;

pub type ResponderId = zextzbuf!(0x3, false);
pub type ResponderIdType = crate::network::ext::EntityIdType<{ ResponderId::ID }>;
pub type ResponderIdType = crate::network::ext::EntityGlobalIdType<{ ResponderId::ID }>;
}

impl Response {
Expand Down
10 changes: 4 additions & 6 deletions commons/zenoh-protocol/src/zenoh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl From<Ack> for ResponseBody {
pub mod ext {
use zenoh_buffers::ZBuf;

use crate::core::{Encoding, EntityId, ZenohId};
use crate::core::{Encoding, EntityGlobalId};

/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
Expand All @@ -183,8 +183,7 @@ pub mod ext {
/// +---------------+
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SourceInfoType<const ID: u8> {
pub zid: ZenohId,
pub eid: EntityId,
pub id: EntityGlobalId,
pub sn: u32,
}

Expand All @@ -194,10 +193,9 @@ pub mod ext {
use rand::Rng;
let mut rng = rand::thread_rng();

let zid = ZenohId::rand();
let eid: EntityId = rng.gen();
let id = EntityGlobalId::rand();
let sn: u32 = rng.gen();
Self { zid, eid, sn }
Self { id, sn }
}
}

Expand Down
6 changes: 6 additions & 0 deletions zenoh/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ pub(crate) mod common {

/// A [`Locator`] contains a choice of protocol, an address and port, as well as optional additional properties to work with.
pub use zenoh_protocol::core::EndPoint;
/// The global unique id of a zenoh entity.
#[zenoh_macros::unstable]
pub use zenoh_protocol::core::EntityGlobalId;
/// The unique id of a zenoh entity inside it's parent [`Session`].
#[zenoh_macros::unstable]
pub use zenoh_protocol::core::EntityId;
/// A [`Locator`] contains a choice of protocol, an address and port, as well as optional additional properties to work with.
pub use zenoh_protocol::core::Locator;
/// The global unique id of a zenoh peer.
Expand Down
8 changes: 2 additions & 6 deletions zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,9 @@ impl SyncResolve for ReplyBuilder<'_> {
let mut ext_sinfo = None;
#[cfg(feature = "unstable")]
{
if source_info.source_id.is_some()
|| source_info.source_eid.is_some()
|| source_info.source_sn.is_some()
{
if source_info.source_id.is_some() || source_info.source_sn.is_some() {
ext_sinfo = Some(zenoh::reply::ext::SourceInfoType {
zid: source_info.source_id.unwrap_or_default(),
eid: source_info.source_eid.unwrap_or_default(),
id: source_info.source_id.unwrap_or_default(),
sn: source_info.source_sn.unwrap_or_default() as u32,
})
}
Expand Down
14 changes: 4 additions & 10 deletions zenoh/src/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@

//! Sample primitives
use crate::buffers::ZBuf;
use crate::prelude::ZenohId;
use crate::prelude::{KeyExpr, SampleKind, Value};
use crate::query::Reply;
use crate::time::{new_reception_timestamp, Timestamp};
#[zenoh_macros::unstable]
use serde::Serialize;
use std::convert::{TryFrom, TryInto};
use zenoh_protocol::core::{Encoding, EntityId};
use zenoh_protocol::core::{Encoding, EntityGlobalId};

pub type SourceSn = u64;

Expand All @@ -48,19 +47,16 @@ pub(crate) struct DataInfo {
pub kind: SampleKind,
pub encoding: Option<Encoding>,
pub timestamp: Option<Timestamp>,
pub source_id: Option<ZenohId>,
pub source_eid: Option<EntityId>,
pub source_id: Option<EntityGlobalId>,
pub source_sn: Option<SourceSn>,
}

/// Informations on the source of a zenoh [`Sample`].
#[zenoh_macros::unstable]
#[derive(Debug, Clone)]
pub struct SourceInfo {
/// The [`ZenohId`] of the zenoh instance that published the concerned [`Sample`].
pub source_id: Option<ZenohId>,
/// The [`EntityId`] of the zenoh entity that published the concerned [`Sample`].
pub source_eid: Option<EntityId>,
/// The [`EntityGlobalId`] of the zenoh entity that published the concerned [`Sample`].
pub source_id: Option<EntityGlobalId>,
/// The sequence number of the [`Sample`] from the source.
pub source_sn: Option<SourceSn>,
}
Expand All @@ -84,7 +80,6 @@ impl SourceInfo {
pub(crate) fn empty() -> Self {
SourceInfo {
source_id: None,
source_eid: None,
source_sn: None,
}
}
Expand All @@ -95,7 +90,6 @@ impl From<DataInfo> for SourceInfo {
fn from(data_info: DataInfo) -> Self {
SourceInfo {
source_id: data_info.source_id,
source_eid: data_info.source_eid,
source_sn: data_info.source_sn,
}
}
Expand Down
11 changes: 4 additions & 7 deletions zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2059,8 +2059,7 @@ impl Primitives for Session {
kind: SampleKind::Put,
encoding: Some(m.encoding),
timestamp: m.timestamp,
source_id: m.ext_sinfo.as_ref().map(|i| i.zid),
source_eid: m.ext_sinfo.as_ref().map(|i| i.eid),
source_id: m.ext_sinfo.as_ref().map(|i| i.id.clone()),
source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64),
};
self.handle_data(
Expand All @@ -2077,8 +2076,7 @@ impl Primitives for Session {
kind: SampleKind::Delete,
encoding: None,
timestamp: m.timestamp,
source_id: m.ext_sinfo.as_ref().map(|i| i.zid),
source_eid: m.ext_sinfo.as_ref().map(|i| i.eid),
source_id: m.ext_sinfo.as_ref().map(|i| i.id.clone()),
source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64),
};
self.handle_data(
Expand Down Expand Up @@ -2143,7 +2141,7 @@ impl Primitives for Session {
},
};
let replier_id = match e.ext_sinfo {
Some(info) => info.zid,
Some(info) => info.id.zid,
None => ZenohId::rand(),
};
let new_reply = Reply {
Expand Down Expand Up @@ -2214,8 +2212,7 @@ impl Primitives for Session {
kind: SampleKind::Put,
encoding: Some(m.encoding),
timestamp: m.timestamp,
source_id: m.ext_sinfo.as_ref().map(|i| i.zid),
source_eid: m.ext_sinfo.as_ref().map(|i| i.eid),
source_id: m.ext_sinfo.as_ref().map(|i| i.id.clone()),
source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64),
};
#[allow(unused_mut)]
Expand Down

0 comments on commit 8398535

Please sign in to comment.