diff --git a/commons/zenoh-codec/src/network/request.rs b/commons/zenoh-codec/src/network/request.rs index 21f42709c4..16cbfed273 100644 --- a/commons/zenoh-codec/src/network/request.rs +++ b/commons/zenoh-codec/src/network/request.rs @@ -31,37 +31,37 @@ use crate::{ }; // Target -impl WCodec<(&ext::TargetType, bool), &mut W> for Zenoh080 +impl WCodec<(&ext::QueryTarget, bool), &mut W> for Zenoh080 where W: Writer, { type Output = Result<(), DidntWrite>; - fn write(self, writer: &mut W, x: (&ext::TargetType, bool)) -> Self::Output { + fn write(self, writer: &mut W, x: (&ext::QueryTarget, bool)) -> Self::Output { let (x, more) = x; let v = match x { - ext::TargetType::BestMatching => 0, - ext::TargetType::All => 1, - ext::TargetType::AllComplete => 2, + ext::QueryTarget::BestMatching => 0, + ext::QueryTarget::All => 1, + ext::QueryTarget::AllComplete => 2, }; let ext = ext::Target::new(v); self.write(&mut *writer, (&ext, more)) } } -impl RCodec<(ext::TargetType, bool), &mut R> for Zenoh080Header +impl RCodec<(ext::QueryTarget, bool), &mut R> for Zenoh080Header where R: Reader, { type Error = DidntRead; - fn read(self, reader: &mut R) -> Result<(ext::TargetType, bool), Self::Error> { + fn read(self, reader: &mut R) -> Result<(ext::QueryTarget, bool), Self::Error> { let (ext, more): (ext::Target, bool) = self.read(&mut *reader)?; let rt = match ext.value { - 0 => ext::TargetType::BestMatching, - 1 => ext::TargetType::All, - 2 => ext::TargetType::AllComplete, + 0 => ext::QueryTarget::BestMatching, + 1 => ext::QueryTarget::All, + 2 => ext::QueryTarget::AllComplete, _ => return Err(DidntRead), }; Ok((rt, more)) @@ -91,7 +91,7 @@ where let mut header = id::REQUEST; let mut n_exts = ((ext_qos != &ext::QoSType::DEFAULT) as u8) + (ext_tstamp.is_some() as u8) - + ((ext_target != &ext::TargetType::DEFAULT) as u8) + + ((ext_target != &ext::QueryTarget::DEFAULT) as u8) + (ext_budget.is_some() as u8) + (ext_timeout.is_some() as u8) + ((ext_nodeid != &ext::NodeIdType::DEFAULT) as u8); @@ -119,7 +119,7 @@ where n_exts -= 1; self.write(&mut *writer, (ts, n_exts != 0))?; } - if ext_target != &ext::TargetType::DEFAULT { + if ext_target != &ext::QueryTarget::DEFAULT { n_exts -= 1; self.write(&mut *writer, (ext_target, n_exts != 0))?; } @@ -184,7 +184,7 @@ where let mut ext_qos = ext::QoSType::DEFAULT; let mut ext_tstamp = None; let mut ext_nodeid = ext::NodeIdType::DEFAULT; - let mut ext_target = ext::TargetType::DEFAULT; + let mut ext_target = ext::QueryTarget::DEFAULT; let mut ext_limit = None; let mut ext_timeout = None; @@ -209,7 +209,7 @@ where has_ext = ext; } ext::Target::ID => { - let (rt, ext): (ext::TargetType, bool) = eodec.read(&mut *reader)?; + let (rt, ext): (ext::QueryTarget, bool) = eodec.read(&mut *reader)?; ext_target = rt; has_ext = ext; } diff --git a/commons/zenoh-codec/src/zenoh/query.rs b/commons/zenoh-codec/src/zenoh/query.rs index c9b1cc196e..2815b6263a 100644 --- a/commons/zenoh-codec/src/zenoh/query.rs +++ b/commons/zenoh-codec/src/zenoh/query.rs @@ -21,44 +21,44 @@ use zenoh_protocol::{ common::{iext, imsg}, zenoh::{ id, - query::{ext, flag, Consolidation, Query}, + query::{ext, flag, ConsolidationMode, Query}, }, }; use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header}; // Consolidation -impl WCodec for Zenoh080 +impl WCodec for Zenoh080 where W: Writer, { type Output = Result<(), DidntWrite>; - fn write(self, writer: &mut W, x: Consolidation) -> Self::Output { + fn write(self, writer: &mut W, x: ConsolidationMode) -> Self::Output { let v: u64 = match x { - Consolidation::Auto => 0, - Consolidation::None => 1, - Consolidation::Monotonic => 2, - Consolidation::Latest => 3, + ConsolidationMode::Auto => 0, + ConsolidationMode::None => 1, + ConsolidationMode::Monotonic => 2, + ConsolidationMode::Latest => 3, }; self.write(&mut *writer, v) } } -impl RCodec for Zenoh080 +impl RCodec for Zenoh080 where R: Reader, { type Error = DidntRead; - fn read(self, reader: &mut R) -> Result { + fn read(self, reader: &mut R) -> Result { let v: u64 = self.read(&mut *reader)?; let c = match v { - 0 => Consolidation::Auto, - 1 => Consolidation::None, - 2 => Consolidation::Monotonic, - 3 => Consolidation::Latest, - _ => Consolidation::Auto, // Fallback on Auto if Consolidation is unknown + 0 => ConsolidationMode::Auto, + 1 => ConsolidationMode::None, + 2 => ConsolidationMode::Monotonic, + 3 => ConsolidationMode::Latest, + _ => ConsolidationMode::Auto, // Fallback on Auto if Consolidation is unknown }; Ok(c) } @@ -82,7 +82,7 @@ where // Header let mut header = id::QUERY; - if consolidation != &Consolidation::DEFAULT { + if consolidation != &ConsolidationMode::DEFAULT { header |= flag::C; } if !parameters.is_empty() { @@ -98,7 +98,7 @@ where self.write(&mut *writer, header)?; // Body - if consolidation != &Consolidation::DEFAULT { + if consolidation != &ConsolidationMode::DEFAULT { self.write(&mut *writer, *consolidation)?; } if !parameters.is_empty() { @@ -152,7 +152,7 @@ where } // Body - let mut consolidation = Consolidation::DEFAULT; + let mut consolidation = ConsolidationMode::DEFAULT; if imsg::has_flag(self.header, flag::C) { consolidation = self.codec.read(&mut *reader)?; } diff --git a/commons/zenoh-codec/src/zenoh/reply.rs b/commons/zenoh-codec/src/zenoh/reply.rs index a8d6a2afdc..02f2392fbc 100644 --- a/commons/zenoh-codec/src/zenoh/reply.rs +++ b/commons/zenoh-codec/src/zenoh/reply.rs @@ -21,7 +21,7 @@ use zenoh_protocol::{ common::imsg, zenoh::{ id, - query::Consolidation, + query::ConsolidationMode, reply::{flag, Reply, ReplyBody}, }, }; @@ -43,7 +43,7 @@ where // Header let mut header = id::REPLY; - if consolidation != &Consolidation::DEFAULT { + if consolidation != &ConsolidationMode::DEFAULT { header |= flag::C; } let mut n_exts = ext_unknown.len() as u8; @@ -53,7 +53,7 @@ where self.write(&mut *writer, header)?; // Body - if consolidation != &Consolidation::DEFAULT { + if consolidation != &ConsolidationMode::DEFAULT { self.write(&mut *writer, *consolidation)?; } @@ -95,7 +95,7 @@ where } // Body - let mut consolidation = Consolidation::DEFAULT; + let mut consolidation = ConsolidationMode::DEFAULT; if imsg::has_flag(self.header, flag::C) { consolidation = self.codec.read(&mut *reader)?; } diff --git a/commons/zenoh-config/src/wrappers.rs b/commons/zenoh-config/src/wrappers.rs index e9df0c9a54..10389ce979 100644 --- a/commons/zenoh-config/src/wrappers.rs +++ b/commons/zenoh-config/src/wrappers.rs @@ -20,7 +20,7 @@ use std::str::FromStr; use serde::{Deserialize, Serialize}; use zenoh_protocol::{ - core::{key_expr::OwnedKeyExpr, EntityGlobalIdProto, EntityId, Locator, WhatAmI, ZenohIdProto}, + core::{key_expr::OwnedKeyExpr, EntityGlobalIdProto, Locator, WhatAmI, ZenohIdProto}, scouting::HelloProto, }; @@ -143,15 +143,21 @@ impl fmt::Display for Hello { } } +/// The ID globally identifying an entity in a zenoh system. #[derive(Default, Copy, Clone, Eq, Hash, PartialEq)] #[repr(transparent)] pub struct EntityGlobalId(EntityGlobalIdProto); +/// The ID to locally identify an entity in a Zenoh session. +pub type EntityId = u32; + impl EntityGlobalId { + /// Returns the [`ZenohId`], i.e. the Zenoh session, this ID is associated to. pub fn zid(&self) -> ZenohId { self.0.zid.into() } + /// Returns the [`EntityId`] used to identify the entity in a Zenoh session. pub fn eid(&self) -> EntityId { self.0.eid } diff --git a/commons/zenoh-protocol/src/network/request.rs b/commons/zenoh-protocol/src/network/request.rs index ceeec85043..3fd9eb2217 100644 --- a/commons/zenoh-protocol/src/network/request.rs +++ b/commons/zenoh-protocol/src/network/request.rs @@ -58,7 +58,7 @@ pub struct Request { pub ext_qos: ext::QoSType, pub ext_tstamp: Option, pub ext_nodeid: ext::NodeIdType, - pub ext_target: ext::TargetType, + pub ext_target: ext::QueryTarget, pub ext_budget: Option, pub ext_timeout: Option, pub payload: RequestBody, @@ -82,23 +82,27 @@ pub mod ext { pub type NodeIdType = crate::network::ext::NodeIdType<{ NodeId::ID }>; pub type Target = zextz64!(0x4, true); - /// ```text - /// - Target (0x03) - /// 7 6 5 4 3 2 1 0 - /// +-+-+-+-+-+-+-+-+ - /// % target % - /// +---------------+ - /// ``` - /// The `zenoh::queryable::Queryable`s that should be target of a `zenoh::Session::get()`. + // ```text + // - Target (0x03) + // 7 6 5 4 3 2 1 0 + // +-+-+-+-+-+-+-+-+ + // % target % + // +---------------+ + // ``` + // The `zenoh::queryable::Queryable`s that should be target of a `zenoh::Session::get()`. + #[repr(u8)] #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] - pub enum TargetType { + pub enum QueryTarget { + /// Let Zenoh find the BestMatching queryable capabale of serving the query. #[default] BestMatching, + /// Deliver the query to all queryables matching the query's key expression. All, + /// Deliver the query to all queryables matching the query's key expression that are declared as complete. AllComplete, } - impl TargetType { + impl QueryTarget { pub const DEFAULT: Self = Self::BestMatching; #[cfg(feature = "test")] @@ -107,9 +111,9 @@ pub mod ext { let mut rng = rand::thread_rng(); *[ - TargetType::All, - TargetType::AllComplete, - TargetType::BestMatching, + QueryTarget::All, + QueryTarget::AllComplete, + QueryTarget::BestMatching, ] .choose(&mut rng) .unwrap() @@ -139,7 +143,7 @@ impl Request { let ext_qos = ext::QoSType::rand(); let ext_tstamp = rng.gen_bool(0.5).then(ext::TimestampType::rand); let ext_nodeid = ext::NodeIdType::rand(); - let ext_target = ext::TargetType::rand(); + let ext_target = ext::QueryTarget::rand(); let ext_budget = if rng.gen_bool(0.5) { NonZeroU32::new(rng.gen()) } else { diff --git a/commons/zenoh-protocol/src/zenoh/mod.rs b/commons/zenoh-protocol/src/zenoh/mod.rs index 320db6884d..1c1b39eabe 100644 --- a/commons/zenoh-protocol/src/zenoh/mod.rs +++ b/commons/zenoh-protocol/src/zenoh/mod.rs @@ -20,7 +20,7 @@ pub mod reply; pub use del::Del; pub use err::Err; pub use put::Put; -pub use query::{Consolidation, Query}; +pub use query::{ConsolidationMode, Query}; pub use reply::Reply; use crate::core::Encoding; diff --git a/commons/zenoh-protocol/src/zenoh/query.rs b/commons/zenoh-protocol/src/zenoh/query.rs index 988447b835..49ac3e6d52 100644 --- a/commons/zenoh-protocol/src/zenoh/query.rs +++ b/commons/zenoh-protocol/src/zenoh/query.rs @@ -15,10 +15,10 @@ use alloc::{string::String, vec::Vec}; use crate::common::ZExtUnknown; -/// The kind of consolidation. +/// The kind of consolidation to apply to a query. #[repr(u8)] #[derive(Debug, Default, Clone, PartialEq, Eq, Copy)] -pub enum Consolidation { +pub enum ConsolidationMode { /// Apply automatic consolidation based on queryable's preferences #[default] Auto, @@ -38,7 +38,7 @@ pub enum Consolidation { // Unique, } -impl Consolidation { +impl ConsolidationMode { pub const DEFAULT: Self = Self::Auto; #[cfg(feature = "test")] @@ -79,7 +79,7 @@ pub mod flag { #[derive(Debug, Clone, PartialEq, Eq)] pub struct Query { - pub consolidation: Consolidation, + pub consolidation: ConsolidationMode, pub parameters: String, pub ext_sinfo: Option, pub ext_body: Option, @@ -120,7 +120,7 @@ impl Query { const MIN: usize = 2; const MAX: usize = 16; - let consolidation = Consolidation::rand(); + let consolidation = ConsolidationMode::rand(); let parameters: String = if rng.gen_bool(0.5) { let len = rng.gen_range(MIN..MAX); Alphanumeric.sample_string(&mut rng, len) diff --git a/commons/zenoh-protocol/src/zenoh/reply.rs b/commons/zenoh-protocol/src/zenoh/reply.rs index f29521a4a9..82354c35fb 100644 --- a/commons/zenoh-protocol/src/zenoh/reply.rs +++ b/commons/zenoh-protocol/src/zenoh/reply.rs @@ -15,7 +15,7 @@ use alloc::vec::Vec; use crate::{ common::ZExtUnknown, - zenoh::{query::Consolidation, PushBody}, + zenoh::{query::ConsolidationMode, PushBody}, }; /// # Reply message @@ -45,7 +45,7 @@ pub mod flag { #[derive(Debug, Clone, PartialEq, Eq)] pub struct Reply { - pub consolidation: Consolidation, + pub consolidation: ConsolidationMode, pub ext_unknown: Vec, pub payload: ReplyBody, } @@ -59,7 +59,7 @@ impl Reply { let mut rng = rand::thread_rng(); let payload = ReplyBody::rand(); - let consolidation = Consolidation::rand(); + let consolidation = ConsolidationMode::rand(); let mut ext_unknown = Vec::new(); for _ in 0..rng.gen_range(0..4) { ext_unknown.push(ZExtUnknown::rand2(1, false)); diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index 7cb8d5eb84..edeb2db185 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -25,6 +25,10 @@ use zenoh_keyexpr::OwnedKeyExpr; #[cfg(feature = "unstable")] use zenoh_protocol::core::ZenohIdProto; use zenoh_protocol::core::{CongestionControl, Parameters}; +/// The [`Queryable`](crate::query::Queryable)s that should be target of a [`get`](Session::get). +pub use zenoh_protocol::network::request::ext::QueryTarget; +#[doc(inline)] +pub use zenoh_protocol::zenoh::query::ConsolidationMode; use zenoh_result::ZResult; use super::{ @@ -43,12 +47,6 @@ use super::{ use super::{sample::SourceInfo, selector::ZenohParameters}; use crate::bytes::OptionZBytes; -/// The [`Queryable`](crate::query::Queryable)s that should be target of a [`get`](Session::get). -pub type QueryTarget = zenoh_protocol::network::request::ext::TargetType; - -/// The kind of consolidation. -pub type ConsolidationMode = zenoh_protocol::zenoh::query::Consolidation; - /// The replies consolidation strategy to apply on replies to a [`get`](Session::get). #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct QueryConsolidation { diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index 97675336b7..c0c3f76c38 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -418,7 +418,7 @@ impl Query { mapping: Mapping::Sender, }, payload: ResponseBody::Reply(zenoh::Reply { - consolidation: zenoh::Consolidation::DEFAULT, + consolidation: zenoh::ConsolidationMode::DEFAULT, ext_unknown: vec![], payload: match sample.kind { SampleKind::Put => ReplyBody::Put(Put { diff --git a/zenoh/src/api/sample.rs b/zenoh/src/api/sample.rs index 482a33382d..0b07d5ebe3 100644 --- a/zenoh/src/api/sample.rs +++ b/zenoh/src/api/sample.rs @@ -30,7 +30,8 @@ use super::{ publisher::Priority, value::Value, }; -pub type SourceSn = u64; +/// The sequence number of the [`Sample`] from the source. +pub type SourceSn = u32; /// The locality of samples to be received by subscribers or targeted by publishers. #[zenoh_macros::unstable] @@ -167,8 +168,9 @@ fn source_info_stack_size() { assert_eq!(std::mem::size_of::(), 16); assert_eq!(std::mem::size_of::>(), 17); - assert_eq!(std::mem::size_of::>(), 16); - assert_eq!(std::mem::size_of::(), 17 + 16 + 7); + assert_eq!(std::mem::size_of::>(), 8); + assert_eq!(std::mem::size_of::>(), 24); + assert_eq!(std::mem::size_of::(), 24 + 8); } #[zenoh_macros::unstable] @@ -192,7 +194,7 @@ impl From for Option Option<&HLC> { self.0.runtime.hlc() } @@ -654,9 +652,9 @@ impl Session { self.0.runtime.config() } - /// Get a new Timestamp from a Zenoh session [`Session`](Session). + /// Get a new Timestamp from a Zenoh [`Session`]. /// - /// The returned timestamp has the current time, with the Session's runtime ZenohID + /// The returned timestamp has the current time, with the Session's runtime [`ZenohId`]. /// /// # Examples /// ### Read current zenoh configuration @@ -2200,8 +2198,8 @@ impl SessionInner { key_expr: &WireExpr, parameters: &str, qid: RequestId, - _target: TargetType, - _consolidation: Consolidation, + _target: QueryTarget, + _consolidation: ConsolidationMode, body: Option, attachment: Option, ) { @@ -2486,7 +2484,7 @@ impl Primitives for WeakSession { timestamp: m.timestamp, qos: QoS::from(msg.ext_qos), source_id: m.ext_sinfo.as_ref().map(|i| i.id.into()), - source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), + source_sn: m.ext_sinfo.as_ref().map(|i| i.sn), }; self.execute_subscriber_callbacks( false, @@ -2506,7 +2504,7 @@ impl Primitives for WeakSession { timestamp: m.timestamp, qos: QoS::from(msg.ext_qos), source_id: m.ext_sinfo.as_ref().map(|i| i.id.into()), - source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), + source_sn: m.ext_sinfo.as_ref().map(|i| i.sn), }; self.execute_subscriber_callbacks( false, @@ -2611,7 +2609,7 @@ impl Primitives for WeakSession { timestamp, qos: QoS::from(msg.ext_qos), source_id: ext_sinfo.as_ref().map(|i| i.id.into()), - source_sn: ext_sinfo.as_ref().map(|i| i.sn as u64), + source_sn: ext_sinfo.as_ref().map(|i| i.sn), }, attachment: _attachment.map(Into::into), }, @@ -2628,7 +2626,7 @@ impl Primitives for WeakSession { timestamp, qos: QoS::from(msg.ext_qos), source_id: ext_sinfo.as_ref().map(|i| i.id.into()), - source_sn: ext_sinfo.as_ref().map(|i| i.sn as u64), + source_sn: ext_sinfo.as_ref().map(|i| i.sn), }, attachment: _attachment.map(Into::into), }, @@ -2688,7 +2686,7 @@ impl Primitives for WeakSession { } } } - Consolidation::Auto | ConsolidationMode::Latest => { + ConsolidationMode::Auto | ConsolidationMode::Latest => { match query.replies.as_ref().unwrap().get( new_reply.result.as_ref().unwrap().key_expr.as_keyexpr(), ) { diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index 7d1287beeb..4ec6007acd 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -28,7 +28,7 @@ use zenoh_protocol::{ network::{ declare::{ext, queryable::ext::QueryableInfoType, QueryableId}, request::{ - ext::{BudgetType, TargetType, TimeoutType}, + ext::{BudgetType, QueryTarget, TimeoutType}, Request, RequestId, }, response::{self, ext::ResponderIdType, Response, ResponseFinal}, @@ -304,11 +304,11 @@ fn compute_final_route( qabls: &Arc, src_face: &Arc, expr: &mut RoutingExpr, - target: &TargetType, + target: &QueryTarget, query: Arc, ) -> QueryRoute { match target { - TargetType::All => { + QueryTarget::All => { let mut route = HashMap::new(); for qabl in qabls.iter() { if tables @@ -324,7 +324,7 @@ fn compute_final_route( } route } - TargetType::AllComplete => { + QueryTarget::AllComplete => { let mut route = HashMap::new(); for qabl in qabls.iter() { if qabl.info.map(|info| info.complete).unwrap_or(true) @@ -341,7 +341,7 @@ fn compute_final_route( } route } - TargetType::BestMatching => { + QueryTarget::BestMatching => { if let Some(qabl) = qabls.iter().find(|qabl| { qabl.direction.0.id != src_face.id && qabl.info.is_some_and(|info| info.complete) }) { @@ -353,7 +353,7 @@ fn compute_final_route( route } else { - compute_final_route(tables, qabls, src_face, expr, &TargetType::All, query) + compute_final_route(tables, qabls, src_face, expr, &QueryTarget::All, query) } } } @@ -545,7 +545,7 @@ pub fn route_query( qid: RequestId, ext_qos: ext::QoSType, ext_tstamp: Option, - ext_target: TargetType, + ext_target: QueryTarget, ext_budget: Option, ext_timeout: Option, body: RequestBody,