From f3f540320e211c8f720d3e660ac50af261df7bdf Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 9 Sep 2024 15:07:20 +0200 Subject: [PATCH] chore: merge main into dev/arcsession (#1378) * Fix bug with QueryTarget ALL_COMPLETE in clients and peers (#1358) * Fix bug with QueryTarget ALL_COMPLETE in clients and peers * Fix BEST_MATCHING queryable selection * Properly fix Query targeting in non writer side filtering situations * Improve fix * Update zenoh/src/net/routing/hat/linkstate_peer/queries.rs Co-authored-by: Joseph Perez * Update zenoh/src/net/routing/hat/p2p_peer/queries.rs Co-authored-by: Joseph Perez * Update zenoh/src/net/routing/hat/router/queries.rs Co-authored-by: Joseph Perez * Update zenoh/src/net/routing/hat/client/queries.rs Co-authored-by: Joseph Perez * Remove non used ordered-float dependency --------- Co-authored-by: Luca Cominardi Co-authored-by: Joseph Perez * fix: publisher should not be clonable (#1370) * made builder traits internal (#1376) * scaffolding macro added * builder traits made internal * doc corrected * cargo fmt * typo fix * typo fix * Fix bugs querying liveliness tokens (#1374) * Fix bug in liveliness get in client * Fix bug treating token interests replies from routers in peers * Peers propagate current token interests to remote peers with unfinalize initial declarations push * Don't register current interests declaration replies * Add comments * Add comments * Add comments --------- Co-authored-by: OlivierHecart Co-authored-by: Luca Cominardi Co-authored-by: Michael Ilyin --- Cargo.lock | 10 --- commons/zenoh-macros/src/lib.rs | 80 ++++++++++++++++++- .../zenoh-plugin-rest/examples/z_serve_sse.rs | 6 +- plugins/zenoh-plugin-rest/src/lib.rs | 2 +- .../src/replica/aligner.rs | 1 - .../src/replica/storage.rs | 3 +- zenoh-ext/src/querying_subscriber.rs | 2 +- zenoh/Cargo.toml | 1 - zenoh/src/api/builders/publisher.rs | 11 ++- zenoh/src/api/builders/sample.rs | 4 + zenoh/src/api/publisher.rs | 2 +- zenoh/src/api/query.rs | 6 +- zenoh/src/api/queryable.rs | 4 + zenoh/src/lib.rs | 9 ++- zenoh/src/net/routing/dispatcher/queries.rs | 9 +-- zenoh/src/net/routing/dispatcher/resource.rs | 3 +- zenoh/src/net/routing/hat/client/queries.rs | 16 ++-- zenoh/src/net/routing/hat/client/token.rs | 9 +-- .../net/routing/hat/linkstate_peer/queries.rs | 23 +++--- .../src/net/routing/hat/p2p_peer/interests.rs | 15 ++-- zenoh/src/net/routing/hat/p2p_peer/mod.rs | 17 +++- zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 10 +-- zenoh/src/net/routing/hat/p2p_peer/queries.rs | 27 +++---- zenoh/src/net/routing/hat/p2p_peer/token.rs | 31 +++++-- zenoh/src/net/routing/hat/router/queries.rs | 23 +++--- zenoh/src/net/runtime/adminspace.rs | 1 - zenoh/src/prelude.rs | 10 +-- 27 files changed, 206 insertions(+), 129 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aac8c0c10f..7c140c1c31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2587,15 +2587,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" -[[package]] -name = "ordered-float" -version = "4.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a91171844676f8c7990ce64959210cd2eaef32c2612c50f9fae9f8aaa6065a6" -dependencies = [ - "num-traits", -] - [[package]] name = "overload" version = "0.1.1" @@ -5343,7 +5334,6 @@ dependencies = [ "itertools 0.13.0", "lazy_static", "once_cell", - "ordered-float", "paste", "petgraph", "phf", diff --git a/commons/zenoh-macros/src/lib.rs b/commons/zenoh-macros/src/lib.rs index 003525daa9..f3533a6aea 100644 --- a/commons/zenoh-macros/src/lib.rs +++ b/commons/zenoh-macros/src/lib.rs @@ -19,7 +19,7 @@ //! [Click here for Zenoh's documentation](../zenoh/index.html) use proc_macro::TokenStream; use quote::{quote, ToTokens}; -use syn::{parse_macro_input, parse_quote, Attribute, Error, Item, LitStr, TraitItem}; +use syn::{parse_macro_input, parse_quote, Attribute, Error, Item, ItemImpl, LitStr, TraitItem}; use zenoh_keyexpr::{ format::{ macro_support::{self, SegmentBuilder}, @@ -522,3 +522,81 @@ pub fn register_param(input: proc_macro::TokenStream) -> proc_macro::TokenStream .unwrap_or_else(syn::Error::into_compile_error) .into() } + +/// Macro `#[internal_trait]` should precede +/// `impl Trait for Struct { ... }` +/// +/// This macro wraps the implementations of "internal" tratis. +/// +/// These traits are used to group set of functions which should be implemented +/// together and with the same portotyoe. E.g. `QoSBuilderTrait` provides set of +/// setters (`congestion_control`, `priority`, `express`) and we should not +/// forget to implement all these setters for each entity which supports +/// QoS functionality. +/// +/// The traits mechanism is a good way to group functions. But additional traits +/// adds extra burden to end user who have to import it every time. +/// +/// The macro `internal_trait` solves this problem by adding +/// methods with same names as in trait to structure implementation itself, +/// making them available to user without additional trait import. +/// +#[proc_macro_attribute] +pub fn internal_trait(_attr: TokenStream, item: TokenStream) -> TokenStream { + let input = parse_macro_input!(item as ItemImpl); + let trait_path = &input.trait_.as_ref().unwrap().1; + let struct_path = &input.self_ty; + let generics = &input.generics; + // let struct_lifetime = get_type_path_lifetime(struct_path); + + let mut struct_methods = quote! {}; + for item_fn in input.items.iter() { + if let syn::ImplItem::Fn(method) = item_fn { + let method_name = &method.sig.ident; + let method_generic_params = &method.sig.generics.params; + let method_generic_params = if method_generic_params.is_empty() { + quote! {} + } else { + quote! {<#method_generic_params>} + }; + let method_args = &method.sig.inputs; + let method_output = &method.sig.output; + let where_clause = &method.sig.generics.where_clause; + let mut method_call_args = quote! {}; + for arg in method_args.iter() { + match arg { + syn::FnArg::Receiver(_) => { + method_call_args.extend(quote! { self, }); + } + syn::FnArg::Typed(pat_type) => { + let pat = &pat_type.pat; + method_call_args.extend(quote! { #pat, }); + } + } + } + let mut attributes = quote! {}; + for attr in &method.attrs { + attributes.extend(quote! { + #attr + }); + } + // call corresponding trait method from struct method + struct_methods.extend(quote! { + #attributes + pub fn #method_name #method_generic_params (#method_args) #method_output #where_clause { + <#struct_path as #trait_path>::#method_name(#method_call_args) + } + }); + } + } + let struct_methods_output = quote! { + impl #generics #struct_path { + #struct_methods + } + }; + (quote! { + #input + #struct_methods_output + }) + .into() +} diff --git a/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs b/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs index fbd0269498..049e2756fb 100644 --- a/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs +++ b/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs @@ -14,11 +14,7 @@ use std::time::Duration; use clap::{arg, Command}; -use zenoh::{ - config::Config, - key_expr::keyexpr, - qos::{CongestionControl, QoSBuilderTrait}, -}; +use zenoh::{config::Config, key_expr::keyexpr, qos::CongestionControl}; const HTML: &str = r#"
diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index 289fc9e055..4b236e5d5a 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -53,7 +53,7 @@ use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginCont mod config; pub use config::Config; -use zenoh::{bytes::EncodingBuilderTrait, query::ReplyError}; +use zenoh::query::ReplyError; const GIT_VERSION: &str = git_version::git_version!(prefix = "v", cargo_prefix = "v"); lazy_static::lazy_static! { diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs b/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs index 952a72f499..cf38fe4728 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs @@ -24,7 +24,6 @@ use tokio::sync::RwLock; use zenoh::{ internal::Value, key_expr::{KeyExpr, OwnedKeyExpr}, - prelude::*, query::Selector, sample::{Sample, SampleBuilder}, time::Timestamp, diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs index 94e9d85d82..5ffccbbd54 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs @@ -23,7 +23,6 @@ use flume::{Receiver, Sender}; use futures::select; use tokio::sync::{Mutex, RwLock}; use zenoh::{ - bytes::EncodingBuilderTrait, internal::{ buffers::{SplitBuffer, ZBuf}, zenoh_home, Timed, TimedEvent, Timer, Value, @@ -35,7 +34,7 @@ use zenoh::{ KeyExpr, OwnedKeyExpr, }, query::{ConsolidationMode, QueryTarget}, - sample::{Sample, SampleBuilder, SampleKind, TimestampBuilderTrait}, + sample::{Sample, SampleBuilder, SampleKind}, session::Session, time::{Timestamp, NTP64}, }; diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs index 893cbcbad0..c8a41b4e0a 100644 --- a/zenoh-ext/src/querying_subscriber.rs +++ b/zenoh-ext/src/querying_subscriber.rs @@ -27,7 +27,7 @@ use zenoh::{ prelude::Wait, pubsub::{Reliability, Subscriber}, query::{QueryConsolidation, QueryTarget, ReplyKeyExpr, Selector}, - sample::{Locality, Sample, SampleBuilder, TimestampBuilderTrait}, + sample::{Locality, Sample, SampleBuilder}, time::Timestamp, Error, Resolvable, Resolve, Result as ZResult, Session, }; diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 701c22039a..32622509b5 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -77,7 +77,6 @@ git-version = { workspace = true } itertools = { workspace = true } lazy_static = { workspace = true } tracing = { workspace = true } -ordered-float = { workspace = true } paste = { workspace = true } petgraph = { workspace = true } phf = { workspace = true } diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 53c32c8a7d..4b0c3d8459 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -18,13 +18,12 @@ use zenoh_core::{Resolvable, Result as ZResult, Wait}; use zenoh_protocol::core::Reliability; use zenoh_protocol::{core::CongestionControl, network::Mapping}; +use super::sample::TimestampBuilderTrait; #[cfg(feature = "unstable")] use crate::api::sample::SourceInfo; use crate::{ api::{ - builders::sample::{ - EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, - }, + builders::sample::{EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait}, bytes::{OptionZBytes, ZBytes}, encoding::Encoding, key_expr::KeyExpr, @@ -82,6 +81,7 @@ pub struct PublicationBuilder { pub(crate) attachment: Option, } +#[zenoh_macros::internal_trait] impl QoSBuilderTrait for PublicationBuilder, T> { #[inline] fn congestion_control(self, congestion_control: CongestionControl) -> Self { @@ -128,6 +128,7 @@ impl PublicationBuilder, T> { } } +#[zenoh_macros::internal_trait] impl EncodingBuilderTrait for PublisherBuilder<'_, '_> { fn encoding>(self, encoding: T) -> Self { Self { @@ -137,6 +138,7 @@ impl EncodingBuilderTrait for PublisherBuilder<'_, '_> { } } +#[zenoh_macros::internal_trait] impl

EncodingBuilderTrait for PublicationBuilder { fn encoding>(self, encoding: T) -> Self { Self { @@ -149,6 +151,7 @@ impl

EncodingBuilderTrait for PublicationBuilder { } } +#[zenoh_macros::internal_trait] impl SampleBuilderTrait for PublicationBuilder { #[cfg(feature = "unstable")] fn source_info(self, source_info: SourceInfo) -> Self { @@ -166,6 +169,7 @@ impl SampleBuilderTrait for PublicationBuilder { } } +#[zenoh_macros::internal_trait] impl TimestampBuilderTrait for PublicationBuilder { fn timestamp>>(self, timestamp: TS) -> Self { Self { @@ -278,6 +282,7 @@ impl<'a, 'b> Clone for PublisherBuilder<'a, 'b> { } } +#[zenoh_macros::internal_trait] impl QoSBuilderTrait for PublisherBuilder<'_, '_> { /// Change the `congestion_control` to apply when routing the data. #[inline] diff --git a/zenoh/src/api/builders/sample.rs b/zenoh/src/api/builders/sample.rs index 4c1fa81406..cb7ada9e4f 100644 --- a/zenoh/src/api/builders/sample.rs +++ b/zenoh/src/api/builders/sample.rs @@ -166,6 +166,7 @@ impl SampleBuilder { } } +#[zenoh_macros::internal_trait] impl TimestampBuilderTrait for SampleBuilder { fn timestamp>>(self, timestamp: U) -> Self { Self { @@ -178,6 +179,7 @@ impl TimestampBuilderTrait for SampleBuilder { } } +#[zenoh_macros::internal_trait] impl SampleBuilderTrait for SampleBuilder { #[zenoh_macros::unstable] fn source_info(self, source_info: SourceInfo) -> Self { @@ -202,6 +204,7 @@ impl SampleBuilderTrait for SampleBuilder { } } +#[zenoh_macros::internal_trait] impl QoSBuilderTrait for SampleBuilder { fn congestion_control(self, congestion_control: CongestionControl) -> Self { let qos: QoSBuilder = self.sample.qos.into(); @@ -229,6 +232,7 @@ impl QoSBuilderTrait for SampleBuilder { } } +#[zenoh_macros::internal_trait] impl EncodingBuilderTrait for SampleBuilder { fn encoding>(self, encoding: T) -> Self { Self { diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index a8ce4e5569..faa2e4eca6 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -106,7 +106,7 @@ impl fmt::Debug for PublisherState { /// subscriber.stream().map(Ok).forward(publisher).await.unwrap(); /// # } /// ``` -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Publisher<'a> { #[cfg(feature = "unstable")] pub(crate) session_id: ZenohId, diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index bea028ff97..177ca565a1 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -28,7 +28,7 @@ use zenoh_protocol::core::{CongestionControl, Parameters}; use zenoh_result::ZResult; use super::{ - builders::sample::{EncodingBuilderTrait, QoSBuilderTrait}, + builders::sample::{EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait}, bytes::ZBytes, encoding::Encoding, handlers::{locked, Callback, DefaultHandler, IntoHandler}, @@ -41,7 +41,7 @@ use super::{ }; #[cfg(feature = "unstable")] use super::{sample::SourceInfo, selector::ZenohParameters}; -use crate::{bytes::OptionZBytes, sample::SampleBuilderTrait}; +use crate::bytes::OptionZBytes; /// The [`Queryable`](crate::query::Queryable)s that should be target of a [`get`](Session::get). pub type QueryTarget = zenoh_protocol::network::request::ext::TargetType; @@ -209,6 +209,7 @@ pub struct SessionGetBuilder<'a, 'b, Handler> { pub(crate) source_info: SourceInfo, } +#[zenoh_macros::internal_trait] impl SampleBuilderTrait for SessionGetBuilder<'_, '_, Handler> { #[zenoh_macros::unstable] fn source_info(self, source_info: SourceInfo) -> Self { @@ -244,6 +245,7 @@ impl QoSBuilderTrait for SessionGetBuilder<'_, '_, DefaultHandler> { } } +#[zenoh_macros::internal_trait] impl EncodingBuilderTrait for SessionGetBuilder<'_, '_, Handler> { fn encoding>(self, encoding: T) -> Self { let mut value = self.value.unwrap_or_default(); diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index a1ac14b5f1..a04662dd8f 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -304,6 +304,7 @@ pub struct ReplyBuilder<'a, 'b, T> { attachment: Option, } +#[zenoh_macros::internal_trait] impl TimestampBuilderTrait for ReplyBuilder<'_, '_, T> { fn timestamp>>(self, timestamp: U) -> Self { Self { @@ -313,6 +314,7 @@ impl TimestampBuilderTrait for ReplyBuilder<'_, '_, T> { } } +#[zenoh_macros::internal_trait] impl SampleBuilderTrait for ReplyBuilder<'_, '_, T> { fn attachment>(self, attachment: U) -> Self { let attachment: OptionZBytes = attachment.into(); @@ -348,6 +350,7 @@ impl QoSBuilderTrait for ReplyBuilder<'_, '_, T> { } } +#[zenoh_macros::internal_trait] impl EncodingBuilderTrait for ReplyBuilder<'_, '_, ReplyBuilderPut> { fn encoding>(self, encoding: T) -> Self { Self { @@ -471,6 +474,7 @@ pub struct ReplyErrBuilder<'a> { value: Value, } +#[zenoh_macros::internal_trait] impl EncodingBuilderTrait for ReplyErrBuilder<'_> { fn encoding>(self, encoding: T) -> Self { let mut value = self.value.clone(); diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 6a15afe150..d650ddb05d 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -210,7 +210,6 @@ pub mod sample { pub use crate::api::{ builders::sample::{ SampleBuilder, SampleBuilderAny, SampleBuilderDelete, SampleBuilderPut, - SampleBuilderTrait, TimestampBuilderTrait, }, sample::{Sample, SampleFields, SampleKind, SourceSn}, }; @@ -219,7 +218,6 @@ pub mod sample { /// Payload primitives pub mod bytes { pub use crate::api::{ - builders::sample::EncodingBuilderTrait, bytes::{ Deserialize, OptionZBytes, Serialize, ZBytes, ZBytesIterator, ZBytesReader, ZBytesSliceIterator, ZBytesWriter, ZDeserializeError, ZSerde, @@ -279,7 +277,7 @@ pub mod handlers { pub mod qos { pub use zenoh_protocol::core::CongestionControl; - pub use crate::api::{builders::sample::QoSBuilderTrait, publisher::Priority}; + pub use crate::api::publisher::Priority; } /// Scouting primitives @@ -374,6 +372,11 @@ compile_error!( #[zenoh_macros::internal] pub mod internal { + pub mod traits { + pub use crate::api::builders::sample::{ + EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, + }; + } pub use zenoh_core::{ zasync_executor_init, zasynclock, zerror, zlock, zread, ztimeout, zwrite, ResolveFuture, }; diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index c117bd51df..7d1287beeb 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -327,7 +327,7 @@ fn compute_final_route( TargetType::AllComplete => { let mut route = HashMap::new(); for qabl in qabls.iter() { - if qabl.complete > 0 + if qabl.info.map(|info| info.complete).unwrap_or(true) && tables .hat_code .egress_filter(tables, src_face, &qabl.direction.0, expr) @@ -342,10 +342,9 @@ fn compute_final_route( route } TargetType::BestMatching => { - if let Some(qabl) = qabls - .iter() - .find(|qabl| qabl.direction.0.id != src_face.id && qabl.complete > 0) - { + if let Some(qabl) = qabls.iter().find(|qabl| { + qabl.direction.0.id != src_face.id && qabl.info.is_some_and(|info| info.complete) + }) { let mut route = HashMap::new(); let mut direction = qabl.direction.clone(); diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index ab84241666..1636d192c4 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -46,8 +46,7 @@ pub(crate) type Route = HashMap; pub(crate) type QueryRoute = HashMap; pub(crate) struct QueryTargetQabl { pub(crate) direction: Direction, - pub(crate) complete: u64, - pub(crate) distance: f64, + pub(crate) info: Option, } pub(crate) type QueryTargetQablSet = Vec; diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index e711ccf2e8..cd417ef84b 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -17,7 +17,6 @@ use std::{ sync::{atomic::Ordering, Arc}, }; -use ordered_float::OrderedFloat; use zenoh_protocol::{ core::{ key_expr::{ @@ -354,8 +353,7 @@ impl HatQueriesTrait for HatCode { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); route.push(QueryTargetQabl { direction: (face.clone(), key_expr.to_owned(), NodeId::default()), - complete: 0, - distance: f64::MAX, + info: None, }); } } @@ -375,17 +373,15 @@ impl HatQueriesTrait for HatCode { if let Some(qabl_info) = context.qabl.as_ref() { route.push(QueryTargetQabl { direction: (context.face.clone(), key_expr.to_owned(), NodeId::default()), - complete: if complete { - qabl_info.complete as u64 - } else { - 0 - }, - distance: 0.5, + info: Some(QueryableInfoType { + complete: complete && qabl_info.complete, + distance: 1, + }), }); } } } - route.sort_by_key(|qabl| OrderedFloat(qabl.distance)); + route.sort_by_key(|qabl| qabl.info.map_or(u16::MAX, |i| i.distance)); Arc::new(route) } diff --git a/zenoh/src/net/routing/hat/client/token.rs b/zenoh/src/net/routing/hat/client/token.rs index 9e5923425c..5014ee5931 100644 --- a/zenoh/src/net/routing/hat/client/token.rs +++ b/zenoh/src/net/routing/hat/client/token.rs @@ -116,13 +116,9 @@ fn declare_simple_token( interest_id: Option, send_declare: &mut SendDeclare, ) { - register_simple_token(tables, face, id, res); - - propagate_simple_token(tables, res, face, send_declare); - - let wire_expr = Resource::decl_key(res, face, true); if let Some(interest_id) = interest_id { if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) { + let wire_expr = Resource::get_best_key(res, "", interest.src_face.id); send_declare( &interest.src_face.primitives, RoutingContext::with_expr( @@ -137,6 +133,9 @@ fn declare_simple_token( ), ) } + } else { + register_simple_token(tables, face, id, res); + propagate_simple_token(tables, res, face, send_declare); } } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index 6941466571..ce97803532 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -17,7 +17,6 @@ use std::{ sync::{atomic::Ordering, Arc}, }; -use ordered_float::OrderedFloat; use petgraph::graph::NodeIndex; use zenoh_protocol::{ core::{ @@ -718,12 +717,10 @@ fn insert_target_for_qabls( Resource::get_best_key(expr.prefix, expr.suffix, face.id); route.push(QueryTargetQabl { direction: (face.clone(), key_expr.to_owned(), source), - complete: if complete { - qabl_info.complete as u64 - } else { - 0 - }, - distance: net.distances[qabl_idx.index()], + info: Some(QueryableInfoType { + complete: complete && qabl_info.complete, + distance: net.distances[qabl_idx.index()] as u16, + }), }); } } @@ -1000,18 +997,16 @@ impl HatQueriesTrait for HatCode { key_expr.to_owned(), NodeId::default(), ), - complete: if complete { - qabl_info.complete as u64 - } else { - 0 - }, - distance: 0.5, + info: Some(QueryableInfoType { + complete: complete && qabl_info.complete, + distance: 1, + }), }); } } } } - route.sort_by_key(|qabl| OrderedFloat(qabl.distance)); + route.sort_by_key(|qabl| qabl.info.map_or(u16::MAX, |i| i.distance)); Arc::new(route) } diff --git a/zenoh/src/net/routing/hat/p2p_peer/interests.rs b/zenoh/src/net/routing/hat/p2p_peer/interests.rs index 2ed9e22840..3fa2bbe193 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/interests.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/interests.rs @@ -24,8 +24,8 @@ use zenoh_protocol::{ use zenoh_sync::get_mut_unchecked; use super::{ - face_hat, face_hat_mut, pubsub::declare_sub_interest, queries::declare_qabl_interest, - token::declare_token_interest, HatCode, HatFace, + face_hat, face_hat_mut, initial_interest, pubsub::declare_sub_interest, + queries::declare_qabl_interest, token::declare_token_interest, HatCode, HatFace, }; use crate::net::routing::{ dispatcher::{ @@ -132,11 +132,12 @@ impl HatInterestTrait for HatCode { src_interest_id: id, }); - for dst_face in tables - .faces - .values_mut() - .filter(|f| f.whatami == WhatAmI::Router) - { + for dst_face in tables.faces.values_mut().filter(|f| { + f.whatami == WhatAmI::Router + || (options.tokens() + && f.whatami == WhatAmI::Peer + && !initial_interest(f).map(|i| i.finalized).unwrap_or(true)) + }) { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); get_mut_unchecked(dst_face).local_interests.insert( id, diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 21737326e4..e68c2232fc 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -172,7 +172,7 @@ impl HatBaseTrait for HatCode { } if face.state.whatami == WhatAmI::Peer { get_mut_unchecked(&mut face.state).local_interests.insert( - 0, + INITIAL_INTEREST_ID, InterestState { options: InterestOptions::ALL, res: None, @@ -418,7 +418,7 @@ struct HatFace { impl HatFace { fn new() -> Self { Self { - next_id: AtomicU32::new(0), + next_id: AtomicU32::new(1), // In p2p, id 0 is erserved for initial interest remote_interests: HashMap::new(), local_subs: HashMap::new(), remote_subs: HashMap::new(), @@ -440,3 +440,16 @@ fn get_routes_entries() -> RoutesIndexes { clients: vec![0], } } + +// In p2p, at connection, while no interest is sent on the network, +// peers act as if they received an interest CurrentFuture with id 0 +// and send back a DeclareFinal with interest_id 0. +// This 'ghost' interest is registered locally to allow tracking if +// the DeclareFinal has been received or not (finalized). + +const INITIAL_INTEREST_ID: u32 = 0; + +#[inline] +fn initial_interest(face: &FaceState) -> Option<&InterestState> { + face.local_interests.get(&INITIAL_INTEREST_ID) +} diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index 0dccf9ba3c..31336bc516 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -38,7 +38,9 @@ use crate::{ resource::{NodeId, Resource, SessionContext}, tables::{Route, RoutingExpr, Tables}, }, - hat::{CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources}, + hat::{ + p2p_peer::initial_interest, CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources, + }, router::{update_data_routes_from, RoutesIndexes}, RoutingContext, }, @@ -654,11 +656,7 @@ impl HatPubSubTrait for HatCode { for face in tables.faces.values().filter(|f| { f.whatami == WhatAmI::Peer - && !f - .local_interests - .get(&0) - .map(|i| i.finalized) - .unwrap_or(true) + && !initial_interest(f).map(|i| i.finalized).unwrap_or(true) }) { route.entry(face.id).or_insert_with(|| { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index 2fd6d6fa81..4a46ec6e85 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -17,7 +17,6 @@ use std::{ sync::{atomic::Ordering, Arc}, }; -use ordered_float::OrderedFloat; use zenoh_protocol::{ core::{ key_expr::{ @@ -43,7 +42,7 @@ use crate::net::routing::{ resource::{NodeId, Resource, SessionContext}, tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, }, - hat::{CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources}, + hat::{p2p_peer::initial_interest, CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources}, router::{update_query_routes_from, RoutesIndexes}, RoutingContext, }; @@ -597,24 +596,18 @@ impl HatQueriesTrait for HatCode { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); route.push(QueryTargetQabl { direction: (face.clone(), key_expr.to_owned(), NodeId::default()), - complete: 0, - distance: f64::MAX, + info: None, }); } for face in tables.faces.values().filter(|f| { f.whatami == WhatAmI::Peer - && !f - .local_interests - .get(&0) - .map(|i| i.finalized) - .unwrap_or(true) + && !initial_interest(f).map(|i| i.finalized).unwrap_or(true) }) { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); route.push(QueryTargetQabl { direction: (face.clone(), key_expr.to_owned(), NodeId::default()), - complete: 0, - distance: 0.5, + info: None, }); } } @@ -639,18 +632,16 @@ impl HatQueriesTrait for HatCode { key_expr.to_owned(), NodeId::default(), ), - complete: if complete { - qabl_info.complete as u64 - } else { - 0 - }, - distance: 0.5, + info: Some(QueryableInfoType { + complete: complete && qabl_info.complete, + distance: 1, + }), }); } } } } - route.sort_by_key(|qabl| OrderedFloat(qabl.distance)); + route.sort_by_key(|qabl| qabl.info.map_or(u16::MAX, |i| i.distance)); Arc::new(route) } diff --git a/zenoh/src/net/routing/hat/p2p_peer/token.rs b/zenoh/src/net/routing/hat/p2p_peer/token.rs index 866737f0df..a06b06f7e2 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/token.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/token.rs @@ -152,11 +152,30 @@ fn declare_simple_token( face: &mut Arc, id: TokenId, res: &mut Arc, + interest_id: Option, send_declare: &mut SendDeclare, ) { - register_simple_token(tables, face, id, res); - - propagate_simple_token(tables, res, face, send_declare); + if let Some(interest_id) = interest_id { + if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) { + let wire_expr = Resource::get_best_key(res, "", interest.src_face.id); + send_declare( + &interest.src_face.primitives, + RoutingContext::with_expr( + Declare { + interest_id: Some(interest.src_interest_id), + ext_qos: ext::QoSType::default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }), + }, + res.expr(), + ), + ) + } + } else { + register_simple_token(tables, face, id, res); + propagate_simple_token(tables, res, face, send_declare); + } } #[inline] @@ -411,7 +430,7 @@ pub(crate) fn declare_token_interest( aggregate: bool, send_declare: &mut SendDeclare, ) { - if mode.current() && face.whatami == WhatAmI::Client { + if mode.current() { let interest_id = (!mode.future()).then_some(id); if let Some(res) = res.as_ref() { if aggregate { @@ -525,10 +544,10 @@ impl HatTokenTrait for HatCode { id: TokenId, res: &mut Arc, _node_id: NodeId, - _interest_id: Option, + interest_id: Option, send_declare: &mut SendDeclare, ) { - declare_simple_token(tables, face, id, res, send_declare) + declare_simple_token(tables, face, id, res, interest_id, send_declare) } fn undeclare_token( diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index f45a260288..969d94f3a2 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -17,7 +17,6 @@ use std::{ sync::{atomic::Ordering, Arc}, }; -use ordered_float::OrderedFloat; use petgraph::graph::NodeIndex; use zenoh_protocol::{ core::{ @@ -1102,12 +1101,10 @@ fn insert_target_for_qabls( Resource::get_best_key(expr.prefix, expr.suffix, face.id); route.push(QueryTargetQabl { direction: (face.clone(), key_expr.to_owned(), source), - complete: if complete { - qabl_info.complete as u64 - } else { - 0 - }, - distance: net.distances[qabl_idx.index()], + info: Some(QueryableInfoType { + complete: complete && qabl_info.complete, + distance: net.distances[qabl_idx.index()] as u16, + }), }); } } @@ -1482,19 +1479,17 @@ impl HatQueriesTrait for HatCode { key_expr.to_owned(), NodeId::default(), ), - complete: if complete { - qabl_info.complete as u64 - } else { - 0 - }, - distance: 0.5, + info: Some(QueryableInfoType { + complete: complete && qabl_info.complete, + distance: 1, + }), }); } } } } } - route.sort_by_key(|qabl| OrderedFloat(qabl.distance)); + route.sort_by_key(|qabl| qabl.info.map_or(u16::MAX, |i| i.distance)); Arc::new(route) } diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index d3e2a3c1ad..dc984c2e92 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -44,7 +44,6 @@ use super::{routing::dispatcher::face::Face, Runtime}; use crate::api::plugins::PluginsManager; use crate::{ api::{ - builders::sample::EncodingBuilderTrait, bytes::ZBytes, key_expr::KeyExpr, queryable::{Query, QueryInner}, diff --git a/zenoh/src/prelude.rs b/zenoh/src/prelude.rs index 022a2d63cb..02534af650 100644 --- a/zenoh/src/prelude.rs +++ b/zenoh/src/prelude.rs @@ -28,14 +28,8 @@ mod _prelude { #[zenoh_macros::unstable] pub use crate::api::selector::ZenohParameters; pub use crate::{ - api::{ - builders::sample::{ - EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, - }, - session::Undeclarable, - }, - config::ValidatedMap, - Error as ZError, Resolvable, Resolve, Result as ZResult, + api::session::Undeclarable, config::ValidatedMap, Error as ZError, Resolvable, Resolve, + Result as ZResult, }; }