From d6ffebf080958157ac141c92b51b9fe00c075227 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Thu, 15 Feb 2024 11:58:21 +0100 Subject: [PATCH] Clean-up of protocol types (#729) * Update Reply protocol definition and codec * Make consolidation a flag in Query/Reply * Fix wrong Consolidation cast in codec * Apply Reply changes to routing * Fix shared-memory feature * Fix stats * Bump Zenoh Protocol Version * Add query/reply ok(put|del)/err() tests * Clean-up of code * Default CongestionControl for Push is Drop * Fix Priority::DEFAULT typo * Define DEFAULT consts * ConsolidationMode moved into the API * Remove unused Ack message * Fix Ack leftovers * CongestionControl::DEFAULT * QoSType::DEFAULT * Mapping::DEFAULT * Encoding::DEFAULT * QueryTarget::DEFAULT * NodeType::DEFAULT * QueryableInfo::DEFAULT * Remove ConsolidationMode from zenoh-protocol * ConsolidationType::DEFAULT * Remove dead code * Remove dead code * Move SampleKind to sample.rs * Cleanup SubMode * Cleanup QueryTarget * Remove emptyline --- commons/zenoh-codec/benches/codec.rs | 48 +++---- commons/zenoh-codec/src/common/mod.rs | 1 - commons/zenoh-codec/src/common/priority.rs | 66 --------- commons/zenoh-codec/src/core/mod.rs | 1 - commons/zenoh-codec/src/core/property.rs | 84 ----------- commons/zenoh-codec/src/core/wire_expr.rs | 2 +- commons/zenoh-codec/src/network/declare.rs | 32 ++--- commons/zenoh-codec/src/network/mod.rs | 2 +- commons/zenoh-codec/src/network/oam.rs | 7 +- commons/zenoh-codec/src/network/push.rs | 14 +- commons/zenoh-codec/src/network/request.rs | 20 +-- commons/zenoh-codec/src/network/response.rs | 15 +- commons/zenoh-codec/src/transport/fragment.rs | 6 +- commons/zenoh-codec/src/transport/frame.rs | 6 +- commons/zenoh-codec/src/transport/join.rs | 2 +- commons/zenoh-codec/src/transport/oam.rs | 6 +- commons/zenoh-codec/src/zenoh/ack.rs | 129 ----------------- commons/zenoh-codec/src/zenoh/mod.rs | 3 - commons/zenoh-codec/src/zenoh/put.rs | 6 +- commons/zenoh-codec/src/zenoh/query.rs | 6 +- commons/zenoh-codec/src/zenoh/reply.rs | 6 +- commons/zenoh-codec/tests/codec.rs | 5 - commons/zenoh-protocol/src/common/mod.rs | 15 -- commons/zenoh-protocol/src/core/encoding.rs | 2 + commons/zenoh-protocol/src/core/locator.rs | 64 --------- commons/zenoh-protocol/src/core/mod.rs | 98 ++----------- commons/zenoh-protocol/src/core/wire_expr.rs | 2 +- commons/zenoh-protocol/src/network/declare.rs | 28 +++- commons/zenoh-protocol/src/network/mod.rs | 44 +++--- commons/zenoh-protocol/src/network/request.rs | 13 +- commons/zenoh-protocol/src/transport/mod.rs | 12 +- commons/zenoh-protocol/src/zenoh/ack.rs | 84 ----------- commons/zenoh-protocol/src/zenoh/mod.rs | 19 +-- commons/zenoh-protocol/src/zenoh/query.rs | 14 +- examples/examples/z_pub_thr.rs | 2 +- io/zenoh-transport/src/common/batch.rs | 8 +- io/zenoh-transport/src/common/pipeline.rs | 16 +-- io/zenoh-transport/src/multicast/link.rs | 2 +- io/zenoh-transport/src/multicast/rx.rs | 4 +- io/zenoh-transport/src/shm.rs | 2 - .../src/unicast/establishment/cookie.rs | 1 - .../src/unicast/establishment/properties.rs | 132 ------------------ .../src/unicast/universal/rx.rs | 4 +- .../tests/multicast_compression.rs | 6 +- .../tests/multicast_transport.rs | 6 +- .../tests/unicast_compression.rs | 12 +- .../tests/unicast_concurrent.rs | 12 +- .../tests/unicast_defragmentation.rs | 16 +-- .../tests/unicast_intermittent.rs | 6 +- .../tests/unicast_priorities.rs | 4 +- io/zenoh-transport/tests/unicast_shm.rs | 12 +- .../tests/unicast_simultaneous.rs | 4 +- io/zenoh-transport/tests/unicast_transport.rs | 56 ++++---- zenoh-ext/src/subscriber_ext.rs | 8 +- zenoh/src/key_expr.rs | 4 +- zenoh/src/liveliness.rs | 6 +- zenoh/src/net/routing/dispatcher/pubsub.rs | 4 +- zenoh/src/net/routing/dispatcher/queries.rs | 29 ++-- zenoh/src/net/routing/dispatcher/resource.rs | 4 +- zenoh/src/net/routing/hat/client/pubsub.rs | 16 +-- zenoh/src/net/routing/hat/client/queries.rs | 12 +- .../net/routing/hat/linkstate_peer/network.rs | 2 +- .../net/routing/hat/linkstate_peer/pubsub.rs | 20 +-- .../net/routing/hat/linkstate_peer/queries.rs | 20 +-- zenoh/src/net/routing/hat/p2p_peer/gossip.rs | 2 +- zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 16 +-- zenoh/src/net/routing/hat/p2p_peer/queries.rs | 12 +- zenoh/src/net/routing/hat/router/network.rs | 2 +- zenoh/src/net/routing/hat/router/pubsub.rs | 36 ++--- zenoh/src/net/routing/hat/router/queries.rs | 36 ++--- zenoh/src/net/runtime/adminspace.rs | 14 +- zenoh/src/net/tests/tables.rs | 40 +++--- zenoh/src/prelude.rs | 4 +- zenoh/src/publication.rs | 6 +- zenoh/src/query.rs | 33 ++++- zenoh/src/queryable.rs | 8 +- zenoh/src/sample.rs | 40 +++++- zenoh/src/session.rs | 77 +++++----- zenoh/src/subscriber.rs | 17 --- 79 files changed, 510 insertions(+), 1125 deletions(-) delete mode 100644 commons/zenoh-codec/src/common/priority.rs delete mode 100644 commons/zenoh-codec/src/core/property.rs delete mode 100644 commons/zenoh-codec/src/zenoh/ack.rs delete mode 100644 commons/zenoh-protocol/src/zenoh/ack.rs delete mode 100644 io/zenoh-transport/src/unicast/establishment/properties.rs diff --git a/commons/zenoh-codec/benches/codec.rs b/commons/zenoh-codec/benches/codec.rs index 1c46a700a7..34c9313a7f 100644 --- a/commons/zenoh-codec/benches/codec.rs +++ b/commons/zenoh-codec/benches/codec.rs @@ -75,19 +75,19 @@ fn criterion_benchmark(c: &mut Criterion) { let codec = Zenoh080::new(); let frame = FrameHeader { - reliability: Reliability::default(), + reliability: Reliability::DEFAULT, sn: TransportSn::MIN, - ext_qos: zenoh_protocol::transport::frame::ext::QoSType::default(), + ext_qos: zenoh_protocol::transport::frame::ext::QoSType::DEFAULT, }; let data = Push { wire_expr: WireExpr::empty(), - ext_qos: ext::QoSType::default(), + ext_qos: ext::QoSType::DEFAULT, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -121,19 +121,19 @@ fn criterion_benchmark(c: &mut Criterion) { let codec = Zenoh080::new(); let frame = FrameHeader { - reliability: Reliability::default(), + reliability: Reliability::DEFAULT, sn: TransportSn::MIN, - ext_qos: zenoh_protocol::transport::frame::ext::QoSType::default(), + ext_qos: zenoh_protocol::transport::frame::ext::QoSType::DEFAULT, }; let data = Push { wire_expr: WireExpr::empty(), - ext_qos: ext::QoSType::default(), + ext_qos: ext::QoSType::DEFAULT, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -162,19 +162,19 @@ fn criterion_benchmark(c: &mut Criterion) { let codec = Zenoh080::new(); let frame = FrameHeader { - reliability: Reliability::default(), + reliability: Reliability::DEFAULT, sn: TransportSn::MIN, - ext_qos: zenoh_protocol::transport::frame::ext::QoSType::default(), + ext_qos: zenoh_protocol::transport::frame::ext::QoSType::DEFAULT, }; let data = Push { wire_expr: WireExpr::empty(), - ext_qos: ext::QoSType::default(), + ext_qos: ext::QoSType::DEFAULT, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -210,12 +210,12 @@ fn criterion_benchmark(c: &mut Criterion) { let data = Push { wire_expr: WireExpr::empty(), - ext_qos: ext::QoSType::default(), + ext_qos: ext::QoSType::DEFAULT, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -238,12 +238,12 @@ fn criterion_benchmark(c: &mut Criterion) { let data = Push { wire_expr: WireExpr::empty(), - ext_qos: ext::QoSType::default(), + ext_qos: ext::QoSType::DEFAULT, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -277,12 +277,12 @@ fn criterion_benchmark(c: &mut Criterion) { let data = Push { wire_expr: WireExpr::empty(), - ext_qos: ext::QoSType::default(), + ext_qos: ext::QoSType::DEFAULT, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, diff --git a/commons/zenoh-codec/src/common/mod.rs b/commons/zenoh-codec/src/common/mod.rs index 4c25c93241..f34f9872bf 100644 --- a/commons/zenoh-codec/src/common/mod.rs +++ b/commons/zenoh-codec/src/common/mod.rs @@ -12,4 +12,3 @@ // ZettaScale Zenoh Team, // pub mod extension; -mod priority; diff --git a/commons/zenoh-codec/src/common/priority.rs b/commons/zenoh-codec/src/common/priority.rs deleted file mode 100644 index 776229971e..0000000000 --- a/commons/zenoh-codec/src/common/priority.rs +++ /dev/null @@ -1,66 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use crate::{RCodec, WCodec, Zenoh080, Zenoh080Header}; -use core::convert::TryInto; -use zenoh_buffers::{ - reader::{DidntRead, Reader}, - writer::{DidntWrite, Writer}, -}; -use zenoh_protocol::{common::imsg, core::Priority}; - -impl WCodec<&Priority, &mut W> for Zenoh080 -where - W: Writer, -{ - type Output = Result<(), DidntWrite>; - - fn write(self, writer: &mut W, x: &Priority) -> Self::Output { - // Header - let header = imsg::id::PRIORITY | ((*x as u8) << imsg::HEADER_BITS); - self.write(&mut *writer, header)?; - Ok(()) - } -} - -impl RCodec for Zenoh080 -where - R: Reader, -{ - type Error = DidntRead; - - fn read(self, reader: &mut R) -> Result { - let header: u8 = self.read(&mut *reader)?; - let codec = Zenoh080Header::new(header); - - codec.read(reader) - } -} - -impl RCodec for Zenoh080Header -where - R: Reader, -{ - type Error = DidntRead; - - fn read(self, _reader: &mut R) -> Result { - if imsg::mid(self.header) != imsg::id::PRIORITY { - return Err(DidntRead); - } - - let priority: Priority = (imsg::flags(self.header) >> imsg::HEADER_BITS) - .try_into() - .map_err(|_| DidntRead)?; - Ok(priority) - } -} diff --git a/commons/zenoh-codec/src/core/mod.rs b/commons/zenoh-codec/src/core/mod.rs index 1f48def695..c8e19f057f 100644 --- a/commons/zenoh-codec/src/core/mod.rs +++ b/commons/zenoh-codec/src/core/mod.rs @@ -13,7 +13,6 @@ // mod encoding; mod locator; -mod property; #[cfg(feature = "shared-memory")] mod shm; mod timestamp; diff --git a/commons/zenoh-codec/src/core/property.rs b/commons/zenoh-codec/src/core/property.rs deleted file mode 100644 index bb7f760208..0000000000 --- a/commons/zenoh-codec/src/core/property.rs +++ /dev/null @@ -1,84 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use crate::{RCodec, WCodec, Zenoh080}; -use alloc::vec::Vec; -use zenoh_buffers::{ - reader::{DidntRead, Reader}, - writer::{DidntWrite, Writer}, -}; -use zenoh_protocol::core::Property; - -impl WCodec<&Property, &mut W> for Zenoh080 -where - W: Writer, -{ - type Output = Result<(), DidntWrite>; - - fn write(self, writer: &mut W, x: &Property) -> Self::Output { - let Property { key, value } = x; - - self.write(&mut *writer, key)?; - self.write(&mut *writer, value.as_slice())?; - Ok(()) - } -} - -impl RCodec for Zenoh080 -where - R: Reader, -{ - type Error = DidntRead; - - fn read(self, reader: &mut R) -> Result { - let key: u64 = self.read(&mut *reader)?; - let value: Vec = self.read(&mut *reader)?; - - Ok(Property { key, value }) - } -} - -impl WCodec<&[Property], &mut W> for Zenoh080 -where - W: Writer, -{ - type Output = Result<(), DidntWrite>; - - fn write(self, writer: &mut W, x: &[Property]) -> Self::Output { - self.write(&mut *writer, x.len())?; - for p in x.iter() { - self.write(&mut *writer, p)?; - } - - Ok(()) - } -} - -impl RCodec, &mut R> for Zenoh080 -where - R: Reader, -{ - type Error = DidntRead; - - fn read(self, reader: &mut R) -> Result, Self::Error> { - let num: usize = self.read(&mut *reader)?; - - let mut ps = Vec::with_capacity(num); - for _ in 0..num { - let p: Property = self.read(&mut *reader)?; - ps.push(p); - } - - Ok(ps) - } -} diff --git a/commons/zenoh-codec/src/core/wire_expr.rs b/commons/zenoh-codec/src/core/wire_expr.rs index 6caba6c8c7..aa6f77b379 100644 --- a/commons/zenoh-codec/src/core/wire_expr.rs +++ b/commons/zenoh-codec/src/core/wire_expr.rs @@ -65,7 +65,7 @@ where Ok(WireExpr { scope, suffix: suffix.into(), - mapping: Mapping::default(), + mapping: Mapping::DEFAULT, }) } } diff --git a/commons/zenoh-codec/src/network/declare.rs b/commons/zenoh-codec/src/network/declare.rs index 20916dc359..cf92b27c17 100644 --- a/commons/zenoh-codec/src/network/declare.rs +++ b/commons/zenoh-codec/src/network/declare.rs @@ -102,16 +102,16 @@ where // Header let mut header = id::DECLARE; - let mut n_exts = ((ext_qos != &declare::ext::QoSType::default()) as u8) + let mut n_exts = ((ext_qos != &declare::ext::QoSType::DEFAULT) as u8) + (ext_tstamp.is_some() as u8) - + ((ext_nodeid != &declare::ext::NodeIdType::default()) as u8); + + ((ext_nodeid != &declare::ext::NodeIdType::DEFAULT) as u8); if n_exts != 0 { header |= declare::flag::Z; } self.write(&mut *writer, header)?; // Extensions - if ext_qos != &declare::ext::QoSType::default() { + if ext_qos != &declare::ext::QoSType::DEFAULT { n_exts -= 1; self.write(&mut *writer, (*ext_qos, n_exts != 0))?; } @@ -119,7 +119,7 @@ where n_exts -= 1; self.write(&mut *writer, (ts, n_exts != 0))?; } - if ext_nodeid != &declare::ext::NodeIdType::default() { + if ext_nodeid != &declare::ext::NodeIdType::DEFAULT { n_exts -= 1; self.write(&mut *writer, (*ext_nodeid, n_exts != 0))?; } @@ -157,9 +157,9 @@ where } // Extensions - let mut ext_qos = declare::ext::QoSType::default(); + let mut ext_qos = declare::ext::QoSType::DEFAULT; let mut ext_tstamp = None; - let mut ext_nodeid = declare::ext::NodeIdType::default(); + let mut ext_nodeid = declare::ext::NodeIdType::DEFAULT; let mut has_ext = imsg::has_flag(self.header, declare::flag::Z); while has_ext { @@ -340,11 +340,11 @@ where // Header let mut header = declare::id::D_SUBSCRIBER; - let mut n_exts = (ext_info != &subscriber::ext::SubscriberInfo::default()) as u8; + let mut n_exts = (ext_info != &subscriber::ext::SubscriberInfo::DEFAULT) as u8; if n_exts != 0 { header |= subscriber::flag::Z; } - if wire_expr.mapping != Mapping::default() { + if wire_expr.mapping != Mapping::DEFAULT { header |= subscriber::flag::M; } if wire_expr.has_suffix() { @@ -357,7 +357,7 @@ where self.write(&mut *writer, wire_expr)?; // Extensions - if ext_info != &subscriber::ext::SubscriberInfo::default() { + if ext_info != &subscriber::ext::SubscriberInfo::DEFAULT { n_exts -= 1; self.write(&mut *writer, (*ext_info, n_exts != 0))?; } @@ -402,7 +402,7 @@ where }; // Extensions - let mut ext_info = subscriber::ext::SubscriberInfo::default(); + let mut ext_info = subscriber::ext::SubscriberInfo::DEFAULT; let mut has_ext = imsg::has_flag(self.header, subscriber::flag::Z); while has_ext { @@ -524,11 +524,11 @@ where // Header let mut header = declare::id::D_QUERYABLE; - let mut n_exts = (ext_info != &queryable::ext::QueryableInfo::default()) as u8; + let mut n_exts = (ext_info != &queryable::ext::QueryableInfo::DEFAULT) as u8; if n_exts != 0 { header |= subscriber::flag::Z; } - if wire_expr.mapping != Mapping::default() { + if wire_expr.mapping != Mapping::DEFAULT { header |= subscriber::flag::M; } if wire_expr.has_suffix() { @@ -539,7 +539,7 @@ where // Body self.write(&mut *writer, id)?; self.write(&mut *writer, wire_expr)?; - if ext_info != &queryable::ext::QueryableInfo::default() { + if ext_info != &queryable::ext::QueryableInfo::DEFAULT { n_exts -= 1; self.write(&mut *writer, (*ext_info, n_exts != 0))?; } @@ -584,7 +584,7 @@ where }; // Extensions - let mut ext_info = queryable::ext::QueryableInfo::default(); + let mut ext_info = queryable::ext::QueryableInfo::DEFAULT; let mut has_ext = imsg::has_flag(self.header, queryable::flag::Z); while has_ext { @@ -699,7 +699,7 @@ where // Header let mut header = declare::id::D_TOKEN; - if wire_expr.mapping != Mapping::default() { + if wire_expr.mapping != Mapping::DEFAULT { header |= subscriber::flag::M; } if wire_expr.has_suffix() { @@ -851,7 +851,7 @@ where // Header let mut header = declare::id::D_INTEREST; - if wire_expr.mapping != Mapping::default() { + if wire_expr.mapping != Mapping::DEFAULT { header |= subscriber::flag::M; } if wire_expr.has_suffix() { diff --git a/commons/zenoh-codec/src/network/mod.rs b/commons/zenoh-codec/src/network/mod.rs index c1f2489b88..dade13d362 100644 --- a/commons/zenoh-codec/src/network/mod.rs +++ b/commons/zenoh-codec/src/network/mod.rs @@ -58,7 +58,7 @@ where type Error = DidntRead; fn read(self, reader: &mut R) -> Result { - let codec = Zenoh080Reliability::new(Reliability::default()); + let codec = Zenoh080Reliability::new(Reliability::DEFAULT); codec.read(reader) } } diff --git a/commons/zenoh-codec/src/network/oam.rs b/commons/zenoh-codec/src/network/oam.rs index ff6daeb020..9751e9952d 100644 --- a/commons/zenoh-codec/src/network/oam.rs +++ b/commons/zenoh-codec/src/network/oam.rs @@ -52,8 +52,7 @@ where header |= iext::ENC_ZBUF; } } - let mut n_exts = - ((ext_qos != &ext::QoSType::default()) as u8) + (ext_tstamp.is_some() as u8); + let mut n_exts = ((ext_qos != &ext::QoSType::DEFAULT) as u8) + (ext_tstamp.is_some() as u8); if n_exts != 0 { header |= flag::Z; } @@ -63,7 +62,7 @@ where self.write(&mut *writer, id)?; // Extensions - if ext_qos != &ext::QoSType::default() { + if ext_qos != &ext::QoSType::DEFAULT { n_exts -= 1; self.write(&mut *writer, (*ext_qos, n_exts != 0))?; } @@ -115,7 +114,7 @@ where let id: OamId = self.codec.read(&mut *reader)?; // Extensions - let mut ext_qos = ext::QoSType::default(); + let mut ext_qos = ext::QoSType::DEFAULT; let mut ext_tstamp = None; let mut has_ext = imsg::has_flag(self.header, flag::Z); diff --git a/commons/zenoh-codec/src/network/push.rs b/commons/zenoh-codec/src/network/push.rs index 10a8489b29..b9ec2ba5db 100644 --- a/commons/zenoh-codec/src/network/push.rs +++ b/commons/zenoh-codec/src/network/push.rs @@ -44,13 +44,13 @@ where // Header let mut header = id::PUSH; - let mut n_exts = ((ext_qos != &ext::QoSType::default()) as u8) + let mut n_exts = ((ext_qos != &ext::QoSType::DEFAULT) as u8) + (ext_tstamp.is_some() as u8) - + ((ext_nodeid != &ext::NodeIdType::default()) as u8); + + ((ext_nodeid != &ext::NodeIdType::DEFAULT) as u8); if n_exts != 0 { header |= flag::Z; } - if wire_expr.mapping != Mapping::default() { + if wire_expr.mapping != Mapping::DEFAULT { header |= flag::M; } if wire_expr.has_suffix() { @@ -62,7 +62,7 @@ where self.write(&mut *writer, wire_expr)?; // Extensions - if ext_qos != &ext::QoSType::default() { + if ext_qos != &ext::QoSType::DEFAULT { n_exts -= 1; self.write(&mut *writer, (*ext_qos, n_exts != 0))?; } @@ -70,7 +70,7 @@ where n_exts -= 1; self.write(&mut *writer, (ts, n_exts != 0))?; } - if ext_nodeid != &ext::NodeIdType::default() { + if ext_nodeid != &ext::NodeIdType::DEFAULT { n_exts -= 1; self.write(&mut *writer, (*ext_nodeid, n_exts != 0))?; } @@ -116,9 +116,9 @@ where }; // Extensions - let mut ext_qos = ext::QoSType::default(); + let mut ext_qos = ext::QoSType::DEFAULT; let mut ext_tstamp = None; - let mut ext_nodeid = ext::NodeIdType::default(); + let mut ext_nodeid = ext::NodeIdType::DEFAULT; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { diff --git a/commons/zenoh-codec/src/network/request.rs b/commons/zenoh-codec/src/network/request.rs index 19711ff147..364c1af3d0 100644 --- a/commons/zenoh-codec/src/network/request.rs +++ b/commons/zenoh-codec/src/network/request.rs @@ -93,16 +93,16 @@ where // Header let mut header = id::REQUEST; - let mut n_exts = ((ext_qos != &ext::QoSType::default()) as u8) + let mut n_exts = ((ext_qos != &ext::QoSType::DEFAULT) as u8) + (ext_tstamp.is_some() as u8) - + ((ext_target != &ext::TargetType::default()) as u8) + + ((ext_target != &ext::TargetType::DEFAULT) as u8) + (ext_budget.is_some() as u8) + (ext_timeout.is_some() as u8) - + ((ext_nodeid != &ext::NodeIdType::default()) as u8); + + ((ext_nodeid != &ext::NodeIdType::DEFAULT) as u8); if n_exts != 0 { header |= flag::Z; } - if wire_expr.mapping != Mapping::default() { + if wire_expr.mapping != Mapping::DEFAULT { header |= flag::M; } if wire_expr.has_suffix() { @@ -115,7 +115,7 @@ where self.write(&mut *writer, wire_expr)?; // Extensions - if ext_qos != &ext::QoSType::default() { + if ext_qos != &ext::QoSType::DEFAULT { n_exts -= 1; self.write(&mut *writer, (*ext_qos, n_exts != 0))?; } @@ -123,7 +123,7 @@ where n_exts -= 1; self.write(&mut *writer, (ts, n_exts != 0))?; } - if ext_target != &ext::TargetType::default() { + if ext_target != &ext::TargetType::DEFAULT { n_exts -= 1; self.write(&mut *writer, (ext_target, n_exts != 0))?; } @@ -137,7 +137,7 @@ where let e = ext::Timeout::new(to.as_millis() as u64); self.write(&mut *writer, (&e, n_exts != 0))?; } - if ext_nodeid != &ext::NodeIdType::default() { + if ext_nodeid != &ext::NodeIdType::DEFAULT { n_exts -= 1; self.write(&mut *writer, (*ext_nodeid, n_exts != 0))?; } @@ -185,10 +185,10 @@ where }; // Extensions - let mut ext_qos = ext::QoSType::default(); + let mut ext_qos = ext::QoSType::DEFAULT; let mut ext_tstamp = None; - let mut ext_nodeid = ext::NodeIdType::default(); - let mut ext_target = ext::TargetType::default(); + let mut ext_nodeid = ext::NodeIdType::DEFAULT; + let mut ext_target = ext::TargetType::DEFAULT; let mut ext_limit = None; let mut ext_timeout = None; diff --git a/commons/zenoh-codec/src/network/response.rs b/commons/zenoh-codec/src/network/response.rs index bec7df2967..5b69e8b109 100644 --- a/commons/zenoh-codec/src/network/response.rs +++ b/commons/zenoh-codec/src/network/response.rs @@ -48,13 +48,13 @@ where // Header let mut header = id::RESPONSE; - let mut n_exts = ((ext_qos != &ext::QoSType::default()) as u8) + let mut n_exts = ((ext_qos != &ext::QoSType::DEFAULT) as u8) + (ext_tstamp.is_some() as u8) + (ext_respid.is_some() as u8); if n_exts != 0 { header |= flag::Z; } - if wire_expr.mapping != Mapping::default() { + if wire_expr.mapping != Mapping::DEFAULT { header |= flag::M; } if wire_expr.has_suffix() { @@ -67,7 +67,7 @@ where self.write(&mut *writer, wire_expr)?; // Extensions - if ext_qos != &ext::QoSType::default() { + if ext_qos != &ext::QoSType::DEFAULT { n_exts -= 1; self.write(&mut *writer, (*ext_qos, n_exts != 0))?; } @@ -123,7 +123,7 @@ where }; // Extensions - let mut ext_qos = ext::QoSType::default(); + let mut ext_qos = ext::QoSType::DEFAULT; let mut ext_tstamp = None; let mut ext_respid = None; @@ -183,8 +183,7 @@ where // Header let mut header = id::RESPONSE_FINAL; - let mut n_exts = - ((ext_qos != &ext::QoSType::default()) as u8) + (ext_tstamp.is_some() as u8); + let mut n_exts = ((ext_qos != &ext::QoSType::DEFAULT) as u8) + (ext_tstamp.is_some() as u8); if n_exts != 0 { header |= flag::Z; } @@ -194,7 +193,7 @@ where self.write(&mut *writer, rid)?; // Extensions - if ext_qos != &ext::QoSType::default() { + if ext_qos != &ext::QoSType::DEFAULT { n_exts -= 1; self.write(&mut *writer, (*ext_qos, n_exts != 0))?; } @@ -236,7 +235,7 @@ where let rid: RequestId = bodec.read(&mut *reader)?; // Extensions - let mut ext_qos = ext::QoSType::default(); + let mut ext_qos = ext::QoSType::DEFAULT; let mut ext_tstamp = None; let mut has_ext = imsg::has_flag(self.header, flag::Z); diff --git a/commons/zenoh-codec/src/transport/fragment.rs b/commons/zenoh-codec/src/transport/fragment.rs index b66f395df1..b01e2c2bae 100644 --- a/commons/zenoh-codec/src/transport/fragment.rs +++ b/commons/zenoh-codec/src/transport/fragment.rs @@ -48,7 +48,7 @@ where if *more { header |= flag::M; } - if ext_qos != &ext::QoSType::default() { + if ext_qos != &ext::QoSType::DEFAULT { header |= flag::Z; } self.write(&mut *writer, header)?; @@ -57,7 +57,7 @@ where self.write(&mut *writer, sn)?; // Extensions - if ext_qos != &ext::QoSType::default() { + if ext_qos != &ext::QoSType::DEFAULT { self.write(&mut *writer, (*ext_qos, false))?; } @@ -97,7 +97,7 @@ where let sn: TransportSn = self.codec.read(&mut *reader)?; // Extensions - let mut ext_qos = ext::QoSType::default(); + let mut ext_qos = ext::QoSType::DEFAULT; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { diff --git a/commons/zenoh-codec/src/transport/frame.rs b/commons/zenoh-codec/src/transport/frame.rs index 8d39aabcdb..ab82a024c4 100644 --- a/commons/zenoh-codec/src/transport/frame.rs +++ b/commons/zenoh-codec/src/transport/frame.rs @@ -46,7 +46,7 @@ where if let Reliability::Reliable = reliability { header |= flag::R; } - if ext_qos != &ext::QoSType::default() { + if ext_qos != &ext::QoSType::DEFAULT { header |= flag::Z; } self.write(&mut *writer, header)?; @@ -55,7 +55,7 @@ where self.write(&mut *writer, sn)?; // Extensions - if ext_qos != &ext::QoSType::default() { + if ext_qos != &ext::QoSType::DEFAULT { self.write(&mut *writer, (x.ext_qos, false))?; } @@ -94,7 +94,7 @@ where let sn: TransportSn = self.codec.read(&mut *reader)?; // Extensions - let mut ext_qos = ext::QoSType::default(); + let mut ext_qos = ext::QoSType::DEFAULT; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { diff --git a/commons/zenoh-codec/src/transport/join.rs b/commons/zenoh-codec/src/transport/join.rs index 80c1663413..d87ceecc78 100644 --- a/commons/zenoh-codec/src/transport/join.rs +++ b/commons/zenoh-codec/src/transport/join.rs @@ -121,7 +121,7 @@ where let (_, more): (ZExtZBufHeader<{ ext::QoS::ID }>, bool) = self.read(&mut *reader)?; // Body - let mut ext_qos = Box::new([PrioritySn::default(); Priority::NUM]); + let mut ext_qos = Box::new([PrioritySn::DEFAULT; Priority::NUM]); for p in ext_qos.iter_mut() { *p = self.codec.read(&mut *reader)?; } diff --git a/commons/zenoh-codec/src/transport/oam.rs b/commons/zenoh-codec/src/transport/oam.rs index e2f905abf8..6861f638d3 100644 --- a/commons/zenoh-codec/src/transport/oam.rs +++ b/commons/zenoh-codec/src/transport/oam.rs @@ -47,7 +47,7 @@ where header |= iext::ENC_ZBUF; } } - let mut n_exts = (ext_qos != &ext::QoSType::default()) as u8; + let mut n_exts = (ext_qos != &ext::QoSType::DEFAULT) as u8; if n_exts != 0 { header |= flag::Z; } @@ -57,7 +57,7 @@ where self.write(&mut *writer, id)?; // Extensions - if ext_qos != &ext::QoSType::default() { + if ext_qos != &ext::QoSType::DEFAULT { n_exts -= 1; self.write(&mut *writer, (*ext_qos, n_exts != 0))?; } @@ -105,7 +105,7 @@ where let id: OamId = self.codec.read(&mut *reader)?; // Extensions - let mut ext_qos = ext::QoSType::default(); + let mut ext_qos = ext::QoSType::DEFAULT; let mut has_ext = imsg::has_flag(self.header, flag::Z); while has_ext { diff --git a/commons/zenoh-codec/src/zenoh/ack.rs b/commons/zenoh-codec/src/zenoh/ack.rs deleted file mode 100644 index 78cbca2987..0000000000 --- a/commons/zenoh-codec/src/zenoh/ack.rs +++ /dev/null @@ -1,129 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use crate::{common::extension, RCodec, WCodec, Zenoh080, Zenoh080Header}; -use alloc::vec::Vec; -use zenoh_buffers::{ - reader::{DidntRead, Reader}, - writer::{DidntWrite, Writer}, -}; -use zenoh_protocol::{ - common::{iext, imsg}, - zenoh::{ - ack::{ext, flag, Ack}, - id, - }, -}; - -impl WCodec<&Ack, &mut W> for Zenoh080 -where - W: Writer, -{ - type Output = Result<(), DidntWrite>; - - fn write(self, writer: &mut W, x: &Ack) -> Self::Output { - let Ack { - timestamp, - ext_sinfo, - ext_unknown, - } = x; - - // Header - let mut header = id::ACK; - if timestamp.is_some() { - header |= flag::T; - } - let mut n_exts = ((ext_sinfo.is_some()) as u8) + (ext_unknown.len() as u8); - if n_exts != 0 { - header |= flag::Z; - } - self.write(&mut *writer, header)?; - - // Body - if let Some(ts) = timestamp.as_ref() { - self.write(&mut *writer, ts)?; - } - - // Extensions - if let Some(sinfo) = ext_sinfo.as_ref() { - n_exts -= 1; - self.write(&mut *writer, (sinfo, n_exts != 0))?; - } - for u in ext_unknown.iter() { - n_exts -= 1; - self.write(&mut *writer, (u, n_exts != 0))?; - } - - Ok(()) - } -} - -impl RCodec for Zenoh080 -where - R: Reader, -{ - type Error = DidntRead; - - fn read(self, reader: &mut R) -> Result { - let header: u8 = self.read(&mut *reader)?; - let codec = Zenoh080Header::new(header); - codec.read(reader) - } -} - -impl RCodec for Zenoh080Header -where - R: Reader, -{ - type Error = DidntRead; - - fn read(self, reader: &mut R) -> Result { - if imsg::mid(self.header) != id::ACK { - return Err(DidntRead); - } - - // Body - let mut timestamp: Option = None; - if imsg::has_flag(self.header, flag::T) { - timestamp = Some(self.codec.read(&mut *reader)?); - } - - // Extensions - let mut ext_sinfo: Option = None; - let mut ext_unknown = Vec::new(); - - let mut has_ext = imsg::has_flag(self.header, flag::Z); - while has_ext { - let ext: u8 = self.codec.read(&mut *reader)?; - let eodec = Zenoh080Header::new(ext); - match iext::eid(ext) { - ext::SourceInfo::ID => { - let (s, ext): (ext::SourceInfoType, bool) = eodec.read(&mut *reader)?; - ext_sinfo = Some(s); - has_ext = ext; - } - _ => { - let (u, ext) = extension::read(reader, "Ack", ext)?; - ext_unknown.push(u); - has_ext = ext; - } - } - } - - Ok(Ack { - timestamp, - ext_sinfo, - ext_unknown, - }) - } -} diff --git a/commons/zenoh-codec/src/zenoh/mod.rs b/commons/zenoh-codec/src/zenoh/mod.rs index d59add9d63..fdff09be94 100644 --- a/commons/zenoh-codec/src/zenoh/mod.rs +++ b/commons/zenoh-codec/src/zenoh/mod.rs @@ -11,7 +11,6 @@ // Contributors: // ZettaScale Zenoh Team, // -pub mod ack; pub mod del; pub mod err; pub mod pull; @@ -121,7 +120,6 @@ where fn write(self, writer: &mut W, x: &ResponseBody) -> Self::Output { match x { ResponseBody::Reply(b) => self.write(&mut *writer, b), - ResponseBody::Ack(b) => self.write(&mut *writer, b), ResponseBody::Err(b) => self.write(&mut *writer, b), ResponseBody::Put(b) => self.write(&mut *writer, b), } @@ -140,7 +138,6 @@ where let codec = Zenoh080Header::new(header); let body = match imsg::mid(codec.header) { id::REPLY => ResponseBody::Reply(codec.read(&mut *reader)?), - id::ACK => ResponseBody::Ack(codec.read(&mut *reader)?), id::ERR => ResponseBody::Err(codec.read(&mut *reader)?), id::PUT => ResponseBody::Put(codec.read(&mut *reader)?), _ => return Err(DidntRead), diff --git a/commons/zenoh-codec/src/zenoh/put.rs b/commons/zenoh-codec/src/zenoh/put.rs index ebc364cf9b..4f50be4872 100644 --- a/commons/zenoh-codec/src/zenoh/put.rs +++ b/commons/zenoh-codec/src/zenoh/put.rs @@ -54,7 +54,7 @@ where if timestamp.is_some() { header |= flag::T; } - if encoding != &Encoding::default() { + if encoding != &Encoding::DEFAULT { header |= flag::E; } let mut n_exts = (ext_sinfo.is_some()) as u8 @@ -73,7 +73,7 @@ where if let Some(ts) = timestamp.as_ref() { self.write(&mut *writer, ts)?; } - if encoding != &Encoding::default() { + if encoding != &Encoding::DEFAULT { self.write(&mut *writer, encoding)?; } @@ -143,7 +143,7 @@ where timestamp = Some(self.codec.read(&mut *reader)?); } - let mut encoding = Encoding::default(); + let mut encoding = Encoding::DEFAULT; if imsg::has_flag(self.header, flag::E) { encoding = self.codec.read(&mut *reader)?; } diff --git a/commons/zenoh-codec/src/zenoh/query.rs b/commons/zenoh-codec/src/zenoh/query.rs index cb0506e474..55f25cd5ea 100644 --- a/commons/zenoh-codec/src/zenoh/query.rs +++ b/commons/zenoh-codec/src/zenoh/query.rs @@ -83,7 +83,7 @@ where // Header let mut header = id::QUERY; - if consolidation != &Consolidation::default() { + if consolidation != &Consolidation::DEFAULT { header |= flag::C; } if !parameters.is_empty() { @@ -99,7 +99,7 @@ where self.write(&mut *writer, header)?; // Body - if consolidation != &Consolidation::default() { + if consolidation != &Consolidation::DEFAULT { self.write(&mut *writer, *consolidation)?; } if !parameters.is_empty() { @@ -153,7 +153,7 @@ where } // Body - let mut consolidation = Consolidation::default(); + let mut consolidation = Consolidation::DEFAULT; if imsg::has_flag(self.header, flag::C) { consolidation = self.codec.read(&mut *reader)?; } diff --git a/commons/zenoh-codec/src/zenoh/reply.rs b/commons/zenoh-codec/src/zenoh/reply.rs index d54e98cc5e..308004a1c2 100644 --- a/commons/zenoh-codec/src/zenoh/reply.rs +++ b/commons/zenoh-codec/src/zenoh/reply.rs @@ -41,7 +41,7 @@ where // Header let mut header = id::REPLY; - if consolidation != &Consolidation::default() { + if consolidation != &Consolidation::DEFAULT { header |= flag::C; } let mut n_exts = ext_unknown.len() as u8; @@ -51,7 +51,7 @@ where self.write(&mut *writer, header)?; // Body - if consolidation != &Consolidation::default() { + if consolidation != &Consolidation::DEFAULT { self.write(&mut *writer, *consolidation)?; } @@ -93,7 +93,7 @@ where } // Body - let mut consolidation = Consolidation::default(); + let mut consolidation = Consolidation::DEFAULT; if imsg::has_flag(self.header, flag::C) { consolidation = self.codec.read(&mut *reader)?; } diff --git a/commons/zenoh-codec/tests/codec.rs b/commons/zenoh-codec/tests/codec.rs index 28201c1977..7f23214b49 100644 --- a/commons/zenoh-codec/tests/codec.rs +++ b/commons/zenoh-codec/tests/codec.rs @@ -582,11 +582,6 @@ fn codec_err() { run!(zenoh::Err, zenoh::Err::rand()); } -#[test] -fn codec_ack() { - run!(zenoh::Ack, zenoh::Ack::rand()); -} - #[test] fn codec_pull() { run!(zenoh::Pull, zenoh::Pull::rand()); diff --git a/commons/zenoh-protocol/src/common/mod.rs b/commons/zenoh-protocol/src/common/mod.rs index d11d0b0c52..ef53e5a8ac 100644 --- a/commons/zenoh-protocol/src/common/mod.rs +++ b/commons/zenoh-protocol/src/common/mod.rs @@ -19,21 +19,6 @@ pub use extension::*; /*************************************/ // Inner Message IDs pub mod imsg { - pub mod id { - // Zenoh Messages - pub const DECLARE: u8 = 0x0b; - pub const DATA: u8 = 0x0c; - pub const QUERY: u8 = 0x0d; - pub const PULL: u8 = 0x0e; - pub const UNIT: u8 = 0x0f; - pub const LINK_STATE_LIST: u8 = 0x10; - - // Message decorators - pub const PRIORITY: u8 = 0x1c; - pub const ROUTING_CONTEXT: u8 = 0x1d; - pub const REPLY_CONTEXT: u8 = 0x1e; - } - // Header mask pub const HEADER_BITS: u8 = 5; pub const HEADER_MASK: u8 = !(0xff << HEADER_BITS); diff --git a/commons/zenoh-protocol/src/core/encoding.rs b/commons/zenoh-protocol/src/core/encoding.rs index f202b8e79c..b3abae8aae 100644 --- a/commons/zenoh-protocol/src/core/encoding.rs +++ b/commons/zenoh-protocol/src/core/encoding.rs @@ -266,6 +266,8 @@ impl Default for Encoding { } impl Encoding { + pub const DEFAULT: Self = Self::EMPTY; + #[cfg(feature = "test")] pub fn rand() -> Self { use rand::{ diff --git a/commons/zenoh-protocol/src/core/locator.rs b/commons/zenoh-protocol/src/core/locator.rs index cdd3dfa64c..42379f2b65 100644 --- a/commons/zenoh-protocol/src/core/locator.rs +++ b/commons/zenoh-protocol/src/core/locator.rs @@ -122,67 +122,3 @@ impl Locator { EndPoint::rand().into() } } - -// pub(crate) trait HasCanonForm { -// fn is_canon(&self) -> bool; - -// type Output; -// fn canonicalize(self) -> Self::Output; -// } - -// fn cmp(this: &str, than: &str) -> core::cmp::Ordering { -// let is_longer = this.len().cmp(&than.len()); -// let this = this.chars(); -// let than = than.chars(); -// let zip = this.zip(than); -// for (this, than) in zip { -// match this.cmp(&than) { -// core::cmp::Ordering::Equal => {} -// o => return o, -// } -// } -// is_longer -// } - -// impl<'a, T: Iterator + Clone, V> HasCanonForm for T { -// fn is_canon(&self) -> bool { -// let mut iter = self.clone(); -// let mut acc = if let Some((key, _)) = iter.next() { -// key -// } else { -// return true; -// }; -// for (key, _) in iter { -// if cmp(key, acc) != core::cmp::Ordering::Greater { -// return false; -// } -// acc = key; -// } -// true -// } - -// type Output = Vec<(&'a str, V)>; -// fn canonicalize(mut self) -> Self::Output { -// let mut result = Vec::new(); -// if let Some(v) = self.next() { -// result.push(v); -// } -// 'outer: for (k, v) in self { -// for (i, (x, _)) in result.iter().enumerate() { -// match cmp(k, x) { -// core::cmp::Ordering::Less => { -// result.insert(i, (k, v)); -// continue 'outer; -// } -// core::cmp::Ordering::Equal => { -// result[i].1 = v; -// continue 'outer; -// } -// core::cmp::Ordering::Greater => {} -// } -// } -// result.push((k, v)) -// } -// result -// } -// } diff --git a/commons/zenoh-protocol/src/core/mod.rs b/commons/zenoh-protocol/src/core/mod.rs index 2547034c44..3e9315bec2 100644 --- a/commons/zenoh-protocol/src/core/mod.rs +++ b/commons/zenoh-protocol/src/core/mod.rs @@ -16,7 +16,6 @@ use alloc::{ boxed::Box, format, string::{String, ToString}, - vec::Vec, }; use core::{ convert::{From, TryFrom, TryInto}, @@ -54,43 +53,6 @@ pub use endpoint::*; pub mod resolution; pub use resolution::*; -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Property { - pub key: u64, - pub value: Vec, -} - -/// The kind of a `Sample`. -#[repr(u8)] -#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)] -pub enum SampleKind { - /// if the `Sample` was issued by a `put` operation. - #[default] - Put = 0, - /// if the `Sample` was issued by a `delete` operation. - Delete = 1, -} - -impl fmt::Display for SampleKind { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - SampleKind::Put => write!(f, "PUT"), - SampleKind::Delete => write!(f, "DELETE"), - } - } -} - -impl TryFrom for SampleKind { - type Error = u64; - fn try_from(kind: u64) -> Result { - match kind { - 0 => Ok(SampleKind::Put), - 1 => Ok(SampleKind::Delete), - _ => Err(kind), - } - } -} - /// The global unique id of a zenoh peer. #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] #[repr(transparent)] @@ -314,6 +276,8 @@ pub enum Priority { } impl Priority { + /// Default + pub const DEFAULT: Self = Self::Data; /// The lowest Priority pub const MIN: Self = Self::Background; /// The highest Priority @@ -354,6 +318,8 @@ pub enum Reliability { } impl Reliability { + pub const DEFAULT: Self = Self::BestEffort; + #[cfg(feature = "test")] pub fn rand() -> Self { use rand::Rng; @@ -374,6 +340,13 @@ pub struct Channel { pub reliability: Reliability, } +impl Channel { + pub const DEFAULT: Self = Self { + priority: Priority::DEFAULT, + reliability: Reliability::DEFAULT, + }; +} + /// The kind of congestion control. #[derive(Debug, Default, Copy, Clone, PartialEq, Eq)] #[repr(u8)] @@ -383,51 +356,6 @@ pub enum CongestionControl { Block = 1, } -/// The subscription mode. -#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)] -#[repr(u8)] -pub enum SubMode { - #[default] - Push = 0, - Pull = 1, -} - -#[derive(Debug, Clone, PartialEq, Eq, Default)] -pub struct SubInfo { - pub reliability: Reliability, - pub mode: SubMode, -} - -#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)] -pub struct QueryableInfo { - pub complete: u64, // Default 0: incomplete - pub distance: u64, // Default 0: no distance -} - -/// The kind of consolidation. -#[derive(Debug, Clone, PartialEq, Eq, Copy)] -pub enum ConsolidationMode { - /// No consolidation applied: multiple samples may be received for the same key-timestamp. - None, - /// Monotonic consolidation immediately forwards samples, except if one with an equal or more recent timestamp - /// has already been sent with the same key. - /// - /// This optimizes latency while potentially reducing bandwidth. - /// - /// Note that this doesn't cause re-ordering, but drops the samples for which a more recent timestamp has already - /// been observed with the same key. - Monotonic, - /// Holds back samples to only send the set of samples that had the highest timestamp for their key. - Latest, -} - -/// The `zenoh::queryable::Queryable`s that should be target of a `zenoh::Session::get()`. -#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] -pub enum QueryTarget { - #[default] - BestMatching, - All, - AllComplete, - #[cfg(feature = "complete_n")] - Complete(u64), +impl CongestionControl { + pub const DEFAULT: Self = Self::Drop; } diff --git a/commons/zenoh-protocol/src/core/wire_expr.rs b/commons/zenoh-protocol/src/core/wire_expr.rs index 7b0dee7471..6d9623d6ca 100644 --- a/commons/zenoh-protocol/src/core/wire_expr.rs +++ b/commons/zenoh-protocol/src/core/wire_expr.rs @@ -257,7 +257,7 @@ impl WireExpr<'_> { WireExpr { scope, suffix: suffix.into(), - mapping: Mapping::default(), + mapping: Mapping::DEFAULT, } } } diff --git a/commons/zenoh-protocol/src/network/declare.rs b/commons/zenoh-protocol/src/network/declare.rs index 76415d52f5..1568029cc6 100644 --- a/commons/zenoh-protocol/src/network/declare.rs +++ b/commons/zenoh-protocol/src/network/declare.rs @@ -156,6 +156,8 @@ pub enum Mode { } impl Mode { + pub const DEFAULT: Self = Self::Push; + #[cfg(feature = "test")] fn rand() -> Self { use rand::Rng; @@ -344,7 +346,7 @@ pub mod subscriber { /// - if P==1 then the subscription is pull, else it is push /// - rsv: Reserved /// ``` - #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct SubscriberInfo { pub reliability: Reliability, pub mode: Mode, @@ -354,6 +356,11 @@ pub mod subscriber { pub const R: u64 = 1; pub const P: u64 = 1 << 1; + pub const DEFAULT: Self = Self { + reliability: Reliability::DEFAULT, + mode: Mode::DEFAULT, + }; + #[cfg(feature = "test")] pub fn rand() -> Self { let reliability = Reliability::rand(); @@ -363,6 +370,12 @@ pub mod subscriber { } } + impl Default for SubscriberInfo { + fn default() -> Self { + Self::DEFAULT + } + } + impl From for SubscriberInfo { fn from(ext: Info) -> Self { let reliability = if imsg::has_option(ext.value, SubscriberInfo::R) { @@ -502,13 +515,18 @@ pub mod queryable { /// +---------------+ /// ~ distance ~ /// +---------------+ - #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct QueryableInfo { pub complete: u8, // Default 0: incomplete // @TODO: maybe a bitflag pub distance: u32, // Default 0: no distance } impl QueryableInfo { + pub const DEFAULT: Self = Self { + complete: 0, + distance: 0, + }; + #[cfg(feature = "test")] pub fn rand() -> Self { use rand::Rng; @@ -520,6 +538,12 @@ pub mod queryable { } } + impl Default for QueryableInfo { + fn default() -> Self { + Self::DEFAULT + } + } + impl From for QueryableInfo { fn from(ext: Info) -> Self { let complete = ext.value as u8; diff --git a/commons/zenoh-protocol/src/network/mod.rs b/commons/zenoh-protocol/src/network/mod.rs index 1be58db5cc..6807488873 100644 --- a/commons/zenoh-protocol/src/network/mod.rs +++ b/commons/zenoh-protocol/src/network/mod.rs @@ -51,6 +51,8 @@ pub enum Mapping { } impl Mapping { + pub const DEFAULT: Self = Self::Receiver; + #[cfg(feature = "test")] pub fn rand() -> Self { use rand::Rng; @@ -226,6 +228,16 @@ pub mod ext { const D_FLAG: u8 = 0b00001000; const E_FLAG: u8 = 0b00010000; + pub const DEFAULT: Self = Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false); + + pub const DECLARE: Self = Self::new(Priority::DEFAULT, CongestionControl::Block, false); + pub const PUSH: Self = Self::new(Priority::DEFAULT, CongestionControl::Drop, false); + pub const REQUEST: Self = Self::new(Priority::DEFAULT, CongestionControl::Block, false); + pub const RESPONSE: Self = Self::new(Priority::DEFAULT, CongestionControl::Block, false); + pub const RESPONSE_FINAL: Self = + Self::new(Priority::DEFAULT, CongestionControl::Block, false); + pub const OAM: Self = Self::new(Priority::DEFAULT, CongestionControl::Block, false); + pub const fn new( priority: Priority, congestion_control: CongestionControl, @@ -275,35 +287,11 @@ pub mod ext { let inner: u8 = rng.gen(); Self { inner } } - - pub fn declare_default() -> Self { - Self::new(Priority::default(), CongestionControl::Block, false) - } - - pub fn push_default() -> Self { - Self::new(Priority::default(), CongestionControl::Drop, false) - } - - pub fn request_default() -> Self { - Self::new(Priority::default(), CongestionControl::Block, false) - } - - pub fn response_default() -> Self { - Self::new(Priority::default(), CongestionControl::Block, false) - } - - pub fn response_final_default() -> Self { - Self::new(Priority::default(), CongestionControl::Block, false) - } - - pub fn oam_default() -> Self { - Self::new(Priority::default(), CongestionControl::Block, false) - } } impl Default for QoSType<{ ID }> { fn default() -> Self { - Self::new(Priority::default(), CongestionControl::default(), false) + Self::new(Priority::DEFAULT, CongestionControl::DEFAULT, false) } } @@ -371,6 +359,9 @@ pub mod ext { } impl NodeIdType<{ ID }> { + // node_id == 0 means the message has been generated by the node itself + pub const DEFAULT: Self = Self { node_id: 0 }; + #[cfg(feature = "test")] pub fn rand() -> Self { use rand::Rng; @@ -382,8 +373,7 @@ pub mod ext { impl Default for NodeIdType<{ ID }> { fn default() -> Self { - // node_id == 0 means the message has been generated by the node itself - Self { node_id: 0 } + Self::DEFAULT } } diff --git a/commons/zenoh-protocol/src/network/request.rs b/commons/zenoh-protocol/src/network/request.rs index 9e0137ea3a..aba6bb057a 100644 --- a/commons/zenoh-protocol/src/network/request.rs +++ b/commons/zenoh-protocol/src/network/request.rs @@ -66,7 +66,6 @@ pub struct Request { pub mod ext { use crate::{ common::{ZExtZ64, ZExtZBuf}, - core::QueryTarget, zextz64, zextzbuf, }; use core::{num::NonZeroU32, time::Duration}; @@ -88,9 +87,19 @@ pub mod ext { /// +---------------+ /// /// The `zenoh::queryable::Queryable`s that should be target of a `zenoh::Session::get()`. - pub type TargetType = QueryTarget; + #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)] + pub enum TargetType { + #[default] + BestMatching, + All, + AllComplete, + #[cfg(feature = "complete_n")] + Complete(u64), + } impl TargetType { + pub const DEFAULT: Self = Self::BestMatching; + #[cfg(feature = "test")] pub fn rand() -> Self { use rand::prelude::*; diff --git a/commons/zenoh-protocol/src/transport/mod.rs b/commons/zenoh-protocol/src/transport/mod.rs index cdf994e5dd..307389f8c9 100644 --- a/commons/zenoh-protocol/src/transport/mod.rs +++ b/commons/zenoh-protocol/src/transport/mod.rs @@ -75,13 +75,18 @@ pub enum TransportBodyLowLatency { pub type TransportSn = u32; -#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] pub struct PrioritySn { pub reliable: TransportSn, pub best_effort: TransportSn, } impl PrioritySn { + pub const DEFAULT: Self = Self { + reliable: TransportSn::MIN, + best_effort: TransportSn::MIN, + }; + #[cfg(feature = "test")] pub fn rand() -> Self { use rand::Rng; @@ -252,7 +257,8 @@ pub mod ext { } impl QoSType<{ ID }> { - pub const P_MASK: u8 = 0b00000111; + const P_MASK: u8 = 0b00000111; + pub const DEFAULT: Self = Self::new(Priority::DEFAULT); pub const fn new(priority: Priority) -> Self { Self { @@ -276,7 +282,7 @@ pub mod ext { impl Default for QoSType<{ ID }> { fn default() -> Self { - Self::new(Priority::default()) + Self::DEFAULT } } diff --git a/commons/zenoh-protocol/src/zenoh/ack.rs b/commons/zenoh-protocol/src/zenoh/ack.rs deleted file mode 100644 index d40bf58791..0000000000 --- a/commons/zenoh-protocol/src/zenoh/ack.rs +++ /dev/null @@ -1,84 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use crate::common::ZExtUnknown; -use alloc::vec::Vec; -use uhlc::Timestamp; - -/// # Ack message -/// -/// ```text -/// Flags: -/// - T: Timestamp If T==1 then the timestamp if present -/// - X: Reserved -/// - Z: Extension If Z==1 then at least one extension is present -/// -/// 7 6 5 4 3 2 1 0 -/// +-+-+-+-+-+-+-+-+ -/// |Z|X|T| ACK | -/// +-+-+-+---------+ -/// ~ ts: ~ if T==1 -/// +---------------+ -/// ~ [err_exts] ~ if Z==1 -/// +---------------+ -/// ``` -pub mod flag { - pub const T: u8 = 1 << 5; // 0x20 Timestamp if T==0 then the timestamp if present - // pub const X: u8 = 1 << 6; // 0x40 Reserved - pub const Z: u8 = 1 << 7; // 0x80 Extensions if Z==1 then an extension will follow -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Ack { - pub timestamp: Option, - pub ext_sinfo: Option, - pub ext_unknown: Vec, -} - -pub mod ext { - use crate::{common::ZExtZBuf, zextzbuf}; - - /// # SourceInfo extension - /// Used to carry additional information about the source of data - pub type SourceInfo = zextzbuf!(0x1, false); - pub type SourceInfoType = crate::zenoh::ext::SourceInfoType<{ SourceInfo::ID }>; -} - -impl Ack { - #[cfg(feature = "test")] - pub fn rand() -> Self { - use crate::{common::iext, core::ZenohId}; - use rand::Rng; - let mut rng = rand::thread_rng(); - - let timestamp = rng.gen_bool(0.5).then_some({ - let time = uhlc::NTP64(rng.gen()); - let id = uhlc::ID::try_from(ZenohId::rand().to_le_bytes()).unwrap(); - Timestamp::new(time, id) - }); - let ext_sinfo = rng.gen_bool(0.5).then_some(ext::SourceInfoType::rand()); - let mut ext_unknown = Vec::new(); - for _ in 0..rng.gen_range(0..4) { - ext_unknown.push(ZExtUnknown::rand2( - iext::mid(ext::SourceInfo::ID) + 1, - false, - )); - } - - Self { - timestamp, - ext_sinfo, - ext_unknown, - } - } -} diff --git a/commons/zenoh-protocol/src/zenoh/mod.rs b/commons/zenoh-protocol/src/zenoh/mod.rs index a23eaa9b21..d73d8cdd06 100644 --- a/commons/zenoh-protocol/src/zenoh/mod.rs +++ b/commons/zenoh-protocol/src/zenoh/mod.rs @@ -11,7 +11,6 @@ // Contributors: // ZettaScale Zenoh Team, // -pub mod ack; pub mod del; pub mod err; pub mod pull; @@ -20,7 +19,6 @@ pub mod query; pub mod reply; use crate::core::Encoding; -pub use ack::Ack; pub use del::Del; pub use err::Err; pub use pull::Pull; @@ -35,8 +33,7 @@ pub mod id { pub const QUERY: u8 = 0x03; pub const REPLY: u8 = 0x04; pub const ERR: u8 = 0x05; - pub const ACK: u8 = 0x06; - pub const PULL: u8 = 0x07; + pub const PULL: u8 = 0x06; } // DataInfo @@ -127,7 +124,6 @@ impl From for RequestBody { #[derive(Debug, Clone, PartialEq, Eq)] pub enum ResponseBody { Reply(Reply), - Ack(Ack), Err(Err), Put(Put), } @@ -138,11 +134,10 @@ impl ResponseBody { use rand::Rng; let mut rng = rand::thread_rng(); - match rng.gen_range(0..4) { + match rng.gen_range(0..3) { 0 => ResponseBody::Reply(Reply::rand()), - 1 => ResponseBody::Ack(Ack::rand()), - 2 => ResponseBody::Err(Err::rand()), - 3 => ResponseBody::Put(Put::rand()), + 1 => ResponseBody::Err(Err::rand()), + 2 => ResponseBody::Put(Put::rand()), _ => unreachable!(), } } @@ -160,12 +155,6 @@ impl From for ResponseBody { } } -impl From for ResponseBody { - fn from(r: Ack) -> ResponseBody { - ResponseBody::Ack(r) - } -} - pub mod ext { use zenoh_buffers::ZBuf; diff --git a/commons/zenoh-protocol/src/zenoh/query.rs b/commons/zenoh-protocol/src/zenoh/query.rs index 17dfa23df8..ac53b963f5 100644 --- a/commons/zenoh-protocol/src/zenoh/query.rs +++ b/commons/zenoh-protocol/src/zenoh/query.rs @@ -11,7 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // -use crate::{common::ZExtUnknown, core::ConsolidationMode}; +use crate::common::ZExtUnknown; use alloc::{string::String, vec::Vec}; /// The kind of consolidation. @@ -38,6 +38,8 @@ pub enum Consolidation { } impl Consolidation { + pub const DEFAULT: Self = Self::Auto; + #[cfg(feature = "test")] pub fn rand() -> Self { use rand::prelude::SliceRandom; @@ -55,16 +57,6 @@ impl Consolidation { } } -impl From for Consolidation { - fn from(val: ConsolidationMode) -> Self { - match val { - ConsolidationMode::None => Consolidation::None, - ConsolidationMode::Monotonic => Consolidation::Monotonic, - ConsolidationMode::Latest => Consolidation::Latest, - } - } -} - /// # Query message /// /// ```text diff --git a/examples/examples/z_pub_thr.rs b/examples/examples/z_pub_thr.rs index 3e130e0608..b698cbc80b 100644 --- a/examples/examples/z_pub_thr.rs +++ b/examples/examples/z_pub_thr.rs @@ -23,7 +23,7 @@ fn main() { env_logger::init(); let args = Args::parse(); - let mut prio = Priority::default(); + let mut prio = Priority::DEFAULT; if let Some(p) = args.priority { prio = p.try_into().unwrap(); } diff --git a/io/zenoh-transport/src/common/batch.rs b/io/zenoh-transport/src/common/batch.rs index 4139a65a05..a6aad76f7b 100644 --- a/io/zenoh-transport/src/common/batch.rs +++ b/io/zenoh-transport/src/common/batch.rs @@ -574,12 +574,12 @@ mod tests { let tmsg: TransportMessage = KeepAlive.into(); let nmsg: NetworkMessage = Push { wire_expr: WireExpr::empty(), - ext_qos: ext::QoSType::new(Priority::default(), CongestionControl::Block, false), + ext_qos: ext::QoSType::new(Priority::DEFAULT, CongestionControl::Block, false), ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -601,7 +601,7 @@ mod tests { let mut frame = FrameHeader { reliability: Reliability::Reliable, sn: 0, - ext_qos: frame::ext::QoSType::default(), + ext_qos: frame::ext::QoSType::DEFAULT, }; // Serialize with a frame diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs index 954c656280..eebf23abc9 100644 --- a/io/zenoh-transport/src/common/pipeline.rs +++ b/io/zenoh-transport/src/common/pipeline.rs @@ -513,7 +513,7 @@ impl TransmissionPipeline { let mut stage_in = vec![]; let mut stage_out = vec![]; - let default_queue_size = [config.queue_size[Priority::default() as usize]]; + let default_queue_size = [config.queue_size[Priority::DEFAULT as usize]]; let size_iter = if priority.len() == 1 { default_queue_size.iter() } else { @@ -602,7 +602,7 @@ impl TransmissionPipelineProducer { let priority = msg.priority(); (priority as usize, priority) } else { - (0, Priority::default()) + (0, Priority::DEFAULT) }; // Lock the channel. We are the only one that will be writing on it. let mut queue = zlock!(self.stage_in[idx]); @@ -751,10 +751,10 @@ mod tests { wire_expr: key, ext_qos: ext::QoSType::new(Priority::Control, CongestionControl::Block, false), ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -881,10 +881,10 @@ mod tests { wire_expr: key, ext_qos: ext::QoSType::new(Priority::Control, CongestionControl::Block, false), ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -993,10 +993,10 @@ mod tests { false, ), ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, payload: PushBody::Put(Put { timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs index 21ed0b3fdf..b24c077c57 100644 --- a/io/zenoh-transport/src/multicast/link.rs +++ b/io/zenoh-transport/src/multicast/link.rs @@ -483,7 +483,7 @@ async fn tx_task( .collect::>(); let (next_sn, ext_qos) = if next_sns.len() == Priority::NUM { let tmp: [PrioritySn; Priority::NUM] = next_sns.try_into().unwrap(); - (PrioritySn::default(), Some(Box::new(tmp))) + (PrioritySn::DEFAULT, Some(Box::new(tmp))) } else { (next_sns[0], None) }; diff --git a/io/zenoh-transport/src/multicast/rx.rs b/io/zenoh-transport/src/multicast/rx.rs index 14f2fd619c..dedef2149c 100644 --- a/io/zenoh-transport/src/multicast/rx.rs +++ b/io/zenoh-transport/src/multicast/rx.rs @@ -145,7 +145,7 @@ impl TransportMulticastInner { let priority = ext_qos.priority(); let c = if self.is_qos() { &peer.priority_rx[priority as usize] - } else if priority == Priority::default() { + } else if priority == Priority::DEFAULT { &peer.priority_rx[0] } else { bail!( @@ -181,7 +181,7 @@ impl TransportMulticastInner { let priority = ext_qos.priority(); let c = if self.is_qos() { &peer.priority_rx[priority as usize] - } else if priority == Priority::default() { + } else if priority == Priority::DEFAULT { &peer.priority_rx[0] } else { bail!( diff --git a/io/zenoh-transport/src/shm.rs b/io/zenoh-transport/src/shm.rs index 8b0e93f494..6f98cafc14 100644 --- a/io/zenoh-transport/src/shm.rs +++ b/io/zenoh-transport/src/shm.rs @@ -167,7 +167,6 @@ pub fn map_zmsg_to_shminfo(msg: &mut NetworkMessage) -> ZResult { ResponseBody::Reply(b) => b.map_to_shminfo(), ResponseBody::Put(b) => b.map_to_shminfo(), ResponseBody::Err(b) => b.map_to_shminfo(), - ResponseBody::Ack(_) => Ok(false), }, NetworkBody::ResponseFinal(_) | NetworkBody::Declare(_) | NetworkBody::OAM(_) => Ok(false), } @@ -222,7 +221,6 @@ pub fn map_zmsg_to_shmbuf( ResponseBody::Put(b) => b.map_to_shmbuf(shmr), ResponseBody::Err(b) => b.map_to_shmbuf(shmr), ResponseBody::Reply(b) => b.map_to_shmbuf(shmr), - ResponseBody::Ack(_) => Ok(false), }, NetworkBody::ResponseFinal(_) | NetworkBody::Declare(_) | NetworkBody::OAM(_) => Ok(false), } diff --git a/io/zenoh-transport/src/unicast/establishment/cookie.rs b/io/zenoh-transport/src/unicast/establishment/cookie.rs index e9916be7e6..0db9e1c93a 100644 --- a/io/zenoh-transport/src/unicast/establishment/cookie.rs +++ b/io/zenoh-transport/src/unicast/establishment/cookie.rs @@ -11,7 +11,6 @@ // Contributors: // ZettaScale Zenoh Team, // -// use super::properties::EstablishmentProperties; use crate::unicast::establishment::ext; use std::convert::TryFrom; use zenoh_buffers::{ diff --git a/io/zenoh-transport/src/unicast/establishment/properties.rs b/io/zenoh-transport/src/unicast/establishment/properties.rs deleted file mode 100644 index e259b650ab..0000000000 --- a/io/zenoh-transport/src/unicast/establishment/properties.rs +++ /dev/null @@ -1,132 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -use std::{ - convert::TryFrom, - ops::{Deref, DerefMut}, -}; -use zenoh_buffers::{reader::HasReader, writer::HasWriter, ZBuf}; -use zenoh_codec::{RCodec, WCodec, Zenoh080}; -use zenoh_protocol::core::Property; -use zenoh_result::{bail, zerror, Error as ZError, ZResult}; - -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct EstablishmentProperties(Vec); - -impl Deref for EstablishmentProperties { - type Target = Vec; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl DerefMut for EstablishmentProperties { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl EstablishmentProperties { - pub(super) fn new() -> Self { - EstablishmentProperties(vec![]) - } - - pub(super) fn insert(&mut self, p: Property) -> ZResult<()> { - if self.0.iter().any(|x| x.key == p.key) { - bail!("Property {} already exists", p.key) - } - self.0.push(p); - Ok(()) - } - - pub(super) fn remove(&mut self, key: u64) -> Option { - self.0 - .iter() - .position(|x| x.key == key) - .map(|i| self.0.remove(i)) - } -} - -impl TryFrom<&EstablishmentProperties> for Attachment { - type Error = ZError; - - fn try_from(eps: &EstablishmentProperties) -> Result { - if eps.is_empty() { - bail!("Can not create an attachment with zero properties") - } - - let mut zbuf = ZBuf::empty(); - let mut writer = zbuf.writer(); - let codec = Zenoh080::new(); - - codec - .write(&mut writer, eps.0.as_slice()) - .map_err(|_| zerror!(""))?; - - let attachment = Attachment::new(zbuf); - Ok(attachment) - } -} - -impl TryFrom> for EstablishmentProperties { - type Error = ZError; - - fn try_from(mut ps: Vec) -> Result { - let mut eps = EstablishmentProperties::new(); - for p in ps.drain(..) { - eps.insert(p)?; - } - - Ok(eps) - } -} - -impl TryFrom<&Attachment> for EstablishmentProperties { - type Error = ZError; - - fn try_from(att: &Attachment) -> Result { - let mut reader = att.buffer.reader(); - let codec = Zenoh080::new(); - - let ps: Vec = codec.read(&mut reader).map_err(|_| zerror!(""))?; - EstablishmentProperties::try_from(ps) - } -} - -impl EstablishmentProperties { - #[cfg(test)] - pub fn rand() -> Self { - use rand::Rng; - - const MIN: usize = 1; - const MAX: usize = 8; - - let mut rng = rand::thread_rng(); - - let mut eps = EstablishmentProperties::new(); - for _ in MIN..=MAX { - loop { - let key: u64 = rng.gen(); - let mut value = vec![0u8; rng.gen_range(MIN..=MAX)]; - rng.fill(&mut value[..]); - let p = Property { key, value }; - if eps.insert(p).is_ok() { - break; - } - } - } - - eps - } -} diff --git a/io/zenoh-transport/src/unicast/universal/rx.rs b/io/zenoh-transport/src/unicast/universal/rx.rs index 935a1814b0..04af432aef 100644 --- a/io/zenoh-transport/src/unicast/universal/rx.rs +++ b/io/zenoh-transport/src/unicast/universal/rx.rs @@ -81,7 +81,7 @@ impl TransportUnicastUniversal { let priority = ext_qos.priority(); let c = if self.is_qos() { &self.priority_rx[priority as usize] - } else if priority == Priority::default() { + } else if priority == Priority::DEFAULT { &self.priority_rx[0] } else { bail!( @@ -124,7 +124,7 @@ impl TransportUnicastUniversal { let c = if self.is_qos() { &self.priority_rx[qos.priority() as usize] - } else if qos.priority() == Priority::default() { + } else if qos.priority() == Priority::DEFAULT { &self.priority_rx[0] } else { bail!( diff --git a/io/zenoh-transport/tests/multicast_compression.rs b/io/zenoh-transport/tests/multicast_compression.rs index f8e56a5484..4d1196e10f 100644 --- a/io/zenoh-transport/tests/multicast_compression.rs +++ b/io/zenoh-transport/tests/multicast_compression.rs @@ -269,11 +269,11 @@ mod tests { wire_expr: "test".into(), ext_qos: QoSType::new(channel.priority, CongestionControl::Block, false), ext_tstamp: None, - ext_nodeid: NodeIdType::default(), + ext_nodeid: NodeIdType::DEFAULT, payload: Put { payload: vec![0u8; msg_size].into(), timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -363,7 +363,7 @@ mod tests { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { diff --git a/io/zenoh-transport/tests/multicast_transport.rs b/io/zenoh-transport/tests/multicast_transport.rs index ebb290af1e..fe5a44b7ee 100644 --- a/io/zenoh-transport/tests/multicast_transport.rs +++ b/io/zenoh-transport/tests/multicast_transport.rs @@ -265,11 +265,11 @@ mod tests { wire_expr: "test".into(), ext_qos: QoSType::new(channel.priority, CongestionControl::Block, false), ext_tstamp: None, - ext_nodeid: NodeIdType::default(), + ext_nodeid: NodeIdType::DEFAULT, payload: Put { payload: vec![0u8; msg_size].into(), timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -359,7 +359,7 @@ mod tests { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { diff --git a/io/zenoh-transport/tests/unicast_compression.rs b/io/zenoh-transport/tests/unicast_compression.rs index 323c6f529e..dd4f55b5f5 100644 --- a/io/zenoh-transport/tests/unicast_compression.rs +++ b/io/zenoh-transport/tests/unicast_compression.rs @@ -297,11 +297,11 @@ mod tests { wire_expr: "test".into(), ext_qos: QoSType::new(channel.priority, cctrl, false), ext_tstamp: None, - ext_nodeid: NodeIdType::default(), + ext_nodeid: NodeIdType::DEFAULT, payload: Put { payload: vec![0u8; msg_size].into(), timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -442,7 +442,7 @@ mod tests { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::Reliable, }, Channel { @@ -472,7 +472,7 @@ mod tests { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::Reliable, }, Channel { @@ -505,7 +505,7 @@ mod tests { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { @@ -535,7 +535,7 @@ mod tests { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { diff --git a/io/zenoh-transport/tests/unicast_concurrent.rs b/io/zenoh-transport/tests/unicast_concurrent.rs index d13f763b68..4e90432193 100644 --- a/io/zenoh-transport/tests/unicast_concurrent.rs +++ b/io/zenoh-transport/tests/unicast_concurrent.rs @@ -194,13 +194,13 @@ async fn transport_concurrent(endpoint01: Vec, endpoint02: Vec, endpoint02: Vec, client_transport: TransportUn wire_expr: "test".into(), ext_qos: QoSType::new(*p, CongestionControl::Block, false), ext_tstamp: None, - ext_nodeid: NodeIdType::default(), + ext_nodeid: NodeIdType::DEFAULT, payload: Put { payload: vec![0u8; *ms].into(), timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, diff --git a/io/zenoh-transport/tests/unicast_shm.rs b/io/zenoh-transport/tests/unicast_shm.rs index f9180849af..d12a9db7dc 100644 --- a/io/zenoh-transport/tests/unicast_shm.rs +++ b/io/zenoh-transport/tests/unicast_shm.rs @@ -271,13 +271,13 @@ mod tests { let message: NetworkMessage = Push { wire_expr: "test".into(), - ext_qos: QoSType::new(Priority::default(), CongestionControl::Block, false), + ext_qos: QoSType::new(Priority::DEFAULT, CongestionControl::Block, false), ext_tstamp: None, - ext_nodeid: NodeIdType::default(), + ext_nodeid: NodeIdType::DEFAULT, payload: Put { payload: sbuf.into(), timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, ext_shm: None, ext_attachment: None, @@ -319,13 +319,13 @@ mod tests { let message: NetworkMessage = Push { wire_expr: "test".into(), - ext_qos: QoSType::new(Priority::default(), CongestionControl::Block, false), + ext_qos: QoSType::new(Priority::DEFAULT, CongestionControl::Block, false), ext_tstamp: None, - ext_nodeid: NodeIdType::default(), + ext_nodeid: NodeIdType::DEFAULT, payload: Put { payload: sbuf.into(), timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, ext_shm: None, ext_attachment: None, diff --git a/io/zenoh-transport/tests/unicast_simultaneous.rs b/io/zenoh-transport/tests/unicast_simultaneous.rs index 19380eb49e..db73e99480 100644 --- a/io/zenoh-transport/tests/unicast_simultaneous.rs +++ b/io/zenoh-transport/tests/unicast_simultaneous.rs @@ -78,11 +78,11 @@ mod tests { wire_expr: "test".into(), ext_qos: QoSType::new(Priority::Control, CongestionControl::Block, false), ext_tstamp: None, - ext_nodeid: NodeIdType::default(), + ext_nodeid: NodeIdType::DEFAULT, payload: Put { payload: vec![0u8; MSG_SIZE].into(), timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, diff --git a/io/zenoh-transport/tests/unicast_transport.rs b/io/zenoh-transport/tests/unicast_transport.rs index 11839aef2a..795ea90b41 100644 --- a/io/zenoh-transport/tests/unicast_transport.rs +++ b/io/zenoh-transport/tests/unicast_transport.rs @@ -468,11 +468,11 @@ async fn test_transport( wire_expr: "test".into(), ext_qos: QoSType::new(channel.priority, cctrl, false), ext_tstamp: None, - ext_nodeid: NodeIdType::default(), + ext_nodeid: NodeIdType::DEFAULT, payload: Put { payload: vec![0u8; msg_size].into(), timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -614,7 +614,7 @@ fn transport_unicast_tcp_only() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::Reliable, }, Channel { @@ -644,7 +644,7 @@ fn transport_unicast_tcp_only_with_lowlatency_transport() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::Reliable, }, Channel { @@ -677,7 +677,7 @@ fn transport_unicast_udp_only() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { @@ -707,7 +707,7 @@ fn transport_unicast_udp_only_with_lowlatency_transport() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { @@ -739,7 +739,7 @@ fn transport_unicast_unix_only() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { @@ -773,7 +773,7 @@ fn transport_unicast_unix_only_with_lowlatency_transport() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { @@ -808,11 +808,11 @@ fn transport_unicast_ws_only() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::Reliable, }, Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { @@ -846,11 +846,11 @@ fn transport_unicast_ws_only_with_lowlatency_transport() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::Reliable, }, Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { @@ -887,7 +887,7 @@ fn transport_unicast_unixpipe_only() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::Reliable, }, Channel { @@ -921,7 +921,7 @@ fn transport_unicast_unixpipe_only_with_lowlatency_transport() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::Reliable, }, Channel { @@ -956,7 +956,7 @@ fn transport_unicast_tcp_udp() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { @@ -996,7 +996,7 @@ fn transport_unicast_tcp_unix() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { @@ -1038,7 +1038,7 @@ fn transport_unicast_udp_unix() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { @@ -1083,7 +1083,7 @@ fn transport_unicast_tcp_udp_unix() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { @@ -1130,11 +1130,11 @@ fn transport_unicast_tls_only_server() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::Reliable, }, Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { @@ -1184,11 +1184,11 @@ fn transport_unicast_quic_only_server() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::Reliable, }, Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { @@ -1256,11 +1256,11 @@ fn transport_unicast_tls_only_mutual_success() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::Reliable, }, Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { @@ -1323,11 +1323,11 @@ fn transport_unicast_tls_only_mutual_no_client_certs_failure() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::Reliable, }, Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { @@ -1403,11 +1403,11 @@ fn transport_unicast_tls_only_mutual_wrong_client_certs_failure() { // Define the reliability and congestion control let channel = [ Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::Reliable, }, Channel { - priority: Priority::default(), + priority: Priority::DEFAULT, reliability: Reliability::BestEffort, }, Channel { diff --git a/zenoh-ext/src/subscriber_ext.rs b/zenoh-ext/src/subscriber_ext.rs index a2987f8833..83de47779c 100644 --- a/zenoh-ext/src/subscriber_ext.rs +++ b/zenoh-ext/src/subscriber_ext.rs @@ -290,7 +290,7 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> session: self.session, key_expr: self.key_expr, key_space: crate::LivelinessSpace, - reliability: Reliability::default(), + reliability: Reliability::DEFAULT, origin: Locality::default(), fetch, handler: self.handler, @@ -334,11 +334,11 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> session: self.session, key_expr: self.key_expr, key_space: crate::LivelinessSpace, - reliability: Reliability::default(), + reliability: Reliability::DEFAULT, origin: Locality::default(), query_selector: None, - query_target: QueryTarget::default(), - query_consolidation: QueryConsolidation::default(), + query_target: QueryTarget::DEFAULT, + query_consolidation: QueryConsolidation::DEFAULT, query_accept_replies: ReplyKeyExpr::MatchingQuery, query_timeout: Duration::from_secs(10), handler: self.handler, diff --git a/zenoh/src/key_expr.rs b/zenoh/src/key_expr.rs index d2295f9798..36c696000a 100644 --- a/zenoh/src/key_expr.rs +++ b/zenoh/src/key_expr.rs @@ -633,9 +633,9 @@ impl SyncResolve for KeyExprUndeclaration<'_> { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(zenoh_protocol::network::Declare { - ext_qos: declare::ext::QoSType::declare_default(), + ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: declare::ext::NodeIdType::default(), + ext_nodeid: declare::ext::NodeIdType::DEFAULT, body: DeclareBody::UndeclareKeyExpr(UndeclareKeyExpr { id: expr_id }), }); diff --git a/zenoh/src/liveliness.rs b/zenoh/src/liveliness.rs index 0883041bb7..26a803fa43 100644 --- a/zenoh/src/liveliness.rs +++ b/zenoh/src/liveliness.rs @@ -549,7 +549,7 @@ where &Some(KeyExpr::from(*KE_PREFIX_LIVELINESS)), Locality::default(), callback, - &SubscriberInfo::default(), + &SubscriberInfo::DEFAULT, ) .map(|sub_state| Subscriber { subscriber: SubscriberInner { @@ -747,8 +747,8 @@ where .query( &self.key_expr?.into(), &Some(KeyExpr::from(*KE_PREFIX_LIVELINESS)), - QueryTarget::default(), - QueryConsolidation::default(), + QueryTarget::DEFAULT, + QueryConsolidation::DEFAULT, Locality::default(), self.timeout, None, diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index ffe2d3ccca..d6497a80b3 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -593,9 +593,9 @@ pub fn pull_data(tables_ref: &RwLock, face: &Arc, expr: WireE for (key_expr, payload) in route { face.primitives.send_push(Push { wire_expr: key_expr, - ext_qos: ext::QoSType::push_default(), + ext_qos: ext::QoSType::PUSH, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, payload, }); } diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index a6748650ab..e8e84395f8 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -494,7 +494,6 @@ macro_rules! inc_res_stats { e.ext_body.as_ref().map(|b| b.payload.len()).unwrap_or(0), ); } - ResponseBody::Ack(_) => (), } } } @@ -555,14 +554,14 @@ pub fn route_query( for (wexpr, payload) in local_replies { let payload = ResponseBody::Reply(Reply { - consolidation: Consolidation::default(), // @TODO: handle Del case - ext_unknown: vec![], // @TODO: handle unknown extensions + consolidation: Consolidation::DEFAULT, // @TODO: handle Del case + ext_unknown: vec![], // @TODO: handle unknown extensions payload: ReplyBody::Put(Put { // @TODO: handle Del case - timestamp: None, // @TODO: handle timestamp - encoding: Encoding::default(), // @TODO: handle encoding - ext_sinfo: None, // @TODO: handle source info - ext_attachment: None, // @TODO: expose it in the API + timestamp: None, // @TODO: handle timestamp + encoding: Encoding::DEFAULT, // @TODO: handle encoding + ext_sinfo: None, // @TODO: handle source info + ext_attachment: None, // @TODO: expose it in the API #[cfg(feature = "shared-memory")] ext_shm: None, ext_unknown: vec![], // @TODO: handle unknown extensions @@ -583,7 +582,7 @@ pub fn route_query( rid: qid, wire_expr: wexpr, payload, - ext_qos: response::ext::QoSType::declare_default(), + ext_qos: response::ext::QoSType::DECLARE, ext_tstamp: None, ext_respid: Some(response::ext::ResponderIdType { zid, @@ -605,7 +604,7 @@ pub fn route_query( .send_response_final(RoutingContext::with_expr( ResponseFinal { rid: qid, - ext_qos: response::ext::QoSType::response_final_default(), + ext_qos: response::ext::QoSType::RESPONSE_FINAL, ext_tstamp: None, }, expr.full_expr().to_string(), @@ -636,7 +635,7 @@ pub fn route_query( Request { id: *qid, wire_expr: key_expr.into(), - ext_qos: ext::QoSType::request_default(), + ext_qos: ext::QoSType::REQUEST, ext_tstamp: None, ext_nodeid: ext::NodeIdType { node_id: *context }, ext_target: *t, @@ -672,7 +671,7 @@ pub fn route_query( Request { id: *qid, wire_expr: key_expr.into(), - ext_qos: ext::QoSType::request_default(), + ext_qos: ext::QoSType::REQUEST, ext_tstamp: None, ext_nodeid: ext::NodeIdType { node_id: *context }, ext_target: target, @@ -693,7 +692,7 @@ pub fn route_query( .send_response_final(RoutingContext::with_expr( ResponseFinal { rid: qid, - ext_qos: response::ext::QoSType::response_final_default(), + ext_qos: response::ext::QoSType::RESPONSE_FINAL, ext_tstamp: None, }, expr.full_expr().to_string(), @@ -711,7 +710,7 @@ pub fn route_query( .send_response_final(RoutingContext::with_expr( ResponseFinal { rid: qid, - ext_qos: response::ext::QoSType::response_final_default(), + ext_qos: response::ext::QoSType::RESPONSE_FINAL, ext_tstamp: None, }, "".to_string(), @@ -758,7 +757,7 @@ pub(crate) fn route_send_response( rid: query.src_qid, wire_expr: key_expr.to_owned(), payload: body, - ext_qos: response::ext::QoSType::response_default(), + ext_qos: response::ext::QoSType::RESPONSE, ext_tstamp: None, ext_respid, }, @@ -818,7 +817,7 @@ pub(crate) fn finalize_pending_query(query: Arc) { .send_response_final(RoutingContext::with_expr( ResponseFinal { rid: query.src_qid, - ext_qos: response::ext::QoSType::response_final_default(), + ext_qos: response::ext::QoSType::RESPONSE_FINAL, ext_tstamp: None, }, "".to_string(), diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index 7fc71c623d..fb4dec4ad5 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -466,9 +466,9 @@ impl Resource { .insert(expr_id, nonwild_prefix.clone()); face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareKeyExpr(DeclareKeyExpr { id: expr_id, wire_expr: nonwild_prefix.expr().into(), diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index 7becff4b4d..6f71ef443a 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -49,9 +49,9 @@ fn propagate_simple_subscription_to( let key_expr = Resource::decl_key(res, dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) wire_expr: key_expr, @@ -137,9 +137,9 @@ fn declare_client_subscription( .primitives .send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) wire_expr: res.expr().into(), @@ -171,9 +171,9 @@ fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc let wire_expr = Resource::get_best_key(res, "", face.id); face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) ext_wire_expr: WireExprType { wire_expr }, @@ -209,9 +209,9 @@ pub(super) fn undeclare_client_subscription( let wire_expr = Resource::get_best_key(res, "", face.id); face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) ext_wire_expr: WireExprType { wire_expr }, diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index 35a10557dc..667ff63c0e 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -96,9 +96,9 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareQueryable(DeclareQueryable { id: 0, // @TODO use proper QueryableId (#703) wire_expr: key_expr, @@ -166,9 +166,9 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc let wire_expr = Resource::get_best_key(res, "", face.id); face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { id: 0, // TODO ext_wire_expr: WireExprType { wire_expr }, @@ -431,9 +431,9 @@ pub(super) fn undeclare_client_subscription( let wire_expr = Resource::get_best_key(res, "", face.id); face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { id: 0, // TODO ext_wire_expr: WireExprType { wire_expr }, @@ -467,9 +467,9 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(sub, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: 0, // TODO wire_expr: key_expr, diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index 6281993c93..03a1e11e67 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -137,7 +137,7 @@ fn send_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { node_id: routing_context, @@ -177,9 +177,9 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareQueryable(DeclareQueryable { id: 0, // @TODO use proper QueryableId (#703) wire_expr: key_expr, @@ -347,7 +347,7 @@ fn send_forget_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { node_id: routing_context, @@ -373,9 +373,9 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { let key_expr = Resource::decl_key(qabl, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareQueryable(DeclareQueryable { id: 0, // @TODO use proper QueryableId (#703) wire_expr: key_expr, diff --git a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs index ae3fda51a7..cf4d201867 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs @@ -214,7 +214,7 @@ impl Network { Ok(NetworkBody::OAM(Oam { id: OAM_LINKSTATE, body: ZExtBody::ZBuf(buf), - ext_qos: oam::ext::QoSType::oam_default(), + ext_qos: oam::ext::QoSType::OAM, ext_tstamp: None, }) .into()) diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index 8f91335f0a..97677893aa 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -49,9 +49,9 @@ fn propagate_simple_subscription_to( let key_expr = Resource::decl_key(res, dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) wire_expr: key_expr, @@ -137,9 +137,9 @@ fn declare_client_subscription( .primitives .send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) wire_expr: res.expr().into(), @@ -171,9 +171,9 @@ fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc let wire_expr = Resource::get_best_key(res, "", face.id); face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) ext_wire_expr: WireExprType { wire_expr }, @@ -209,9 +209,9 @@ pub(super) fn undeclare_client_subscription( let wire_expr = Resource::get_best_key(res, "", face.id); face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) ext_wire_expr: WireExprType { wire_expr }, diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index 35a10557dc..667ff63c0e 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -96,9 +96,9 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareQueryable(DeclareQueryable { id: 0, // @TODO use proper QueryableId (#703) wire_expr: key_expr, @@ -166,9 +166,9 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc let wire_expr = Resource::get_best_key(res, "", face.id); face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) ext_wire_expr: WireExprType { wire_expr }, @@ -422,9 +422,9 @@ fn propagate_forget_simple_subscription_to_peers(tables: &mut Tables, res: &Arc< let wire_expr = Resource::get_best_key(res, "", face.id); face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) ext_wire_expr: WireExprType { wire_expr }, @@ -587,9 +587,9 @@ pub(super) fn undeclare_client_subscription( let wire_expr = Resource::get_best_key(res, "", face.id); face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) ext_wire_expr: WireExprType { wire_expr }, @@ -623,9 +623,9 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(sub, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) wire_expr: key_expr, @@ -650,9 +650,9 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(sub, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) wire_expr: key_expr, @@ -790,9 +790,9 @@ pub(super) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: let wire_expr = Resource::get_best_key(res, "", dst_face.id); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::UndeclareSubscriber( UndeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) @@ -815,9 +815,9 @@ pub(super) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: }; dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) wire_expr: key_expr, diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 90944a524f..dfffe42e0d 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -208,7 +208,7 @@ fn send_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { node_id: routing_context, @@ -258,9 +258,9 @@ fn propagate_simple_queryable( let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareQueryable(DeclareQueryable { id: 0, // @TODO use proper QueryableId (#703) wire_expr: key_expr, @@ -488,7 +488,7 @@ fn send_forget_sourced_queryable_to_net_childs( someface.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType { node_id: routing_context, @@ -514,9 +514,9 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { let key_expr = Resource::decl_key(qabl, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareQueryable(DeclareQueryable { id: 0, // @TODO use proper QueryableId (#703) wire_expr: key_expr, @@ -785,9 +785,9 @@ pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { let key_expr = Resource::decl_key(qabl, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareQueryable(DeclareQueryable { id: 0, // @TODO use proper QueryableId (#703) wire_expr: key_expr, @@ -884,9 +884,9 @@ pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links let wire_expr = Resource::get_best_key(res, "", dst_face.id); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::UndeclareQueryable( UndeclareQueryable { id: 0, // @TODO use proper QueryableId (#703) @@ -908,9 +908,9 @@ pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links let key_expr = Resource::decl_key(res, dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareQueryable(DeclareQueryable { id: 0, // @TODO use proper QueryableId (#703) wire_expr: key_expr, diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index f6fb13e76e..227dd035f4 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -270,9 +270,9 @@ impl AdminSpace { zlock!(admin.primitives).replace(primitives.clone()); primitives.send_declare(Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareQueryable(DeclareQueryable { id: 0, // @TODO use proper QueryableId (#703) wire_expr: [&root_key, "/**"].concat().into(), @@ -284,13 +284,13 @@ impl AdminSpace { }); primitives.send_declare(Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) wire_expr: [&root_key, "/config/**"].concat().into(), - ext_info: SubscriberInfo::default(), + ext_info: SubscriberInfo::DEFAULT, }), }); } @@ -392,7 +392,7 @@ impl Primitives for AdminSpace { ); primitives.send_response_final(ResponseFinal { rid: msg.id, - ext_qos: ext::QoSType::response_final_default(), + ext_qos: ext::QoSType::RESPONSE_FINAL, ext_tstamp: None, }); return; @@ -405,7 +405,7 @@ impl Primitives for AdminSpace { log::error!("Unknown KeyExpr: {}", e); primitives.send_response_final(ResponseFinal { rid: msg.id, - ext_qos: ext::QoSType::response_final_default(), + ext_qos: ext::QoSType::RESPONSE_FINAL, ext_tstamp: None, }); return; diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index 363803f682..57f6a6dcbc 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -497,9 +497,9 @@ fn client_test() { Primitives::send_declare( primitives0.as_ref(), Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareKeyExpr(DeclareKeyExpr { id: 11, wire_expr: "test/client".into(), @@ -523,9 +523,9 @@ fn client_test() { Primitives::send_declare( primitives0.as_ref(), Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareKeyExpr(DeclareKeyExpr { id: 12, wire_expr: WireExpr::from(11).with_suffix("/z1_pub1"), @@ -544,9 +544,9 @@ fn client_test() { Primitives::send_declare( primitives1.as_ref(), Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareKeyExpr(DeclareKeyExpr { id: 21, wire_expr: "test/client".into(), @@ -570,9 +570,9 @@ fn client_test() { Primitives::send_declare( primitives1.as_ref(), Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareKeyExpr(DeclareKeyExpr { id: 22, wire_expr: WireExpr::from(21).with_suffix("/z2_pub1"), @@ -591,9 +591,9 @@ fn client_test() { Primitives::send_declare( primitives2.as_ref(), Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareKeyExpr(DeclareKeyExpr { id: 31, wire_expr: "test/client".into(), @@ -617,10 +617,10 @@ fn client_test() { &tables, &face0.upgrade().unwrap(), &"test/client/z1_wr1".into(), - ext::QoSType::default(), + ext::QoSType::DEFAULT, PushBody::Put(Put { timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -650,10 +650,10 @@ fn client_test() { &router.tables, &face0.upgrade().unwrap(), &WireExpr::from(11).with_suffix("/z1_wr2"), - ext::QoSType::default(), + ext::QoSType::DEFAULT, PushBody::Put(Put { timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -683,10 +683,10 @@ fn client_test() { &router.tables, &face1.upgrade().unwrap(), &"test/client/**".into(), - ext::QoSType::default(), + ext::QoSType::DEFAULT, PushBody::Put(Put { timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -716,10 +716,10 @@ fn client_test() { &router.tables, &face0.upgrade().unwrap(), &12.into(), - ext::QoSType::default(), + ext::QoSType::DEFAULT, PushBody::Put(Put { timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, @@ -749,10 +749,10 @@ fn client_test() { &router.tables, &face1.upgrade().unwrap(), &22.into(), - ext::QoSType::default(), + ext::QoSType::DEFAULT, PushBody::Put(Put { timestamp: None, - encoding: Encoding::default(), + encoding: Encoding::DEFAULT, ext_sinfo: None, #[cfg(feature = "shared-memory")] ext_shm: None, diff --git a/zenoh/src/prelude.rs b/zenoh/src/prelude.rs index 36a841d1ef..ad28470f63 100644 --- a/zenoh/src/prelude.rs +++ b/zenoh/src/prelude.rs @@ -50,9 +50,7 @@ pub(crate) mod common { pub use crate::sample::Locality; #[cfg(not(feature = "unstable"))] pub(crate) use crate::sample::Locality; - pub use crate::sample::Sample; - - pub use zenoh_protocol::core::SampleKind; + pub use crate::sample::{Sample, SampleKind}; pub use crate::publication::Priority; #[zenoh_macros::unstable] diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index 843190ad45..58c7c5c367 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -811,7 +811,7 @@ fn resolve_put( false, ), ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, payload: match kind { SampleKind::Put => { #[allow(unused_mut)] @@ -887,6 +887,8 @@ pub enum Priority { } impl Priority { + /// Default + pub const DEFAULT: Self = Self::Data; /// The lowest Priority pub const MIN: Self = Self::Background; /// The highest Priority @@ -1328,7 +1330,6 @@ mod tests { #[test] fn sample_kind_integrity_in_publication() { use crate::{open, prelude::sync::*}; - use zenoh_protocol::core::SampleKind; const KEY_EXPR: &str = "test/sample_kind_integrity/publication"; const VALUE: &str = "zenoh"; @@ -1351,7 +1352,6 @@ mod tests { #[test] fn sample_kind_integrity_in_put_builder() { use crate::{open, prelude::sync::*}; - use zenoh_protocol::core::SampleKind; const KEY_EXPR: &str = "test/sample_kind_integrity/put_builder"; const VALUE: &str = "zenoh"; diff --git a/zenoh/src/query.rs b/zenoh/src/query.rs index c4f3fb35e9..7a7a867cd8 100644 --- a/zenoh/src/query.rs +++ b/zenoh/src/query.rs @@ -13,7 +13,6 @@ // //! Query primitives. - use crate::handlers::{locked, Callback, DefaultHandler}; use crate::prelude::*; #[zenoh_macros::unstable] @@ -23,13 +22,38 @@ use std::collections::HashMap; use std::future::Ready; use std::time::Duration; use zenoh_core::{AsyncResolve, Resolvable, SyncResolve}; +use zenoh_protocol::zenoh::query::Consolidation; use zenoh_result::ZResult; /// The [`Queryable`](crate::queryable::Queryable)s that should be target of a [`get`](Session::get). -pub use zenoh_protocol::core::QueryTarget; +pub type QueryTarget = zenoh_protocol::network::request::ext::TargetType; /// The kind of consolidation. -pub use zenoh_protocol::core::ConsolidationMode; +#[derive(Debug, Clone, PartialEq, Eq, Copy)] +pub enum ConsolidationMode { + /// No consolidation applied: multiple samples may be received for the same key-timestamp. + None, + /// Monotonic consolidation immediately forwards samples, except if one with an equal or more recent timestamp + /// has already been sent with the same key. + /// + /// This optimizes latency while potentially reducing bandwidth. + /// + /// Note that this doesn't cause re-ordering, but drops the samples for which a more recent timestamp has already + /// been observed with the same key. + Monotonic, + /// Holds back samples to only send the set of samples that had the highest timestamp for their key. + Latest, +} + +impl From for Consolidation { + fn from(val: ConsolidationMode) -> Self { + match val { + ConsolidationMode::None => Consolidation::None, + ConsolidationMode::Monotonic => Consolidation::Monotonic, + ConsolidationMode::Latest => Consolidation::Latest, + } + } +} /// The operation: either manual or automatic. #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -45,6 +69,7 @@ pub struct QueryConsolidation { } impl QueryConsolidation { + pub const DEFAULT: Self = Self::AUTO; /// Automatic query consolidation strategy selection. pub const AUTO: Self = Self { mode: Mode::Auto }; @@ -72,7 +97,7 @@ impl From for QueryConsolidation { impl Default for QueryConsolidation { fn default() -> Self { - QueryConsolidation::AUTO + Self::DEFAULT } } diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index 4e9f4914dd..d0ce99b512 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -56,7 +56,7 @@ impl Drop for QueryInner { fn drop(&mut self) { self.primitives.send_response_final(ResponseFinal { rid: self.qid, - ext_qos: response::ext::QoSType::response_final_default(), + ext_qos: response::ext::QoSType::RESPONSE_FINAL, ext_tstamp: None, }); } @@ -241,7 +241,7 @@ impl SyncResolve for ReplyBuilder<'_> { mapping: Mapping::Sender, }, payload: ResponseBody::Reply(zenoh::Reply { - consolidation: zenoh::Consolidation::default(), + consolidation: zenoh::Consolidation::DEFAULT, ext_unknown: vec![], payload: match kind { SampleKind::Put => ReplyBody::Put(Put { @@ -262,7 +262,7 @@ impl SyncResolve for ReplyBuilder<'_> { }), }, }), - ext_qos: response::ext::QoSType::response_default(), + ext_qos: response::ext::QoSType::RESPONSE, ext_tstamp: None, ext_respid: Some(response::ext::ResponderIdType { zid: self.query.inner.zid, @@ -292,7 +292,7 @@ impl SyncResolve for ReplyBuilder<'_> { }), code: 0, // TODO }), - ext_qos: response::ext::QoSType::response_default(), + ext_qos: response::ext::QoSType::RESPONSE, ext_tstamp: None, ext_respid: Some(response::ext::ResponderIdType { zid: self.query.inner.zid, diff --git a/zenoh/src/sample.rs b/zenoh/src/sample.rs index 5d707e5936..d41e8c83a1 100644 --- a/zenoh/src/sample.rs +++ b/zenoh/src/sample.rs @@ -14,13 +14,15 @@ //! Sample primitives use crate::buffers::ZBuf; -use crate::prelude::ZenohId; -use crate::prelude::{KeyExpr, SampleKind, Value}; +use crate::prelude::{KeyExpr, Value, ZenohId}; use crate::query::Reply; use crate::time::{new_reception_timestamp, Timestamp}; #[zenoh_macros::unstable] use serde::Serialize; -use std::convert::{TryFrom, TryInto}; +use std::{ + convert::{TryFrom, TryInto}, + fmt, +}; use zenoh_protocol::core::Encoding; pub type SourceSn = u64; @@ -311,6 +313,38 @@ mod attachment { } } } + +/// The kind of a `Sample`. +#[repr(u8)] +#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)] +pub enum SampleKind { + /// if the `Sample` was issued by a `put` operation. + #[default] + Put = 0, + /// if the `Sample` was issued by a `delete` operation. + Delete = 1, +} + +impl fmt::Display for SampleKind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SampleKind::Put => write!(f, "PUT"), + SampleKind::Delete => write!(f, "DELETE"), + } + } +} + +impl TryFrom for SampleKind { + type Error = u64; + fn try_from(kind: u64) -> Result { + match kind { + 0 => Ok(SampleKind::Put), + 1 => Ok(SampleKind::Delete), + _ => Err(kind), + } + } +} + #[zenoh_macros::unstable] pub use attachment::{Attachment, AttachmentBuilder, AttachmentIterator}; diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 46cfd5e499..329e44e43f 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -296,7 +296,7 @@ impl<'s, 'a> SessionDeclarations<'s, 'a> for SessionRef<'a> { SubscriberBuilder { session: self.clone(), key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), - reliability: Reliability::default(), + reliability: Reliability::DEFAULT, mode: PushMode, origin: Locality::default(), handler: DefaultHandler, @@ -329,8 +329,8 @@ impl<'s, 'a> SessionDeclarations<'s, 'a> for SessionRef<'a> { PublisherBuilder { session: self.clone(), key_expr: key_expr.try_into().map_err(Into::into), - congestion_control: CongestionControl::default(), - priority: Priority::default(), + congestion_control: CongestionControl::DEFAULT, + priority: Priority::DEFAULT, destination: Locality::default(), } } @@ -775,8 +775,8 @@ impl Session { session: self, selector, scope: Ok(None), - target: QueryTarget::default(), - consolidation: QueryConsolidation::default(), + target: QueryTarget::DEFAULT, + consolidation: QueryConsolidation::DEFAULT, destination: Locality::default(), timeout, value: None, @@ -858,9 +858,9 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { - ext_qos: declare::ext::QoSType::declare_default(), + ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: declare::ext::NodeIdType::default(), + ext_nodeid: declare::ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareKeyExpr(DeclareKeyExpr { id: expr_id, wire_expr: WireExpr { @@ -1059,9 +1059,9 @@ impl Session { // }; primitives.send_declare(Declare { - ext_qos: declare::ext::QoSType::declare_default(), + ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: declare::ext::NodeIdType::default(), + ext_nodeid: declare::ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: id as u32, wire_expr: key_expr.to_wire(self).to_owned(), @@ -1124,9 +1124,9 @@ impl Session { let wire_expr = WireExpr::from(join_sub).to_owned(); drop(state); primitives.send_declare(Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) ext_wire_expr: WireExprType { wire_expr }, @@ -1149,9 +1149,9 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) ext_wire_expr: WireExprType { @@ -1205,9 +1205,9 @@ impl Session { distance: 0, }; primitives.send_declare(Declare { - ext_qos: declare::ext::QoSType::declare_default(), + ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: declare::ext::NodeIdType::default(), + ext_nodeid: declare::ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareQueryable(DeclareQueryable { id: id as u32, wire_expr: key_expr.to_owned(), @@ -1233,9 +1233,9 @@ impl Session { distance: 0, }; primitives.send_declare(Declare { - ext_qos: declare::ext::QoSType::declare_default(), + ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: declare::ext::NodeIdType::default(), + ext_nodeid: declare::ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareQueryable(DeclareQueryable { id: id as u32, wire_expr: key_expr.to_owned(), @@ -1298,9 +1298,9 @@ impl Session { distance: 0, }; primitives.send_declare(Declare { - ext_qos: declare::ext::QoSType::declare_default(), + ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: declare::ext::NodeIdType::default(), + ext_nodeid: declare::ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareQueryable(DeclareQueryable { id: 0, // @TODO use proper QueryableId (#703) wire_expr: qable_state.key_expr.clone(), @@ -1317,9 +1317,9 @@ impl Session { distance: 0, }; primitives.send_declare(Declare { - ext_qos: declare::ext::QoSType::declare_default(), + ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: declare::ext::NodeIdType::default(), + ext_nodeid: declare::ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareQueryable(DeclareQueryable { id: 0, // @TODO use proper QueryableId (#703) wire_expr: qable_state.key_expr.clone(), @@ -1333,9 +1333,9 @@ impl Session { // There are no more Queryables on the same KeyExpr. drop(state); primitives.send_declare(Declare { - ext_qos: declare::ext::QoSType::declare_default(), + ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: declare::ext::NodeIdType::default(), + ext_nodeid: declare::ext::NodeIdType::DEFAULT, body: DeclareBody::UndeclareQueryable(UndeclareQueryable { id: 0, // @TODO use proper QueryableId (#703) ext_wire_expr: WireExprType { @@ -1369,13 +1369,13 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { - ext_qos: declare::ext::QoSType::declare_default(), + ext_qos: declare::ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: declare::ext::NodeIdType::default(), + ext_nodeid: declare::ext::NodeIdType::DEFAULT, body: DeclareBody::DeclareSubscriber(DeclareSubscriber { id: id as u32, wire_expr: key_expr.to_wire(self).to_owned(), - ext_info: SubscriberInfo::default(), + ext_info: SubscriberInfo::DEFAULT, }), }); Ok(tok_state) @@ -1393,9 +1393,9 @@ impl Session { let primitives = state.primitives.as_ref().unwrap().clone(); drop(state); primitives.send_declare(Declare { - ext_qos: ext::QoSType::declare_default(), + ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, body: DeclareBody::UndeclareSubscriber(UndeclareSubscriber { id: 0, // @TODO use proper SubscriberId (#703) ext_wire_expr: WireExprType { @@ -1698,10 +1698,10 @@ impl Session { primitives.send_request(Request { id: 0, // @TODO compute a proper request ID wire_expr: key_expr.to_wire(self).to_owned(), - ext_qos: ext::QoSType::request_default(), + ext_qos: ext::QoSType::REQUEST, ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - ext_target: request::ext::TargetType::default(), + ext_nodeid: ext::NodeIdType::DEFAULT, + ext_target: request::ext::TargetType::DEFAULT, ext_budget: None, ext_timeout: None, payload: RequestBody::Pull(Pull { @@ -1801,9 +1801,9 @@ impl Session { primitives.send_request(Request { id: qid, wire_expr: wexpr.clone(), - ext_qos: request::ext::QoSType::request_default(), + ext_qos: request::ext::QoSType::REQUEST, ext_tstamp: None, - ext_nodeid: request::ext::NodeIdType::default(), + ext_nodeid: request::ext::NodeIdType::DEFAULT, ext_target: target, ext_budget: None, ext_timeout: Some(timeout), @@ -1959,7 +1959,7 @@ impl<'s> SessionDeclarations<'s, 'static> for Arc { SubscriberBuilder { session: SessionRef::Shared(self.clone()), key_expr: key_expr.try_into().map_err(Into::into), - reliability: Reliability::default(), + reliability: Reliability::DEFAULT, mode: PushMode, origin: Locality::default(), handler: DefaultHandler, @@ -2040,8 +2040,8 @@ impl<'s> SessionDeclarations<'s, 'static> for Arc { PublisherBuilder { session: SessionRef::Shared(self.clone()), key_expr: key_expr.try_into().map_err(Into::into), - congestion_control: CongestionControl::default(), - priority: Priority::default(), + congestion_control: CongestionControl::DEFAULT, + priority: Priority::DEFAULT, destination: Locality::default(), } } @@ -2247,11 +2247,6 @@ impl Primitives for Session { fn send_response(&self, msg: Response) { trace!("recv Response {:?}", msg); match msg.payload { - ResponseBody::Ack(_) => { - log::warn!( - "Received a ResponseBody::Ack, but this isn't supported yet. Dropping message." - ) - } ResponseBody::Put(_) => { log::warn!( "Received a ResponseBody::Put, but this isn't supported yet. Dropping message." diff --git a/zenoh/src/subscriber.rs b/zenoh/src/subscriber.rs index 7258833d28..fe2236076f 100644 --- a/zenoh/src/subscriber.rs +++ b/zenoh/src/subscriber.rs @@ -25,9 +25,6 @@ use std::sync::Arc; use zenoh_core::{AsyncResolve, Resolvable, Resolve, SyncResolve}; use zenoh_protocol::network::declare::{subscriber::ext::SubscriberInfo, Mode}; -/// The subscription mode. -pub use zenoh_protocol::core::SubMode; - /// The kind of reliability. pub use zenoh_protocol::core::Reliability; @@ -117,7 +114,6 @@ impl<'a> PullSubscriberInner<'a> { /// ``` /// # async_std::task::block_on(async { /// use zenoh::prelude::r#async::*; - /// use zenoh::subscriber::SubMode; /// /// let session = zenoh::open(config::peer()).res().await.unwrap(); /// let subscriber = session @@ -252,12 +248,6 @@ impl Drop for SubscriberInner<'_> { #[derive(Debug, Clone, Copy)] pub struct PullMode; -impl From for SubMode { - fn from(_: PullMode) -> Self { - SubMode::Pull - } -} - impl From for Mode { fn from(_: PullMode) -> Self { Mode::Pull @@ -269,12 +259,6 @@ impl From for Mode { #[derive(Debug, Clone, Copy)] pub struct PushMode; -impl From for SubMode { - fn from(_: PushMode) -> Self { - SubMode::Push - } -} - impl From for Mode { fn from(_: PushMode) -> Self { Mode::Push @@ -712,7 +696,6 @@ impl<'a, Receiver> PullSubscriber<'a, Receiver> { /// ``` /// # async_std::task::block_on(async { /// use zenoh::prelude::r#async::*; - /// use zenoh::subscriber::SubMode; /// /// let session = zenoh::open(config::peer()).res().await.unwrap(); /// let subscriber = session