From 43e75cc94e10b91bfe08f9f45fa80a4473d2af4c Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 6 Sep 2024 12:10:06 +0200 Subject: [PATCH 1/7] Fix bug in liveliness get in client --- zenoh/src/net/routing/hat/client/token.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh/src/net/routing/hat/client/token.rs b/zenoh/src/net/routing/hat/client/token.rs index 9e5923425c..b6cde878b2 100644 --- a/zenoh/src/net/routing/hat/client/token.rs +++ b/zenoh/src/net/routing/hat/client/token.rs @@ -120,9 +120,9 @@ fn declare_simple_token( propagate_simple_token(tables, res, face, send_declare); - let wire_expr = Resource::decl_key(res, face, true); if let Some(interest_id) = interest_id { if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) { + let wire_expr = Resource::get_best_key(res, "", interest.src_face.id); send_declare( &interest.src_face.primitives, RoutingContext::with_expr( From 3086eab636781ab800523f5e30bf760406251a8b Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 6 Sep 2024 15:10:38 +0200 Subject: [PATCH 2/7] Fix bug treating token interests replies from routers in peers --- zenoh/src/net/routing/hat/p2p_peer/token.rs | 24 +++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/zenoh/src/net/routing/hat/p2p_peer/token.rs b/zenoh/src/net/routing/hat/p2p_peer/token.rs index 866737f0df..f97265806b 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/token.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/token.rs @@ -152,11 +152,31 @@ fn declare_simple_token( face: &mut Arc, id: TokenId, res: &mut Arc, + interest_id: Option, send_declare: &mut SendDeclare, ) { register_simple_token(tables, face, id, res); propagate_simple_token(tables, res, face, send_declare); + + if let Some(interest_id) = interest_id { + if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) { + let wire_expr = Resource::get_best_key(res, "", interest.src_face.id); + send_declare( + &interest.src_face.primitives, + RoutingContext::with_expr( + Declare { + interest_id: Some(interest.src_interest_id), + ext_qos: ext::QoSType::default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }), + }, + res.expr(), + ), + ) + } + } } #[inline] @@ -525,10 +545,10 @@ impl HatTokenTrait for HatCode { id: TokenId, res: &mut Arc, _node_id: NodeId, - _interest_id: Option, + interest_id: Option, send_declare: &mut SendDeclare, ) { - declare_simple_token(tables, face, id, res, send_declare) + declare_simple_token(tables, face, id, res, interest_id, send_declare) } fn undeclare_token( From 35667cf7f1124efa649038653dc9ecb18bd7d2d1 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 6 Sep 2024 16:15:26 +0200 Subject: [PATCH 3/7] Peers propagate current token interests to remote peers with unfinalize initial declarations push --- zenoh/src/net/routing/hat/p2p_peer/interests.rs | 15 ++++++++++----- zenoh/src/net/routing/hat/p2p_peer/mod.rs | 2 +- zenoh/src/net/routing/hat/p2p_peer/token.rs | 2 +- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/zenoh/src/net/routing/hat/p2p_peer/interests.rs b/zenoh/src/net/routing/hat/p2p_peer/interests.rs index 2ed9e22840..9dc97ac6dd 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/interests.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/interests.rs @@ -132,11 +132,16 @@ impl HatInterestTrait for HatCode { src_interest_id: id, }); - for dst_face in tables - .faces - .values_mut() - .filter(|f| f.whatami == WhatAmI::Router) - { + for dst_face in tables.faces.values_mut().filter(|f| { + f.whatami == WhatAmI::Router + || (options.tokens() + && f.whatami == WhatAmI::Peer + && !f + .local_interests + .get(&0) + .map(|i| i.finalized) + .unwrap_or(true)) + }) { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); get_mut_unchecked(dst_face).local_interests.insert( id, diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 21737326e4..66a531c6b0 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -418,7 +418,7 @@ struct HatFace { impl HatFace { fn new() -> Self { Self { - next_id: AtomicU32::new(0), + next_id: AtomicU32::new(1), remote_interests: HashMap::new(), local_subs: HashMap::new(), remote_subs: HashMap::new(), diff --git a/zenoh/src/net/routing/hat/p2p_peer/token.rs b/zenoh/src/net/routing/hat/p2p_peer/token.rs index f97265806b..1b84492848 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/token.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/token.rs @@ -431,7 +431,7 @@ pub(crate) fn declare_token_interest( aggregate: bool, send_declare: &mut SendDeclare, ) { - if mode.current() && face.whatami == WhatAmI::Client { + if mode.current() { let interest_id = (!mode.future()).then_some(id); if let Some(res) = res.as_ref() { if aggregate { From 98d4b8a7b623922cdae78d64c01dedd4d6ac1537 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Fri, 6 Sep 2024 17:51:23 +0200 Subject: [PATCH 4/7] Don't register current interests declaration replies --- zenoh/src/net/routing/hat/client/token.rs | 7 +++---- zenoh/src/net/routing/hat/p2p_peer/token.rs | 7 +++---- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/zenoh/src/net/routing/hat/client/token.rs b/zenoh/src/net/routing/hat/client/token.rs index b6cde878b2..5014ee5931 100644 --- a/zenoh/src/net/routing/hat/client/token.rs +++ b/zenoh/src/net/routing/hat/client/token.rs @@ -116,10 +116,6 @@ fn declare_simple_token( interest_id: Option, send_declare: &mut SendDeclare, ) { - register_simple_token(tables, face, id, res); - - propagate_simple_token(tables, res, face, send_declare); - if let Some(interest_id) = interest_id { if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) { let wire_expr = Resource::get_best_key(res, "", interest.src_face.id); @@ -137,6 +133,9 @@ fn declare_simple_token( ), ) } + } else { + register_simple_token(tables, face, id, res); + propagate_simple_token(tables, res, face, send_declare); } } diff --git a/zenoh/src/net/routing/hat/p2p_peer/token.rs b/zenoh/src/net/routing/hat/p2p_peer/token.rs index 1b84492848..a06b06f7e2 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/token.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/token.rs @@ -155,10 +155,6 @@ fn declare_simple_token( interest_id: Option, send_declare: &mut SendDeclare, ) { - register_simple_token(tables, face, id, res); - - propagate_simple_token(tables, res, face, send_declare); - if let Some(interest_id) = interest_id { if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) { let wire_expr = Resource::get_best_key(res, "", interest.src_face.id); @@ -176,6 +172,9 @@ fn declare_simple_token( ), ) } + } else { + register_simple_token(tables, face, id, res); + propagate_simple_token(tables, res, face, send_declare); } } From 70a7ae7c4188e301e793698a031a066b24d9beda Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Mon, 9 Sep 2024 10:50:49 +0200 Subject: [PATCH 5/7] Add comments --- zenoh/src/net/routing/hat/p2p_peer/mod.rs | 17 +++++++++++++++-- zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 10 ++++------ zenoh/src/net/routing/hat/p2p_peer/queries.rs | 8 ++------ 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 66a531c6b0..c2d4db4673 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -172,7 +172,7 @@ impl HatBaseTrait for HatCode { } if face.state.whatami == WhatAmI::Peer { get_mut_unchecked(&mut face.state).local_interests.insert( - 0, + INITIAL_INTEREST_ID, InterestState { options: InterestOptions::ALL, res: None, @@ -418,7 +418,7 @@ struct HatFace { impl HatFace { fn new() -> Self { Self { - next_id: AtomicU32::new(1), + next_id: AtomicU32::new(1), // In p2p, id 0 is erserved for initial interest remote_interests: HashMap::new(), local_subs: HashMap::new(), remote_subs: HashMap::new(), @@ -440,3 +440,16 @@ fn get_routes_entries() -> RoutesIndexes { clients: vec![0], } } + +// In p2p, at connection, while no interest is sent on the network, +// peers act as if they received an interest CurrentFuture with id 0 +// and send back a ReplyFinal with interest_id 0. +// This 'ghost' interest is registered locally to allow tracking if +// the reply final has been received or not (finalized). + +const INITIAL_INTEREST_ID: u32 = 0; + +#[inline] +fn initial_interest(face: &FaceState) -> Option<&InterestState> { + face.local_interests.get(&INITIAL_INTEREST_ID) +} diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index 0dccf9ba3c..31336bc516 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -38,7 +38,9 @@ use crate::{ resource::{NodeId, Resource, SessionContext}, tables::{Route, RoutingExpr, Tables}, }, - hat::{CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources}, + hat::{ + p2p_peer::initial_interest, CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources, + }, router::{update_data_routes_from, RoutesIndexes}, RoutingContext, }, @@ -654,11 +656,7 @@ impl HatPubSubTrait for HatCode { for face in tables.faces.values().filter(|f| { f.whatami == WhatAmI::Peer - && !f - .local_interests - .get(&0) - .map(|i| i.finalized) - .unwrap_or(true) + && !initial_interest(f).map(|i| i.finalized).unwrap_or(true) }) { route.entry(face.id).or_insert_with(|| { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index 2fd6d6fa81..44658e85c1 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -43,7 +43,7 @@ use crate::net::routing::{ resource::{NodeId, Resource, SessionContext}, tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, }, - hat::{CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources}, + hat::{p2p_peer::initial_interest, CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources}, router::{update_query_routes_from, RoutesIndexes}, RoutingContext, }; @@ -604,11 +604,7 @@ impl HatQueriesTrait for HatCode { for face in tables.faces.values().filter(|f| { f.whatami == WhatAmI::Peer - && !f - .local_interests - .get(&0) - .map(|i| i.finalized) - .unwrap_or(true) + && !initial_interest(f).map(|i| i.finalized).unwrap_or(true) }) { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); route.push(QueryTargetQabl { From a6c4664067a0a0b081e8e849d3a53e4bbd75436a Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Mon, 9 Sep 2024 11:10:36 +0200 Subject: [PATCH 6/7] Add comments --- zenoh/src/net/routing/hat/p2p_peer/interests.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/zenoh/src/net/routing/hat/p2p_peer/interests.rs b/zenoh/src/net/routing/hat/p2p_peer/interests.rs index 9dc97ac6dd..3fa2bbe193 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/interests.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/interests.rs @@ -24,8 +24,8 @@ use zenoh_protocol::{ use zenoh_sync::get_mut_unchecked; use super::{ - face_hat, face_hat_mut, pubsub::declare_sub_interest, queries::declare_qabl_interest, - token::declare_token_interest, HatCode, HatFace, + face_hat, face_hat_mut, initial_interest, pubsub::declare_sub_interest, + queries::declare_qabl_interest, token::declare_token_interest, HatCode, HatFace, }; use crate::net::routing::{ dispatcher::{ @@ -136,11 +136,7 @@ impl HatInterestTrait for HatCode { f.whatami == WhatAmI::Router || (options.tokens() && f.whatami == WhatAmI::Peer - && !f - .local_interests - .get(&0) - .map(|i| i.finalized) - .unwrap_or(true)) + && !initial_interest(f).map(|i| i.finalized).unwrap_or(true)) }) { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); get_mut_unchecked(dst_face).local_interests.insert( From 436278dfc6e1278c47c03aead7623de6417c68c2 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Mon, 9 Sep 2024 11:13:41 +0200 Subject: [PATCH 7/7] Add comments --- zenoh/src/net/routing/hat/p2p_peer/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index c2d4db4673..e68c2232fc 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -443,9 +443,9 @@ fn get_routes_entries() -> RoutesIndexes { // In p2p, at connection, while no interest is sent on the network, // peers act as if they received an interest CurrentFuture with id 0 -// and send back a ReplyFinal with interest_id 0. +// and send back a DeclareFinal with interest_id 0. // This 'ghost' interest is registered locally to allow tracking if -// the reply final has been received or not (finalized). +// the DeclareFinal has been received or not (finalized). const INITIAL_INTEREST_ID: u32 = 0;