diff --git a/Cargo.lock b/Cargo.lock
index aac8c0c10f..7c140c1c31 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2587,15 +2587,6 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d"
-[[package]]
-name = "ordered-float"
-version = "4.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4a91171844676f8c7990ce64959210cd2eaef32c2612c50f9fae9f8aaa6065a6"
-dependencies = [
- "num-traits",
-]
-
[[package]]
name = "overload"
version = "0.1.1"
@@ -5343,7 +5334,6 @@ dependencies = [
"itertools 0.13.0",
"lazy_static",
"once_cell",
- "ordered-float",
"paste",
"petgraph",
"phf",
diff --git a/commons/zenoh-macros/src/lib.rs b/commons/zenoh-macros/src/lib.rs
index 003525daa9..f3533a6aea 100644
--- a/commons/zenoh-macros/src/lib.rs
+++ b/commons/zenoh-macros/src/lib.rs
@@ -19,7 +19,7 @@
//! [Click here for Zenoh's documentation](../zenoh/index.html)
use proc_macro::TokenStream;
use quote::{quote, ToTokens};
-use syn::{parse_macro_input, parse_quote, Attribute, Error, Item, LitStr, TraitItem};
+use syn::{parse_macro_input, parse_quote, Attribute, Error, Item, ItemImpl, LitStr, TraitItem};
use zenoh_keyexpr::{
format::{
macro_support::{self, SegmentBuilder},
@@ -522,3 +522,81 @@ pub fn register_param(input: proc_macro::TokenStream) -> proc_macro::TokenStream
.unwrap_or_else(syn::Error::into_compile_error)
.into()
}
+
+/// Macro `#[internal_trait]` should precede
+/// `impl Trait for Struct { ... }`
+///
+/// This macro wraps the implementations of "internal" tratis.
+///
+/// These traits are used to group set of functions which should be implemented
+/// together and with the same portotyoe. E.g. `QoSBuilderTrait` provides set of
+/// setters (`congestion_control`, `priority`, `express`) and we should not
+/// forget to implement all these setters for each entity which supports
+/// QoS functionality.
+///
+/// The traits mechanism is a good way to group functions. But additional traits
+/// adds extra burden to end user who have to import it every time.
+///
+/// The macro `internal_trait` solves this problem by adding
+/// methods with same names as in trait to structure implementation itself,
+/// making them available to user without additional trait import.
+///
+#[proc_macro_attribute]
+pub fn internal_trait(_attr: TokenStream, item: TokenStream) -> TokenStream {
+ let input = parse_macro_input!(item as ItemImpl);
+ let trait_path = &input.trait_.as_ref().unwrap().1;
+ let struct_path = &input.self_ty;
+ let generics = &input.generics;
+ // let struct_lifetime = get_type_path_lifetime(struct_path);
+
+ let mut struct_methods = quote! {};
+ for item_fn in input.items.iter() {
+ if let syn::ImplItem::Fn(method) = item_fn {
+ let method_name = &method.sig.ident;
+ let method_generic_params = &method.sig.generics.params;
+ let method_generic_params = if method_generic_params.is_empty() {
+ quote! {}
+ } else {
+ quote! {<#method_generic_params>}
+ };
+ let method_args = &method.sig.inputs;
+ let method_output = &method.sig.output;
+ let where_clause = &method.sig.generics.where_clause;
+ let mut method_call_args = quote! {};
+ for arg in method_args.iter() {
+ match arg {
+ syn::FnArg::Receiver(_) => {
+ method_call_args.extend(quote! { self, });
+ }
+ syn::FnArg::Typed(pat_type) => {
+ let pat = &pat_type.pat;
+ method_call_args.extend(quote! { #pat, });
+ }
+ }
+ }
+ let mut attributes = quote! {};
+ for attr in &method.attrs {
+ attributes.extend(quote! {
+ #attr
+ });
+ }
+ // call corresponding trait method from struct method
+ struct_methods.extend(quote! {
+ #attributes
+ pub fn #method_name #method_generic_params (#method_args) #method_output #where_clause {
+ <#struct_path as #trait_path>::#method_name(#method_call_args)
+ }
+ });
+ }
+ }
+ let struct_methods_output = quote! {
+ impl #generics #struct_path {
+ #struct_methods
+ }
+ };
+ (quote! {
+ #input
+ #struct_methods_output
+ })
+ .into()
+}
diff --git a/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs b/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs
index fbd0269498..049e2756fb 100644
--- a/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs
+++ b/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs
@@ -14,11 +14,7 @@
use std::time::Duration;
use clap::{arg, Command};
-use zenoh::{
- config::Config,
- key_expr::keyexpr,
- qos::{CongestionControl, QoSBuilderTrait},
-};
+use zenoh::{config::Config, key_expr::keyexpr, qos::CongestionControl};
const HTML: &str = r#"
diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs
index 289fc9e055..4b236e5d5a 100644
--- a/plugins/zenoh-plugin-rest/src/lib.rs
+++ b/plugins/zenoh-plugin-rest/src/lib.rs
@@ -53,7 +53,7 @@ use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginCont
mod config;
pub use config::Config;
-use zenoh::{bytes::EncodingBuilderTrait, query::ReplyError};
+use zenoh::query::ReplyError;
const GIT_VERSION: &str = git_version::git_version!(prefix = "v", cargo_prefix = "v");
lazy_static::lazy_static! {
diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs b/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs
index 952a72f499..cf38fe4728 100644
--- a/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs
+++ b/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs
@@ -24,7 +24,6 @@ use tokio::sync::RwLock;
use zenoh::{
internal::Value,
key_expr::{KeyExpr, OwnedKeyExpr},
- prelude::*,
query::Selector,
sample::{Sample, SampleBuilder},
time::Timestamp,
diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
index 94e9d85d82..5ffccbbd54 100644
--- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
+++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
@@ -23,7 +23,6 @@ use flume::{Receiver, Sender};
use futures::select;
use tokio::sync::{Mutex, RwLock};
use zenoh::{
- bytes::EncodingBuilderTrait,
internal::{
buffers::{SplitBuffer, ZBuf},
zenoh_home, Timed, TimedEvent, Timer, Value,
@@ -35,7 +34,7 @@ use zenoh::{
KeyExpr, OwnedKeyExpr,
},
query::{ConsolidationMode, QueryTarget},
- sample::{Sample, SampleBuilder, SampleKind, TimestampBuilderTrait},
+ sample::{Sample, SampleBuilder, SampleKind},
session::Session,
time::{Timestamp, NTP64},
};
diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs
index 893cbcbad0..c8a41b4e0a 100644
--- a/zenoh-ext/src/querying_subscriber.rs
+++ b/zenoh-ext/src/querying_subscriber.rs
@@ -27,7 +27,7 @@ use zenoh::{
prelude::Wait,
pubsub::{Reliability, Subscriber},
query::{QueryConsolidation, QueryTarget, ReplyKeyExpr, Selector},
- sample::{Locality, Sample, SampleBuilder, TimestampBuilderTrait},
+ sample::{Locality, Sample, SampleBuilder},
time::Timestamp,
Error, Resolvable, Resolve, Result as ZResult, Session,
};
diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml
index 701c22039a..32622509b5 100644
--- a/zenoh/Cargo.toml
+++ b/zenoh/Cargo.toml
@@ -77,7 +77,6 @@ git-version = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
tracing = { workspace = true }
-ordered-float = { workspace = true }
paste = { workspace = true }
petgraph = { workspace = true }
phf = { workspace = true }
diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs
index 53c32c8a7d..4b0c3d8459 100644
--- a/zenoh/src/api/builders/publisher.rs
+++ b/zenoh/src/api/builders/publisher.rs
@@ -18,13 +18,12 @@ use zenoh_core::{Resolvable, Result as ZResult, Wait};
use zenoh_protocol::core::Reliability;
use zenoh_protocol::{core::CongestionControl, network::Mapping};
+use super::sample::TimestampBuilderTrait;
#[cfg(feature = "unstable")]
use crate::api::sample::SourceInfo;
use crate::{
api::{
- builders::sample::{
- EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait,
- },
+ builders::sample::{EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait},
bytes::{OptionZBytes, ZBytes},
encoding::Encoding,
key_expr::KeyExpr,
@@ -82,6 +81,7 @@ pub struct PublicationBuilder {
pub(crate) attachment: Option,
}
+#[zenoh_macros::internal_trait]
impl QoSBuilderTrait for PublicationBuilder, T> {
#[inline]
fn congestion_control(self, congestion_control: CongestionControl) -> Self {
@@ -128,6 +128,7 @@ impl PublicationBuilder, T> {
}
}
+#[zenoh_macros::internal_trait]
impl EncodingBuilderTrait for PublisherBuilder<'_, '_> {
fn encoding>(self, encoding: T) -> Self {
Self {
@@ -137,6 +138,7 @@ impl EncodingBuilderTrait for PublisherBuilder<'_, '_> {
}
}
+#[zenoh_macros::internal_trait]
impl EncodingBuilderTrait for PublicationBuilder
{
fn encoding>(self, encoding: T) -> Self {
Self {
@@ -149,6 +151,7 @@ impl EncodingBuilderTrait for PublicationBuilder
{
}
}
+#[zenoh_macros::internal_trait]
impl
SampleBuilderTrait for PublicationBuilder
{
#[cfg(feature = "unstable")]
fn source_info(self, source_info: SourceInfo) -> Self {
@@ -166,6 +169,7 @@ impl
SampleBuilderTrait for PublicationBuilder
{
}
}
+#[zenoh_macros::internal_trait]
impl
TimestampBuilderTrait for PublicationBuilder
{
fn timestamp>>(self, timestamp: TS) -> Self {
Self {
@@ -278,6 +282,7 @@ impl<'a, 'b> Clone for PublisherBuilder<'a, 'b> {
}
}
+#[zenoh_macros::internal_trait]
impl QoSBuilderTrait for PublisherBuilder<'_, '_> {
/// Change the `congestion_control` to apply when routing the data.
#[inline]
diff --git a/zenoh/src/api/builders/sample.rs b/zenoh/src/api/builders/sample.rs
index 4c1fa81406..cb7ada9e4f 100644
--- a/zenoh/src/api/builders/sample.rs
+++ b/zenoh/src/api/builders/sample.rs
@@ -166,6 +166,7 @@ impl SampleBuilder {
}
}
+#[zenoh_macros::internal_trait]
impl TimestampBuilderTrait for SampleBuilder {
fn timestamp>>(self, timestamp: U) -> Self {
Self {
@@ -178,6 +179,7 @@ impl TimestampBuilderTrait for SampleBuilder {
}
}
+#[zenoh_macros::internal_trait]
impl SampleBuilderTrait for SampleBuilder {
#[zenoh_macros::unstable]
fn source_info(self, source_info: SourceInfo) -> Self {
@@ -202,6 +204,7 @@ impl SampleBuilderTrait for SampleBuilder {
}
}
+#[zenoh_macros::internal_trait]
impl QoSBuilderTrait for SampleBuilder {
fn congestion_control(self, congestion_control: CongestionControl) -> Self {
let qos: QoSBuilder = self.sample.qos.into();
@@ -229,6 +232,7 @@ impl QoSBuilderTrait for SampleBuilder {
}
}
+#[zenoh_macros::internal_trait]
impl EncodingBuilderTrait for SampleBuilder {
fn encoding>(self, encoding: T) -> Self {
Self {
diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs
index a8ce4e5569..faa2e4eca6 100644
--- a/zenoh/src/api/publisher.rs
+++ b/zenoh/src/api/publisher.rs
@@ -106,7 +106,7 @@ impl fmt::Debug for PublisherState {
/// subscriber.stream().map(Ok).forward(publisher).await.unwrap();
/// # }
/// ```
-#[derive(Debug, Clone)]
+#[derive(Debug)]
pub struct Publisher<'a> {
#[cfg(feature = "unstable")]
pub(crate) session_id: ZenohId,
diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs
index bea028ff97..177ca565a1 100644
--- a/zenoh/src/api/query.rs
+++ b/zenoh/src/api/query.rs
@@ -28,7 +28,7 @@ use zenoh_protocol::core::{CongestionControl, Parameters};
use zenoh_result::ZResult;
use super::{
- builders::sample::{EncodingBuilderTrait, QoSBuilderTrait},
+ builders::sample::{EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait},
bytes::ZBytes,
encoding::Encoding,
handlers::{locked, Callback, DefaultHandler, IntoHandler},
@@ -41,7 +41,7 @@ use super::{
};
#[cfg(feature = "unstable")]
use super::{sample::SourceInfo, selector::ZenohParameters};
-use crate::{bytes::OptionZBytes, sample::SampleBuilderTrait};
+use crate::bytes::OptionZBytes;
/// The [`Queryable`](crate::query::Queryable)s that should be target of a [`get`](Session::get).
pub type QueryTarget = zenoh_protocol::network::request::ext::TargetType;
@@ -209,6 +209,7 @@ pub struct SessionGetBuilder<'a, 'b, Handler> {
pub(crate) source_info: SourceInfo,
}
+#[zenoh_macros::internal_trait]
impl SampleBuilderTrait for SessionGetBuilder<'_, '_, Handler> {
#[zenoh_macros::unstable]
fn source_info(self, source_info: SourceInfo) -> Self {
@@ -244,6 +245,7 @@ impl QoSBuilderTrait for SessionGetBuilder<'_, '_, DefaultHandler> {
}
}
+#[zenoh_macros::internal_trait]
impl EncodingBuilderTrait for SessionGetBuilder<'_, '_, Handler> {
fn encoding>(self, encoding: T) -> Self {
let mut value = self.value.unwrap_or_default();
diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs
index a1ac14b5f1..a04662dd8f 100644
--- a/zenoh/src/api/queryable.rs
+++ b/zenoh/src/api/queryable.rs
@@ -304,6 +304,7 @@ pub struct ReplyBuilder<'a, 'b, T> {
attachment: Option,
}
+#[zenoh_macros::internal_trait]
impl TimestampBuilderTrait for ReplyBuilder<'_, '_, T> {
fn timestamp>>(self, timestamp: U) -> Self {
Self {
@@ -313,6 +314,7 @@ impl TimestampBuilderTrait for ReplyBuilder<'_, '_, T> {
}
}
+#[zenoh_macros::internal_trait]
impl SampleBuilderTrait for ReplyBuilder<'_, '_, T> {
fn attachment>(self, attachment: U) -> Self {
let attachment: OptionZBytes = attachment.into();
@@ -348,6 +350,7 @@ impl QoSBuilderTrait for ReplyBuilder<'_, '_, T> {
}
}
+#[zenoh_macros::internal_trait]
impl EncodingBuilderTrait for ReplyBuilder<'_, '_, ReplyBuilderPut> {
fn encoding>(self, encoding: T) -> Self {
Self {
@@ -471,6 +474,7 @@ pub struct ReplyErrBuilder<'a> {
value: Value,
}
+#[zenoh_macros::internal_trait]
impl EncodingBuilderTrait for ReplyErrBuilder<'_> {
fn encoding>(self, encoding: T) -> Self {
let mut value = self.value.clone();
diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs
index 6a15afe150..d650ddb05d 100644
--- a/zenoh/src/lib.rs
+++ b/zenoh/src/lib.rs
@@ -210,7 +210,6 @@ pub mod sample {
pub use crate::api::{
builders::sample::{
SampleBuilder, SampleBuilderAny, SampleBuilderDelete, SampleBuilderPut,
- SampleBuilderTrait, TimestampBuilderTrait,
},
sample::{Sample, SampleFields, SampleKind, SourceSn},
};
@@ -219,7 +218,6 @@ pub mod sample {
/// Payload primitives
pub mod bytes {
pub use crate::api::{
- builders::sample::EncodingBuilderTrait,
bytes::{
Deserialize, OptionZBytes, Serialize, ZBytes, ZBytesIterator, ZBytesReader,
ZBytesSliceIterator, ZBytesWriter, ZDeserializeError, ZSerde,
@@ -279,7 +277,7 @@ pub mod handlers {
pub mod qos {
pub use zenoh_protocol::core::CongestionControl;
- pub use crate::api::{builders::sample::QoSBuilderTrait, publisher::Priority};
+ pub use crate::api::publisher::Priority;
}
/// Scouting primitives
@@ -374,6 +372,11 @@ compile_error!(
#[zenoh_macros::internal]
pub mod internal {
+ pub mod traits {
+ pub use crate::api::builders::sample::{
+ EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait,
+ };
+ }
pub use zenoh_core::{
zasync_executor_init, zasynclock, zerror, zlock, zread, ztimeout, zwrite, ResolveFuture,
};
diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs
index c117bd51df..7d1287beeb 100644
--- a/zenoh/src/net/routing/dispatcher/queries.rs
+++ b/zenoh/src/net/routing/dispatcher/queries.rs
@@ -327,7 +327,7 @@ fn compute_final_route(
TargetType::AllComplete => {
let mut route = HashMap::new();
for qabl in qabls.iter() {
- if qabl.complete > 0
+ if qabl.info.map(|info| info.complete).unwrap_or(true)
&& tables
.hat_code
.egress_filter(tables, src_face, &qabl.direction.0, expr)
@@ -342,10 +342,9 @@ fn compute_final_route(
route
}
TargetType::BestMatching => {
- if let Some(qabl) = qabls
- .iter()
- .find(|qabl| qabl.direction.0.id != src_face.id && qabl.complete > 0)
- {
+ if let Some(qabl) = qabls.iter().find(|qabl| {
+ qabl.direction.0.id != src_face.id && qabl.info.is_some_and(|info| info.complete)
+ }) {
let mut route = HashMap::new();
let mut direction = qabl.direction.clone();
diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs
index ab84241666..1636d192c4 100644
--- a/zenoh/src/net/routing/dispatcher/resource.rs
+++ b/zenoh/src/net/routing/dispatcher/resource.rs
@@ -46,8 +46,7 @@ pub(crate) type Route = HashMap;
pub(crate) type QueryRoute = HashMap;
pub(crate) struct QueryTargetQabl {
pub(crate) direction: Direction,
- pub(crate) complete: u64,
- pub(crate) distance: f64,
+ pub(crate) info: Option,
}
pub(crate) type QueryTargetQablSet = Vec;
diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs
index e711ccf2e8..cd417ef84b 100644
--- a/zenoh/src/net/routing/hat/client/queries.rs
+++ b/zenoh/src/net/routing/hat/client/queries.rs
@@ -17,7 +17,6 @@ use std::{
sync::{atomic::Ordering, Arc},
};
-use ordered_float::OrderedFloat;
use zenoh_protocol::{
core::{
key_expr::{
@@ -354,8 +353,7 @@ impl HatQueriesTrait for HatCode {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
direction: (face.clone(), key_expr.to_owned(), NodeId::default()),
- complete: 0,
- distance: f64::MAX,
+ info: None,
});
}
}
@@ -375,17 +373,15 @@ impl HatQueriesTrait for HatCode {
if let Some(qabl_info) = context.qabl.as_ref() {
route.push(QueryTargetQabl {
direction: (context.face.clone(), key_expr.to_owned(), NodeId::default()),
- complete: if complete {
- qabl_info.complete as u64
- } else {
- 0
- },
- distance: 0.5,
+ info: Some(QueryableInfoType {
+ complete: complete && qabl_info.complete,
+ distance: 1,
+ }),
});
}
}
}
- route.sort_by_key(|qabl| OrderedFloat(qabl.distance));
+ route.sort_by_key(|qabl| qabl.info.map_or(u16::MAX, |i| i.distance));
Arc::new(route)
}
diff --git a/zenoh/src/net/routing/hat/client/token.rs b/zenoh/src/net/routing/hat/client/token.rs
index 9e5923425c..5014ee5931 100644
--- a/zenoh/src/net/routing/hat/client/token.rs
+++ b/zenoh/src/net/routing/hat/client/token.rs
@@ -116,13 +116,9 @@ fn declare_simple_token(
interest_id: Option,
send_declare: &mut SendDeclare,
) {
- register_simple_token(tables, face, id, res);
-
- propagate_simple_token(tables, res, face, send_declare);
-
- let wire_expr = Resource::decl_key(res, face, true);
if let Some(interest_id) = interest_id {
if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) {
+ let wire_expr = Resource::get_best_key(res, "", interest.src_face.id);
send_declare(
&interest.src_face.primitives,
RoutingContext::with_expr(
@@ -137,6 +133,9 @@ fn declare_simple_token(
),
)
}
+ } else {
+ register_simple_token(tables, face, id, res);
+ propagate_simple_token(tables, res, face, send_declare);
}
}
diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs
index 6941466571..ce97803532 100644
--- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs
+++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs
@@ -17,7 +17,6 @@ use std::{
sync::{atomic::Ordering, Arc},
};
-use ordered_float::OrderedFloat;
use petgraph::graph::NodeIndex;
use zenoh_protocol::{
core::{
@@ -718,12 +717,10 @@ fn insert_target_for_qabls(
Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
direction: (face.clone(), key_expr.to_owned(), source),
- complete: if complete {
- qabl_info.complete as u64
- } else {
- 0
- },
- distance: net.distances[qabl_idx.index()],
+ info: Some(QueryableInfoType {
+ complete: complete && qabl_info.complete,
+ distance: net.distances[qabl_idx.index()] as u16,
+ }),
});
}
}
@@ -1000,18 +997,16 @@ impl HatQueriesTrait for HatCode {
key_expr.to_owned(),
NodeId::default(),
),
- complete: if complete {
- qabl_info.complete as u64
- } else {
- 0
- },
- distance: 0.5,
+ info: Some(QueryableInfoType {
+ complete: complete && qabl_info.complete,
+ distance: 1,
+ }),
});
}
}
}
}
- route.sort_by_key(|qabl| OrderedFloat(qabl.distance));
+ route.sort_by_key(|qabl| qabl.info.map_or(u16::MAX, |i| i.distance));
Arc::new(route)
}
diff --git a/zenoh/src/net/routing/hat/p2p_peer/interests.rs b/zenoh/src/net/routing/hat/p2p_peer/interests.rs
index 2ed9e22840..3fa2bbe193 100644
--- a/zenoh/src/net/routing/hat/p2p_peer/interests.rs
+++ b/zenoh/src/net/routing/hat/p2p_peer/interests.rs
@@ -24,8 +24,8 @@ use zenoh_protocol::{
use zenoh_sync::get_mut_unchecked;
use super::{
- face_hat, face_hat_mut, pubsub::declare_sub_interest, queries::declare_qabl_interest,
- token::declare_token_interest, HatCode, HatFace,
+ face_hat, face_hat_mut, initial_interest, pubsub::declare_sub_interest,
+ queries::declare_qabl_interest, token::declare_token_interest, HatCode, HatFace,
};
use crate::net::routing::{
dispatcher::{
@@ -132,11 +132,12 @@ impl HatInterestTrait for HatCode {
src_interest_id: id,
});
- for dst_face in tables
- .faces
- .values_mut()
- .filter(|f| f.whatami == WhatAmI::Router)
- {
+ for dst_face in tables.faces.values_mut().filter(|f| {
+ f.whatami == WhatAmI::Router
+ || (options.tokens()
+ && f.whatami == WhatAmI::Peer
+ && !initial_interest(f).map(|i| i.finalized).unwrap_or(true))
+ }) {
let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst);
get_mut_unchecked(dst_face).local_interests.insert(
id,
diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs
index 21737326e4..e68c2232fc 100644
--- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs
+++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs
@@ -172,7 +172,7 @@ impl HatBaseTrait for HatCode {
}
if face.state.whatami == WhatAmI::Peer {
get_mut_unchecked(&mut face.state).local_interests.insert(
- 0,
+ INITIAL_INTEREST_ID,
InterestState {
options: InterestOptions::ALL,
res: None,
@@ -418,7 +418,7 @@ struct HatFace {
impl HatFace {
fn new() -> Self {
Self {
- next_id: AtomicU32::new(0),
+ next_id: AtomicU32::new(1), // In p2p, id 0 is erserved for initial interest
remote_interests: HashMap::new(),
local_subs: HashMap::new(),
remote_subs: HashMap::new(),
@@ -440,3 +440,16 @@ fn get_routes_entries() -> RoutesIndexes {
clients: vec![0],
}
}
+
+// In p2p, at connection, while no interest is sent on the network,
+// peers act as if they received an interest CurrentFuture with id 0
+// and send back a DeclareFinal with interest_id 0.
+// This 'ghost' interest is registered locally to allow tracking if
+// the DeclareFinal has been received or not (finalized).
+
+const INITIAL_INTEREST_ID: u32 = 0;
+
+#[inline]
+fn initial_interest(face: &FaceState) -> Option<&InterestState> {
+ face.local_interests.get(&INITIAL_INTEREST_ID)
+}
diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs
index 0dccf9ba3c..31336bc516 100644
--- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs
+++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs
@@ -38,7 +38,9 @@ use crate::{
resource::{NodeId, Resource, SessionContext},
tables::{Route, RoutingExpr, Tables},
},
- hat::{CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources},
+ hat::{
+ p2p_peer::initial_interest, CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources,
+ },
router::{update_data_routes_from, RoutesIndexes},
RoutingContext,
},
@@ -654,11 +656,7 @@ impl HatPubSubTrait for HatCode {
for face in tables.faces.values().filter(|f| {
f.whatami == WhatAmI::Peer
- && !f
- .local_interests
- .get(&0)
- .map(|i| i.finalized)
- .unwrap_or(true)
+ && !initial_interest(f).map(|i| i.finalized).unwrap_or(true)
}) {
route.entry(face.id).or_insert_with(|| {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs
index 2fd6d6fa81..4a46ec6e85 100644
--- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs
+++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs
@@ -17,7 +17,6 @@ use std::{
sync::{atomic::Ordering, Arc},
};
-use ordered_float::OrderedFloat;
use zenoh_protocol::{
core::{
key_expr::{
@@ -43,7 +42,7 @@ use crate::net::routing::{
resource::{NodeId, Resource, SessionContext},
tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables},
},
- hat::{CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources},
+ hat::{p2p_peer::initial_interest, CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources},
router::{update_query_routes_from, RoutesIndexes},
RoutingContext,
};
@@ -597,24 +596,18 @@ impl HatQueriesTrait for HatCode {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
direction: (face.clone(), key_expr.to_owned(), NodeId::default()),
- complete: 0,
- distance: f64::MAX,
+ info: None,
});
}
for face in tables.faces.values().filter(|f| {
f.whatami == WhatAmI::Peer
- && !f
- .local_interests
- .get(&0)
- .map(|i| i.finalized)
- .unwrap_or(true)
+ && !initial_interest(f).map(|i| i.finalized).unwrap_or(true)
}) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
direction: (face.clone(), key_expr.to_owned(), NodeId::default()),
- complete: 0,
- distance: 0.5,
+ info: None,
});
}
}
@@ -639,18 +632,16 @@ impl HatQueriesTrait for HatCode {
key_expr.to_owned(),
NodeId::default(),
),
- complete: if complete {
- qabl_info.complete as u64
- } else {
- 0
- },
- distance: 0.5,
+ info: Some(QueryableInfoType {
+ complete: complete && qabl_info.complete,
+ distance: 1,
+ }),
});
}
}
}
}
- route.sort_by_key(|qabl| OrderedFloat(qabl.distance));
+ route.sort_by_key(|qabl| qabl.info.map_or(u16::MAX, |i| i.distance));
Arc::new(route)
}
diff --git a/zenoh/src/net/routing/hat/p2p_peer/token.rs b/zenoh/src/net/routing/hat/p2p_peer/token.rs
index 866737f0df..a06b06f7e2 100644
--- a/zenoh/src/net/routing/hat/p2p_peer/token.rs
+++ b/zenoh/src/net/routing/hat/p2p_peer/token.rs
@@ -152,11 +152,30 @@ fn declare_simple_token(
face: &mut Arc,
id: TokenId,
res: &mut Arc,
+ interest_id: Option,
send_declare: &mut SendDeclare,
) {
- register_simple_token(tables, face, id, res);
-
- propagate_simple_token(tables, res, face, send_declare);
+ if let Some(interest_id) = interest_id {
+ if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) {
+ let wire_expr = Resource::get_best_key(res, "", interest.src_face.id);
+ send_declare(
+ &interest.src_face.primitives,
+ RoutingContext::with_expr(
+ Declare {
+ interest_id: Some(interest.src_interest_id),
+ ext_qos: ext::QoSType::default(),
+ ext_tstamp: None,
+ ext_nodeid: ext::NodeIdType::default(),
+ body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }),
+ },
+ res.expr(),
+ ),
+ )
+ }
+ } else {
+ register_simple_token(tables, face, id, res);
+ propagate_simple_token(tables, res, face, send_declare);
+ }
}
#[inline]
@@ -411,7 +430,7 @@ pub(crate) fn declare_token_interest(
aggregate: bool,
send_declare: &mut SendDeclare,
) {
- if mode.current() && face.whatami == WhatAmI::Client {
+ if mode.current() {
let interest_id = (!mode.future()).then_some(id);
if let Some(res) = res.as_ref() {
if aggregate {
@@ -525,10 +544,10 @@ impl HatTokenTrait for HatCode {
id: TokenId,
res: &mut Arc,
_node_id: NodeId,
- _interest_id: Option,
+ interest_id: Option,
send_declare: &mut SendDeclare,
) {
- declare_simple_token(tables, face, id, res, send_declare)
+ declare_simple_token(tables, face, id, res, interest_id, send_declare)
}
fn undeclare_token(
diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs
index f45a260288..969d94f3a2 100644
--- a/zenoh/src/net/routing/hat/router/queries.rs
+++ b/zenoh/src/net/routing/hat/router/queries.rs
@@ -17,7 +17,6 @@ use std::{
sync::{atomic::Ordering, Arc},
};
-use ordered_float::OrderedFloat;
use petgraph::graph::NodeIndex;
use zenoh_protocol::{
core::{
@@ -1102,12 +1101,10 @@ fn insert_target_for_qabls(
Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.push(QueryTargetQabl {
direction: (face.clone(), key_expr.to_owned(), source),
- complete: if complete {
- qabl_info.complete as u64
- } else {
- 0
- },
- distance: net.distances[qabl_idx.index()],
+ info: Some(QueryableInfoType {
+ complete: complete && qabl_info.complete,
+ distance: net.distances[qabl_idx.index()] as u16,
+ }),
});
}
}
@@ -1482,19 +1479,17 @@ impl HatQueriesTrait for HatCode {
key_expr.to_owned(),
NodeId::default(),
),
- complete: if complete {
- qabl_info.complete as u64
- } else {
- 0
- },
- distance: 0.5,
+ info: Some(QueryableInfoType {
+ complete: complete && qabl_info.complete,
+ distance: 1,
+ }),
});
}
}
}
}
}
- route.sort_by_key(|qabl| OrderedFloat(qabl.distance));
+ route.sort_by_key(|qabl| qabl.info.map_or(u16::MAX, |i| i.distance));
Arc::new(route)
}
diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs
index d3e2a3c1ad..dc984c2e92 100644
--- a/zenoh/src/net/runtime/adminspace.rs
+++ b/zenoh/src/net/runtime/adminspace.rs
@@ -44,7 +44,6 @@ use super::{routing::dispatcher::face::Face, Runtime};
use crate::api::plugins::PluginsManager;
use crate::{
api::{
- builders::sample::EncodingBuilderTrait,
bytes::ZBytes,
key_expr::KeyExpr,
queryable::{Query, QueryInner},
diff --git a/zenoh/src/prelude.rs b/zenoh/src/prelude.rs
index 022a2d63cb..02534af650 100644
--- a/zenoh/src/prelude.rs
+++ b/zenoh/src/prelude.rs
@@ -28,14 +28,8 @@ mod _prelude {
#[zenoh_macros::unstable]
pub use crate::api::selector::ZenohParameters;
pub use crate::{
- api::{
- builders::sample::{
- EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait,
- },
- session::Undeclarable,
- },
- config::ValidatedMap,
- Error as ZError, Resolvable, Resolve, Result as ZResult,
+ api::session::Undeclarable, config::ValidatedMap, Error as ZError, Resolvable, Resolve,
+ Result as ZResult,
};
}