From 7387e28cd6f53066361eadd33af011f1b289a5df Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Mon, 9 Sep 2024 13:43:35 +0200 Subject: [PATCH] Fix bugs querying liveliness tokens (#1374) * Fix bug in liveliness get in client * Fix bug treating token interests replies from routers in peers * Peers propagate current token interests to remote peers with unfinalize initial declarations push * Don't register current interests declaration replies * Add comments * Add comments * Add comments --- zenoh/src/net/routing/hat/client/token.rs | 9 +++--- .../src/net/routing/hat/p2p_peer/interests.rs | 15 ++++----- zenoh/src/net/routing/hat/p2p_peer/mod.rs | 17 ++++++++-- zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 10 +++--- zenoh/src/net/routing/hat/p2p_peer/queries.rs | 8 ++--- zenoh/src/net/routing/hat/p2p_peer/token.rs | 31 +++++++++++++++---- 6 files changed, 58 insertions(+), 32 deletions(-) diff --git a/zenoh/src/net/routing/hat/client/token.rs b/zenoh/src/net/routing/hat/client/token.rs index 9e5923425c..5014ee5931 100644 --- a/zenoh/src/net/routing/hat/client/token.rs +++ b/zenoh/src/net/routing/hat/client/token.rs @@ -116,13 +116,9 @@ fn declare_simple_token( interest_id: Option, send_declare: &mut SendDeclare, ) { - register_simple_token(tables, face, id, res); - - propagate_simple_token(tables, res, face, send_declare); - - let wire_expr = Resource::decl_key(res, face, true); if let Some(interest_id) = interest_id { if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) { + let wire_expr = Resource::get_best_key(res, "", interest.src_face.id); send_declare( &interest.src_face.primitives, RoutingContext::with_expr( @@ -137,6 +133,9 @@ fn declare_simple_token( ), ) } + } else { + register_simple_token(tables, face, id, res); + propagate_simple_token(tables, res, face, send_declare); } } diff --git a/zenoh/src/net/routing/hat/p2p_peer/interests.rs b/zenoh/src/net/routing/hat/p2p_peer/interests.rs index 2ed9e22840..3fa2bbe193 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/interests.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/interests.rs @@ -24,8 +24,8 @@ use zenoh_protocol::{ use zenoh_sync::get_mut_unchecked; use super::{ - face_hat, face_hat_mut, pubsub::declare_sub_interest, queries::declare_qabl_interest, - token::declare_token_interest, HatCode, HatFace, + face_hat, face_hat_mut, initial_interest, pubsub::declare_sub_interest, + queries::declare_qabl_interest, token::declare_token_interest, HatCode, HatFace, }; use crate::net::routing::{ dispatcher::{ @@ -132,11 +132,12 @@ impl HatInterestTrait for HatCode { src_interest_id: id, }); - for dst_face in tables - .faces - .values_mut() - .filter(|f| f.whatami == WhatAmI::Router) - { + for dst_face in tables.faces.values_mut().filter(|f| { + f.whatami == WhatAmI::Router + || (options.tokens() + && f.whatami == WhatAmI::Peer + && !initial_interest(f).map(|i| i.finalized).unwrap_or(true)) + }) { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); get_mut_unchecked(dst_face).local_interests.insert( id, diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 21737326e4..e68c2232fc 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -172,7 +172,7 @@ impl HatBaseTrait for HatCode { } if face.state.whatami == WhatAmI::Peer { get_mut_unchecked(&mut face.state).local_interests.insert( - 0, + INITIAL_INTEREST_ID, InterestState { options: InterestOptions::ALL, res: None, @@ -418,7 +418,7 @@ struct HatFace { impl HatFace { fn new() -> Self { Self { - next_id: AtomicU32::new(0), + next_id: AtomicU32::new(1), // In p2p, id 0 is erserved for initial interest remote_interests: HashMap::new(), local_subs: HashMap::new(), remote_subs: HashMap::new(), @@ -440,3 +440,16 @@ fn get_routes_entries() -> RoutesIndexes { clients: vec![0], } } + +// In p2p, at connection, while no interest is sent on the network, +// peers act as if they received an interest CurrentFuture with id 0 +// and send back a DeclareFinal with interest_id 0. +// This 'ghost' interest is registered locally to allow tracking if +// the DeclareFinal has been received or not (finalized). + +const INITIAL_INTEREST_ID: u32 = 0; + +#[inline] +fn initial_interest(face: &FaceState) -> Option<&InterestState> { + face.local_interests.get(&INITIAL_INTEREST_ID) +} diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index 0dccf9ba3c..31336bc516 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -38,7 +38,9 @@ use crate::{ resource::{NodeId, Resource, SessionContext}, tables::{Route, RoutingExpr, Tables}, }, - hat::{CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources}, + hat::{ + p2p_peer::initial_interest, CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources, + }, router::{update_data_routes_from, RoutesIndexes}, RoutingContext, }, @@ -654,11 +656,7 @@ impl HatPubSubTrait for HatCode { for face in tables.faces.values().filter(|f| { f.whatami == WhatAmI::Peer - && !f - .local_interests - .get(&0) - .map(|i| i.finalized) - .unwrap_or(true) + && !initial_interest(f).map(|i| i.finalized).unwrap_or(true) }) { route.entry(face.id).or_insert_with(|| { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index debbc717b3..4a46ec6e85 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -42,7 +42,7 @@ use crate::net::routing::{ resource::{NodeId, Resource, SessionContext}, tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables}, }, - hat::{CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources}, + hat::{p2p_peer::initial_interest, CurrentFutureTrait, HatQueriesTrait, SendDeclare, Sources}, router::{update_query_routes_from, RoutesIndexes}, RoutingContext, }; @@ -602,11 +602,7 @@ impl HatQueriesTrait for HatCode { for face in tables.faces.values().filter(|f| { f.whatami == WhatAmI::Peer - && !f - .local_interests - .get(&0) - .map(|i| i.finalized) - .unwrap_or(true) + && !initial_interest(f).map(|i| i.finalized).unwrap_or(true) }) { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id); route.push(QueryTargetQabl { diff --git a/zenoh/src/net/routing/hat/p2p_peer/token.rs b/zenoh/src/net/routing/hat/p2p_peer/token.rs index 866737f0df..a06b06f7e2 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/token.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/token.rs @@ -152,11 +152,30 @@ fn declare_simple_token( face: &mut Arc, id: TokenId, res: &mut Arc, + interest_id: Option, send_declare: &mut SendDeclare, ) { - register_simple_token(tables, face, id, res); - - propagate_simple_token(tables, res, face, send_declare); + if let Some(interest_id) = interest_id { + if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) { + let wire_expr = Resource::get_best_key(res, "", interest.src_face.id); + send_declare( + &interest.src_face.primitives, + RoutingContext::with_expr( + Declare { + interest_id: Some(interest.src_interest_id), + ext_qos: ext::QoSType::default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }), + }, + res.expr(), + ), + ) + } + } else { + register_simple_token(tables, face, id, res); + propagate_simple_token(tables, res, face, send_declare); + } } #[inline] @@ -411,7 +430,7 @@ pub(crate) fn declare_token_interest( aggregate: bool, send_declare: &mut SendDeclare, ) { - if mode.current() && face.whatami == WhatAmI::Client { + if mode.current() { let interest_id = (!mode.future()).then_some(id); if let Some(res) = res.as_ref() { if aggregate { @@ -525,10 +544,10 @@ impl HatTokenTrait for HatCode { id: TokenId, res: &mut Arc, _node_id: NodeId, - _interest_id: Option, + interest_id: Option, send_declare: &mut SendDeclare, ) { - declare_simple_token(tables, face, id, res, send_declare) + declare_simple_token(tables, face, id, res, interest_id, send_declare) } fn undeclare_token(