From 6dea3bf95a909d73411bdcb6da4211c9c32cb201 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 28 Aug 2024 11:40:39 +0200 Subject: [PATCH] Fix wire_expr mapping (#1330) * Fix wire_expr mapping * Fix find and replace error --- zenoh/src/net/routing/dispatcher/resource.rs | 18 +++++++---- zenoh/src/net/routing/hat/client/interests.rs | 6 ++-- zenoh/src/net/routing/hat/client/pubsub.rs | 2 +- zenoh/src/net/routing/hat/client/queries.rs | 2 +- zenoh/src/net/routing/hat/client/token.rs | 10 +++--- .../net/routing/hat/linkstate_peer/pubsub.rs | 18 +++++++---- .../net/routing/hat/linkstate_peer/queries.rs | 16 ++++++---- .../net/routing/hat/linkstate_peer/token.rs | 19 +++++++---- .../src/net/routing/hat/p2p_peer/interests.rs | 8 +++-- zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 13 +++++--- zenoh/src/net/routing/hat/p2p_peer/queries.rs | 10 +++--- zenoh/src/net/routing/hat/p2p_peer/token.rs | 13 +++++--- zenoh/src/net/routing/hat/router/interests.rs | 10 ++++-- zenoh/src/net/routing/hat/router/pubsub.rs | 30 +++++++++++------ zenoh/src/net/routing/hat/router/queries.rs | 24 +++++++++----- zenoh/src/net/routing/hat/router/token.rs | 32 ++++++++++++------- 16 files changed, 149 insertions(+), 82 deletions(-) diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index f864c39049..01ff9b2817 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -461,7 +461,11 @@ impl Resource { } #[inline] - pub fn decl_key(res: &Arc, face: &mut Arc) -> WireExpr<'static> { + pub fn decl_key( + res: &Arc, + face: &mut Arc, + push: bool, + ) -> WireExpr<'static> { let (nonwild_prefix, wildsuffix) = Resource::nonwild_prefix(res); match nonwild_prefix { Some(mut nonwild_prefix) => { @@ -484,11 +488,13 @@ impl Resource { }; } } - if face.remote_key_interests.values().any(|res| { - res.as_ref() - .map(|res| res.matches(&nonwild_prefix)) - .unwrap_or(true) - }) { + if push + || face.remote_key_interests.values().any(|res| { + res.as_ref() + .map(|res| res.matches(&nonwild_prefix)) + .unwrap_or(true) + }) + { let ctx = get_mut_unchecked(&mut nonwild_prefix) .session_ctxs .entry(face.id) diff --git a/zenoh/src/net/routing/hat/client/interests.rs b/zenoh/src/net/routing/hat/client/interests.rs index b890e800f2..9347b3f0e5 100644 --- a/zenoh/src/net/routing/hat/client/interests.rs +++ b/zenoh/src/net/routing/hat/client/interests.rs @@ -53,7 +53,7 @@ pub(super) fn interests_new_face(tables: &mut Tables, face: &mut Arc) finalized: false, }, ); - let wire_expr = res.as_ref().map(|res| Resource::decl_key(res, face)); + let wire_expr = res.as_ref().map(|res| Resource::decl_key(res, face, true)); face.primitives.send_interest(RoutingContext::with_expr( Interest { id, @@ -125,7 +125,9 @@ impl HatInterestTrait for HatCode { .insert(id, (interest.clone(), cancellation_token)); CurrentInterestCleanup::spawn_interest_clean_up_task(dst_face, tables_ref, id); } - let wire_expr = res.as_ref().map(|res| Resource::decl_key(res, dst_face)); + let wire_expr = res + .as_ref() + .map(|res| Resource::decl_key(res, dst_face, true)); dst_face.primitives.send_interest(RoutingContext::with_expr( Interest { id, diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index 4edc9c98e6..5a19f3549c 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -56,7 +56,7 @@ fn propagate_simple_subscription_to( { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); + let key_expr = Resource::decl_key(res, dst_face, true); send_declare( &dst_face.primitives, RoutingContext::with_expr( diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index 7658a509da..e711ccf2e8 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -104,7 +104,7 @@ fn propagate_simple_queryable( face_hat_mut!(&mut dst_face) .local_qabls .insert(res.clone(), (id, info)); - let key_expr = Resource::decl_key(res, &mut dst_face); + let key_expr = Resource::decl_key(res, &mut dst_face, true); send_declare( &dst_face.primitives, RoutingContext::with_expr( diff --git a/zenoh/src/net/routing/hat/client/token.rs b/zenoh/src/net/routing/hat/client/token.rs index 11fab10466..9e5923425c 100644 --- a/zenoh/src/net/routing/hat/client/token.rs +++ b/zenoh/src/net/routing/hat/client/token.rs @@ -45,7 +45,7 @@ fn propagate_simple_token_to( { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); face_hat_mut!(dst_face).local_tokens.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); + let key_expr = Resource::decl_key(res, dst_face, true); send_declare( &dst_face.primitives, RoutingContext::with_expr( @@ -120,7 +120,7 @@ fn declare_simple_token( propagate_simple_token(tables, res, face, send_declare); - let wire_expr = Resource::decl_key(res, face); + let wire_expr = Resource::decl_key(res, face, true); if let Some(interest_id) = interest_id { if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) { send_declare( @@ -312,7 +312,7 @@ pub(crate) fn declare_token_interest( } else { 0 }; - let wire_expr = Resource::decl_key(res, face); + let wire_expr = Resource::decl_key(res, face, true); send_declare( &face.primitives, RoutingContext::with_expr( @@ -343,7 +343,7 @@ pub(crate) fn declare_token_interest( } else { 0 }; - let wire_expr = Resource::decl_key(token, face); + let wire_expr = Resource::decl_key(token, face, true); send_declare( &face.primitives, RoutingContext::with_expr( @@ -379,7 +379,7 @@ pub(crate) fn declare_token_interest( } else { 0 }; - let wire_expr = Resource::decl_key(token, face); + let wire_expr = Resource::decl_key(token, face, true); send_declare( &face.primitives, RoutingContext::with_expr( diff --git a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs index 8b9d97872b..f1412ec807 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs @@ -66,7 +66,8 @@ fn send_sourced_subscription_to_net_children( .map(|src_face| someface.id != src_face.id) .unwrap_or(true) { - let key_expr = Resource::decl_key(res, &mut someface); + let push_declaration = someface.whatami != WhatAmI::Client; + let key_expr = Resource::decl_key(res, &mut someface, push_declaration); someface.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -108,7 +109,7 @@ fn propagate_simple_subscription_to( if dst_face.whatami != WhatAmI::Client { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); + let key_expr = Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client); send_declare( &dst_face.primitives, RoutingContext::with_expr( @@ -145,7 +146,8 @@ fn propagate_simple_subscription_to( if !face_hat!(dst_face).local_subs.contains_key(res) { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); + let key_expr = + Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client); send_declare( &dst_face.primitives, RoutingContext::with_expr( @@ -352,7 +354,8 @@ fn send_forget_sourced_subscription_to_net_children( .map(|src_face| someface.id != src_face.id) .unwrap_or(true) { - let wire_expr = Resource::decl_key(res, &mut someface); + let push_declaration = someface.whatami != WhatAmI::Client; + let wire_expr = Resource::decl_key(res, &mut someface, push_declaration); someface.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -702,7 +705,7 @@ pub(super) fn declare_sub_interest( } else { 0 }; - let wire_expr = Resource::decl_key(res, face); + let wire_expr = Resource::decl_key(res, face, face.whatami != WhatAmI::Client); send_declare( &face.primitives, RoutingContext::with_expr( @@ -734,7 +737,8 @@ pub(super) fn declare_sub_interest( } else { 0 }; - let wire_expr = Resource::decl_key(sub, face); + let wire_expr = + Resource::decl_key(sub, face, face.whatami != WhatAmI::Client); send_declare( &face.primitives, RoutingContext::with_expr( @@ -767,7 +771,7 @@ pub(super) fn declare_sub_interest( } else { 0 }; - let wire_expr = Resource::decl_key(sub, face); + let wire_expr = Resource::decl_key(sub, face, face.whatami != WhatAmI::Client); send_declare( &face.primitives, RoutingContext::with_expr( diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index de70cddf9b..6941466571 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -138,7 +138,8 @@ fn send_sourced_queryable_to_net_children( .map(|src_face| someface.id != src_face.id) .unwrap_or(true) { - let key_expr = Resource::decl_key(res, &mut someface); + let push_declaration = someface.whatami != WhatAmI::Client; + let key_expr = Resource::decl_key(res, &mut someface, push_declaration); someface.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -191,7 +192,8 @@ fn propagate_simple_queryable( face_hat_mut!(&mut dst_face) .local_qabls .insert(res.clone(), (id, info)); - let key_expr = Resource::decl_key(res, &mut dst_face); + let push_declaration = dst_face.whatami != WhatAmI::Client; + let key_expr = Resource::decl_key(res, &mut dst_face, push_declaration); send_declare( &dst_face.primitives, RoutingContext::with_expr( @@ -369,7 +371,8 @@ fn send_forget_sourced_queryable_to_net_children( .map(|src_face| someface.id != src_face.id) .unwrap_or(true) { - let wire_expr = Resource::decl_key(res, &mut someface); + let push_declaration = someface.whatami != WhatAmI::Client; + let wire_expr = Resource::decl_key(res, &mut someface, push_declaration); someface.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -767,7 +770,7 @@ pub(super) fn declare_qabl_interest( } else { 0 }; - let wire_expr = Resource::decl_key(res, face); + let wire_expr = Resource::decl_key(res, face, face.whatami != WhatAmI::Client); send_declare( &face.primitives, RoutingContext::with_expr( @@ -803,7 +806,8 @@ pub(super) fn declare_qabl_interest( } else { 0 }; - let key_expr = Resource::decl_key(qabl, face); + let key_expr = + Resource::decl_key(qabl, face, face.whatami != WhatAmI::Client); send_declare( &face.primitives, RoutingContext::with_expr( @@ -839,7 +843,7 @@ pub(super) fn declare_qabl_interest( } else { 0 }; - let key_expr = Resource::decl_key(qabl, face); + let key_expr = Resource::decl_key(qabl, face, face.whatami != WhatAmI::Client); send_declare( &face.primitives, RoutingContext::with_expr( diff --git a/zenoh/src/net/routing/hat/linkstate_peer/token.rs b/zenoh/src/net/routing/hat/linkstate_peer/token.rs index c87866577b..6e3ea08492 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/token.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/token.rs @@ -54,7 +54,8 @@ fn send_sourced_token_to_net_clildren( .map(|src_face| someface.id != src_face.id) .unwrap_or(true) { - let key_expr = Resource::decl_key(res, &mut someface); + let push_declaration = someface.whatami != WhatAmI::Client; + let key_expr = Resource::decl_key(res, &mut someface, push_declaration); someface.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -94,7 +95,7 @@ fn propagate_simple_token_to( if dst_face.whatami != WhatAmI::Client { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); face_hat_mut!(dst_face).local_tokens.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); + let key_expr = Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client); send_declare( &dst_face.primitives, RoutingContext::with_expr( @@ -128,7 +129,8 @@ fn propagate_simple_token_to( if !face_hat!(dst_face).local_tokens.contains_key(res) { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); face_hat_mut!(dst_face).local_tokens.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); + let key_expr = + Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client); send_declare( &dst_face.primitives, RoutingContext::with_expr( @@ -320,7 +322,8 @@ fn send_forget_sourced_token_to_net_clildren( .map(|src_face| someface.id != src_face.id) .unwrap_or(true) { - let wire_expr = Resource::decl_key(res, &mut someface); + let push_declaration = someface.whatami != WhatAmI::Client; + let wire_expr = Resource::decl_key(res, &mut someface, push_declaration); someface.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -656,7 +659,7 @@ pub(crate) fn declare_token_interest( } else { 0 }; - let wire_expr = Resource::decl_key(res, face); + let wire_expr = Resource::decl_key(res, face, face.whatami != WhatAmI::Client); send_declare( &face.primitives, RoutingContext::with_expr( @@ -685,7 +688,8 @@ pub(crate) fn declare_token_interest( } else { 0 }; - let wire_expr = Resource::decl_key(token, face); + let wire_expr = + Resource::decl_key(token, face, face.whatami != WhatAmI::Client); send_declare( &face.primitives, RoutingContext::with_expr( @@ -715,7 +719,8 @@ pub(crate) fn declare_token_interest( } else { 0 }; - let wire_expr = Resource::decl_key(token, face); + let wire_expr = + Resource::decl_key(token, face, face.whatami != WhatAmI::Client); send_declare( &face.primitives, RoutingContext::with_expr( diff --git a/zenoh/src/net/routing/hat/p2p_peer/interests.rs b/zenoh/src/net/routing/hat/p2p_peer/interests.rs index 068cdd0eeb..2ed9e22840 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/interests.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/interests.rs @@ -57,7 +57,9 @@ pub(super) fn interests_new_face(tables: &mut Tables, face: &mut Arc) finalized: false, }, ); - let wire_expr = res.as_ref().map(|res| Resource::decl_key(res, face)); + let wire_expr = res + .as_ref() + .map(|res| Resource::decl_key(res, face, face.whatami != WhatAmI::Client)); face.primitives.send_interest(RoutingContext::with_expr( Interest { id, @@ -152,7 +154,9 @@ impl HatInterestTrait for HatCode { .insert(id, (interest.clone(), cancellation_token)); CurrentInterestCleanup::spawn_interest_clean_up_task(dst_face, tables_ref, id); } - let wire_expr = res.as_ref().map(|res| Resource::decl_key(res, dst_face)); + let wire_expr = res + .as_ref() + .map(|res| Resource::decl_key(res, dst_face, dst_face.whatami == WhatAmI::Client)); dst_face.primitives.send_interest(RoutingContext::with_expr( Interest { id, diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index 12a1e67186..0dccf9ba3c 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -60,7 +60,7 @@ fn propagate_simple_subscription_to( if dst_face.whatami != WhatAmI::Client { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); + let key_expr = Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client); send_declare( &dst_face.primitives, RoutingContext::with_expr( @@ -97,7 +97,8 @@ fn propagate_simple_subscription_to( if !face_hat!(dst_face).local_subs.contains_key(res) { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); + let key_expr = + Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client); send_declare( &dst_face.primitives, RoutingContext::with_expr( @@ -432,7 +433,7 @@ pub(super) fn declare_sub_interest( } else { 0 }; - let wire_expr = Resource::decl_key(res, face); + let wire_expr = Resource::decl_key(res, face, face.whatami != WhatAmI::Client); send_declare( &face.primitives, RoutingContext::with_expr( @@ -468,7 +469,8 @@ pub(super) fn declare_sub_interest( } else { 0 }; - let wire_expr = Resource::decl_key(sub, face); + let wire_expr = + Resource::decl_key(sub, face, face.whatami != WhatAmI::Client); send_declare( &face.primitives, RoutingContext::with_expr( @@ -509,7 +511,8 @@ pub(super) fn declare_sub_interest( } else { 0 }; - let wire_expr = Resource::decl_key(sub, face); + let wire_expr = + Resource::decl_key(sub, face, face.whatami != WhatAmI::Client); send_declare( &face.primitives, RoutingContext::with_expr( diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index 87b6372dae..2fd6d6fa81 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -112,7 +112,7 @@ fn propagate_simple_queryable_to( face_hat_mut!(dst_face) .local_qabls .insert(res.clone(), (id, info)); - let key_expr = Resource::decl_key(res, dst_face); + let key_expr = Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client); send_declare( &dst_face.primitives, RoutingContext::with_expr( @@ -411,7 +411,7 @@ pub(super) fn declare_qabl_interest( } else { 0 }; - let wire_expr = Resource::decl_key(res, face); + let wire_expr = Resource::decl_key(res, face, face.whatami != WhatAmI::Client); send_declare( &face.primitives, RoutingContext::with_expr( @@ -450,7 +450,8 @@ pub(super) fn declare_qabl_interest( } else { 0 }; - let key_expr = Resource::decl_key(qabl, face); + let key_expr = + Resource::decl_key(qabl, face, face.whatami != WhatAmI::Client); send_declare( &face.primitives, RoutingContext::with_expr( @@ -493,7 +494,8 @@ pub(super) fn declare_qabl_interest( } else { 0 }; - let key_expr = Resource::decl_key(qabl, face); + let key_expr = + Resource::decl_key(qabl, face, face.whatami != WhatAmI::Client); send_declare( &face.primitives, RoutingContext::with_expr( diff --git a/zenoh/src/net/routing/hat/p2p_peer/token.rs b/zenoh/src/net/routing/hat/p2p_peer/token.rs index 9ef7034fd0..866737f0df 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/token.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/token.rs @@ -46,7 +46,7 @@ fn propagate_simple_token_to( if dst_face.whatami != WhatAmI::Client { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); face_hat_mut!(dst_face).local_tokens.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); + let key_expr = Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client); send_declare( &dst_face.primitives, RoutingContext::with_expr( @@ -80,7 +80,8 @@ fn propagate_simple_token_to( if !face_hat!(dst_face).local_tokens.contains_key(res) { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); face_hat_mut!(dst_face).local_tokens.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); + let key_expr = + Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client); send_declare( &dst_face.primitives, RoutingContext::with_expr( @@ -427,7 +428,7 @@ pub(crate) fn declare_token_interest( } else { 0 }; - let wire_expr = Resource::decl_key(res, face); + let wire_expr = Resource::decl_key(res, face, face.whatami != WhatAmI::Client); send_declare( &face.primitives, RoutingContext::with_expr( @@ -458,7 +459,8 @@ pub(crate) fn declare_token_interest( } else { 0 }; - let wire_expr = Resource::decl_key(token, face); + let wire_expr = + Resource::decl_key(token, face, face.whatami != WhatAmI::Client); send_declare( &face.primitives, RoutingContext::with_expr( @@ -494,7 +496,8 @@ pub(crate) fn declare_token_interest( } else { 0 }; - let wire_expr = Resource::decl_key(token, face); + let wire_expr = + Resource::decl_key(token, face, face.whatami != WhatAmI::Client); send_declare( &face.primitives, RoutingContext::with_expr( diff --git a/zenoh/src/net/routing/hat/router/interests.rs b/zenoh/src/net/routing/hat/router/interests.rs index fcd0269fcc..33bb3ddf6b 100644 --- a/zenoh/src/net/routing/hat/router/interests.rs +++ b/zenoh/src/net/routing/hat/router/interests.rs @@ -24,8 +24,8 @@ use zenoh_protocol::{ use zenoh_sync::get_mut_unchecked; use super::{ - face_hat_mut, pubsub::declare_sub_interest, queries::declare_qabl_interest, - token::declare_token_interest, HatCode, HatFace, + face_hat_mut, hat, pubsub::declare_sub_interest, queries::declare_qabl_interest, + token::declare_token_interest, HatCode, HatFace, HatTables, }; use crate::net::routing::{ dispatcher::{ @@ -112,3 +112,9 @@ impl HatInterestTrait for HatCode { face_hat_mut!(face).remote_interests.remove(&id); } } + +#[inline] +pub(super) fn push_declaration_profile(tables: &Tables, face: &FaceState) -> bool { + face.whatami == WhatAmI::Client + || (face.whatami == WhatAmI::Peer && !hat!(tables).full_net(WhatAmI::Peer)) +} diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index 506c85888c..cc0251a07a 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -32,7 +32,8 @@ use zenoh_sync::get_mut_unchecked; use super::{ face_hat, face_hat_mut, get_peer, get_router, get_routes_entries, hat, hat_mut, - network::Network, res_hat, res_hat_mut, HatCode, HatContext, HatFace, HatTables, + interests::push_declaration_profile, network::Network, res_hat, res_hat_mut, HatCode, + HatContext, HatFace, HatTables, }; #[cfg(feature = "unstable")] use crate::key_expr::KeyExpr; @@ -66,7 +67,8 @@ fn send_sourced_subscription_to_net_children( .map(|src_face| someface.id != src_face.id) .unwrap_or(true) { - let key_expr = Resource::decl_key(res, &mut someface); + let push_declaration = push_declaration_profile(tables, &someface); + let key_expr = Resource::decl_key(res, &mut someface, push_declaration); someface.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -129,7 +131,8 @@ fn propagate_simple_subscription_to( if !face_hat!(dst_face).local_subs.contains_key(res) { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); + let key_expr = + Resource::decl_key(res, dst_face, push_declaration_profile(tables, dst_face)); send_declare( &dst_face.primitives, RoutingContext::with_expr( @@ -384,7 +387,8 @@ fn send_forget_sourced_subscription_to_net_children( .map(|src_face| someface.id != src_face.id) .unwrap_or(true) { - let wire_expr = Resource::decl_key(res, &mut someface); + let push_declaration = push_declaration_profile(tables, &someface); + let wire_expr = Resource::decl_key(res, &mut someface, push_declaration); someface.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -883,15 +887,18 @@ pub(super) fn pubsub_linkstate_change( } } - for dst_face in tables.faces.values_mut() { + for mut dst_face in tables.faces.values().cloned() { if src_face.id != dst_face.id && HatTables::failover_brokering_to(links, dst_face.zid) { for res in face_hat!(src_face).remote_subs.values() { if !face_hat!(dst_face).local_subs.contains_key(res) { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(dst_face).local_subs.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); + face_hat_mut!(&mut dst_face) + .local_subs + .insert(res.clone(), id); + let push_declaration = push_declaration_profile(tables, &dst_face); + let key_expr = Resource::decl_key(res, &mut dst_face, push_declaration); let sub_info = SubscriberInfo { reliability: Reliability::Reliable, // @TODO compute proper reliability to propagate from reliability of known subscribers }; @@ -950,7 +957,8 @@ pub(crate) fn declare_sub_interest( } else { 0 }; - let wire_expr = Resource::decl_key(res, face); + let wire_expr = + Resource::decl_key(res, face, push_declaration_profile(tables, face)); send_declare( &face.primitives, RoutingContext::with_expr( @@ -995,7 +1003,8 @@ pub(crate) fn declare_sub_interest( } else { 0 }; - let wire_expr = Resource::decl_key(sub, face); + let wire_expr = + Resource::decl_key(sub, face, push_declaration_profile(tables, face)); send_declare( &face.primitives, RoutingContext::with_expr( @@ -1038,7 +1047,8 @@ pub(crate) fn declare_sub_interest( } else { 0 }; - let wire_expr = Resource::decl_key(sub, face); + let wire_expr = + Resource::decl_key(sub, face, push_declaration_profile(tables, face)); send_declare( &face.primitives, RoutingContext::with_expr( diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index d706435179..f45a260288 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -39,7 +39,8 @@ use zenoh_sync::get_mut_unchecked; use super::{ face_hat, face_hat_mut, get_peer, get_router, get_routes_entries, hat, hat_mut, - network::Network, res_hat, res_hat_mut, HatCode, HatContext, HatFace, HatTables, + interests::push_declaration_profile, network::Network, res_hat, res_hat_mut, HatCode, + HatContext, HatFace, HatTables, }; use crate::net::routing::{ dispatcher::{ @@ -206,7 +207,8 @@ fn send_sourced_queryable_to_net_children( .map(|src_face| someface.id != src_face.id) .unwrap_or(true) { - let key_expr = Resource::decl_key(res, &mut someface); + let push_declaration = push_declaration_profile(tables, &someface); + let key_expr = Resource::decl_key(res, &mut someface, push_declaration); someface.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -272,7 +274,8 @@ fn propagate_simple_queryable( face_hat_mut!(&mut dst_face) .local_qabls .insert(res.clone(), (id, info)); - let key_expr = Resource::decl_key(res, &mut dst_face); + let push_declaration = push_declaration_profile(tables, &dst_face); + let key_expr = Resource::decl_key(res, &mut dst_face, push_declaration); send_declare( &dst_face.primitives, RoutingContext::with_expr( @@ -515,7 +518,8 @@ fn send_forget_sourced_queryable_to_net_children( .map(|src_face| someface.id != src_face.id) .unwrap_or(true) { - let wire_expr = Resource::decl_key(res, &mut someface); + let push_declaration = push_declaration_profile(tables, &someface); + let wire_expr = Resource::decl_key(res, &mut someface, push_declaration); someface.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -999,7 +1003,8 @@ pub(super) fn queries_linkstate_change( face_hat_mut!(&mut dst_face) .local_qabls .insert(res.clone(), (id, info)); - let key_expr = Resource::decl_key(res, &mut dst_face); + let push_declaration = push_declaration_profile(tables, &dst_face); + let key_expr = Resource::decl_key(res, &mut dst_face, push_declaration); send_declare( &dst_face.primitives, RoutingContext::with_expr( @@ -1161,7 +1166,8 @@ pub(crate) fn declare_qabl_interest( } else { 0 }; - let wire_expr = Resource::decl_key(res, face); + let wire_expr = + Resource::decl_key(res, face, push_declaration_profile(tables, face)); send_declare( &face.primitives, RoutingContext::with_expr( @@ -1206,7 +1212,8 @@ pub(crate) fn declare_qabl_interest( } else { 0 }; - let key_expr = Resource::decl_key(qabl, face); + let key_expr = + Resource::decl_key(qabl, face, push_declaration_profile(tables, face)); send_declare( &face.primitives, RoutingContext::with_expr( @@ -1244,7 +1251,8 @@ pub(crate) fn declare_qabl_interest( } else { 0 }; - let key_expr = Resource::decl_key(qabl, face); + let key_expr = + Resource::decl_key(qabl, face, push_declaration_profile(tables, face)); send_declare( &face.primitives, RoutingContext::with_expr( diff --git a/zenoh/src/net/routing/hat/router/token.rs b/zenoh/src/net/routing/hat/router/token.rs index 1be66cb70b..f94a4d12d4 100644 --- a/zenoh/src/net/routing/hat/router/token.rs +++ b/zenoh/src/net/routing/hat/router/token.rs @@ -27,8 +27,9 @@ use zenoh_protocol::{ use zenoh_sync::get_mut_unchecked; use super::{ - face_hat, face_hat_mut, get_peer, get_router, hat, hat_mut, network::Network, res_hat, - res_hat_mut, HatCode, HatContext, HatFace, HatTables, + face_hat, face_hat_mut, get_peer, get_router, hat, hat_mut, + interests::push_declaration_profile, network::Network, res_hat, res_hat_mut, HatCode, + HatContext, HatFace, HatTables, }; use crate::net::routing::{ dispatcher::{face::FaceState, tables::Tables}, @@ -54,7 +55,8 @@ fn send_sourced_token_to_net_clildren( .map(|src_face| someface.id != src_face.id) .unwrap_or(true) { - let key_expr = Resource::decl_key(res, &mut someface); + let push_declaration = push_declaration_profile(tables, &someface); + let key_expr = Resource::decl_key(res, &mut someface, push_declaration); someface.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -115,7 +117,8 @@ fn propagate_simple_token_to( if !face_hat!(dst_face).local_tokens.contains_key(res) { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); face_hat_mut!(dst_face).local_tokens.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); + let key_expr = + Resource::decl_key(res, dst_face, push_declaration_profile(tables, dst_face)); send_declare( &dst_face.primitives, RoutingContext::with_expr( @@ -358,7 +361,8 @@ fn send_forget_sourced_token_to_net_clildren( .map(|src_face| someface.id != src_face.id) .unwrap_or(true) { - let wire_expr = Resource::decl_key(res, &mut someface); + let push_declaration = push_declaration_profile(tables, &someface); + let wire_expr = Resource::decl_key(res, &mut someface, push_declaration); someface.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -914,15 +918,18 @@ pub(super) fn token_linkstate_change( } } - for dst_face in tables.faces.values_mut() { + for mut dst_face in tables.faces.values().cloned() { if src_face.id != dst_face.id && HatTables::failover_brokering_to(links, dst_face.zid) { for res in face_hat!(src_face).remote_tokens.values() { if !face_hat!(dst_face).local_tokens.contains_key(res) { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(dst_face).local_tokens.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face); + face_hat_mut!(&mut dst_face) + .local_tokens + .insert(res.clone(), id); + let push_declaration = push_declaration_profile(tables, &dst_face); + let key_expr = Resource::decl_key(res, &mut dst_face, push_declaration); send_declare( &dst_face.primitives, RoutingContext::with_expr( @@ -977,7 +984,8 @@ pub(crate) fn declare_token_interest( } else { 0 }; - let wire_expr = Resource::decl_key(res, face); + let wire_expr = + Resource::decl_key(res, face, push_declaration_profile(tables, face)); send_declare( &face.primitives, RoutingContext::with_expr( @@ -1021,7 +1029,8 @@ pub(crate) fn declare_token_interest( } else { 0 }; - let wire_expr = Resource::decl_key(token, face); + let wire_expr = + Resource::decl_key(token, face, push_declaration_profile(tables, face)); send_declare( &face.primitives, RoutingContext::with_expr( @@ -1063,7 +1072,8 @@ pub(crate) fn declare_token_interest( } else { 0 }; - let wire_expr = Resource::decl_key(token, face); + let wire_expr = + Resource::decl_key(token, face, push_declaration_profile(tables, face)); send_declare( &face.primitives, RoutingContext::with_expr(