From ad7d647f9a2940ac1ed3624d56ed2f22774e83b9 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Tue, 17 Sep 2024 16:45:39 +0200 Subject: [PATCH 01/12] Peers only porpagate new tokens --- zenoh/src/net/routing/hat/p2p_peer/token.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/zenoh/src/net/routing/hat/p2p_peer/token.rs b/zenoh/src/net/routing/hat/p2p_peer/token.rs index 3b81a3ce57..06f3a15c99 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/token.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/token.rs @@ -31,6 +31,22 @@ use crate::net::routing::{ RoutingContext, }; +fn new_token( + tables: &Tables, + res: &Arc, + src_face: &Arc, + dst_face: &mut Arc, +) -> bool { + // Is there any face that + !res.session_ctxs.values().any(|ctx| { + ctx.token // declared the token + && (ctx.face.id != src_face.id) // is not the face that just registered it + && (ctx.face.id != dst_face.id || dst_face.zid == tables.zid) // is not the face we are propagating to (except for local) + && (ctx.face.whatami == WhatAmI::Client || dst_face.whatami == WhatAmI::Client) + // don't forward from/to router/peers + }) +} + #[inline] fn propagate_simple_token_to( tables: &mut Tables, @@ -42,6 +58,7 @@ fn propagate_simple_token_to( if (src_face.id != dst_face.id || dst_face.zid == tables.zid) && !face_hat!(dst_face).local_tokens.contains_key(res) && (src_face.whatami == WhatAmI::Client || dst_face.whatami == WhatAmI::Client) + && new_token(tables, res, src_face, dst_face) { if dst_face.whatami != WhatAmI::Client { let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); From 8494af82e8e084ba785d6a429c5f9d771253e1da Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Tue, 17 Sep 2024 17:39:32 +0200 Subject: [PATCH 02/12] Routers check failover brokering before propagating one shot tooken undeclares to peers --- zenoh/src/net/routing/hat/router/token.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/zenoh/src/net/routing/hat/router/token.rs b/zenoh/src/net/routing/hat/router/token.rs index a95c6fc48a..e84814ed7e 100644 --- a/zenoh/src/net/routing/hat/router/token.rs +++ b/zenoh/src/net/routing/hat/router/token.rs @@ -415,11 +415,14 @@ fn propagate_forget_simple_token( // undeclaration, otherwise the undeclaration would be duplicated at the source Face. In // cases where we don't have access to a Face as we didnt't receive an undeclaration and we // default to true. - } else if src_face.map_or(true, |src_face| src_face.id != face.id) - && face_hat!(face).remote_interests.values().any(|(r, o)| { - o.tokens() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true) && !o.aggregate() - }) - { + } else if src_face.map_or(true, |src_face| { + src_face.id != face.id + && (src_face.whatami != WhatAmI::Peer + || face.whatami != WhatAmI::Peer + || hat!(tables).failover_brokering(src_face.zid, face.zid)) + }) && face_hat!(face).remote_interests.values().any(|(r, o)| { + o.tokens() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true) && !o.aggregate() + }) { // Token has never been declared on this face. // Send an Undeclare with a one shot generated id and a WireExpr ext. send_declare( @@ -476,6 +479,10 @@ fn propagate_forget_simple_token( o.tokens() && r.as_ref().map(|r| r.matches(&res)).unwrap_or(true) && !o.aggregate() + }) && src_face.map_or(true, |src_face| { + src_face.whatami != WhatAmI::Peer + || face.whatami != WhatAmI::Peer + || hat!(tables).failover_brokering(src_face.zid, face.zid) }) { // Token has never been declared on this face. // Send an Undeclare with a one shot generated id and a WireExpr ext. From 00e6cc361191da9684ab071097e114a81b6ef3eb Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 18 Sep 2024 09:17:08 +0200 Subject: [PATCH 03/12] Only propagate Current interests to non finalized peers --- zenoh/src/net/routing/hat/p2p_peer/interests.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/zenoh/src/net/routing/hat/p2p_peer/interests.rs b/zenoh/src/net/routing/hat/p2p_peer/interests.rs index 3fa2bbe193..00ce44cf6d 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/interests.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/interests.rs @@ -135,6 +135,7 @@ impl HatInterestTrait for HatCode { for dst_face in tables.faces.values_mut().filter(|f| { f.whatami == WhatAmI::Router || (options.tokens() + && mode == InterestMode::Current && f.whatami == WhatAmI::Peer && !initial_interest(f).map(|i| i.finalized).unwrap_or(true)) }) { From dd8f9afe063e2d8b3b66d1c6d31a0c39647a7449 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 18 Sep 2024 09:20:44 +0200 Subject: [PATCH 04/12] Peers propagate Current interests as CurrentFuture to routers --- zenoh/src/net/routing/hat/p2p_peer/interests.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/zenoh/src/net/routing/hat/p2p_peer/interests.rs b/zenoh/src/net/routing/hat/p2p_peer/interests.rs index 00ce44cf6d..37337d2594 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/interests.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/interests.rs @@ -132,6 +132,11 @@ impl HatInterestTrait for HatCode { src_interest_id: id, }); + let propagated_mode = if mode.future() { + InterestMode::CurrentFuture + } else { + mode + }; for dst_face in tables.faces.values_mut().filter(|f| { f.whatami == WhatAmI::Router || (options.tokens() @@ -145,7 +150,7 @@ impl HatInterestTrait for HatCode { InterestState { options, res: res.as_ref().map(|res| (*res).clone()), - finalized: mode == InterestMode::Future, + finalized: propagated_mode == InterestMode::Future, }, ); if mode.current() { @@ -162,7 +167,7 @@ impl HatInterestTrait for HatCode { dst_face.primitives.send_interest(RoutingContext::with_expr( Interest { id, - mode, + mode: propagated_mode, options, wire_expr, ext_qos: ext::QoSType::DECLARE, From 04c0f175230826de3d56369fdcae86e124357e17 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 18 Sep 2024 16:07:24 +0200 Subject: [PATCH 05/12] Historical replies to CurrentFuture token interests are sent with an interest_id --- .../zenoh-protocol/src/network/interest.rs | 6 +- zenoh/src/api/session.rs | 93 ++++++++++--------- zenoh/src/net/routing/dispatcher/interests.rs | 1 + zenoh/src/net/routing/hat/client/interests.rs | 1 + zenoh/src/net/routing/hat/client/token.rs | 36 +++---- .../src/net/routing/hat/p2p_peer/interests.rs | 7 +- zenoh/src/net/routing/hat/p2p_peer/mod.rs | 4 +- zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 6 +- zenoh/src/net/routing/hat/p2p_peer/queries.rs | 4 +- zenoh/src/net/routing/hat/p2p_peer/token.rs | 65 ++++++++----- zenoh/src/net/routing/hat/router/token.rs | 2 +- 11 files changed, 124 insertions(+), 101 deletions(-) diff --git a/commons/zenoh-protocol/src/network/interest.rs b/commons/zenoh-protocol/src/network/interest.rs index 9f329b6ff5..86ff886233 100644 --- a/commons/zenoh-protocol/src/network/interest.rs +++ b/commons/zenoh-protocol/src/network/interest.rs @@ -59,11 +59,11 @@ pub mod flag { /// |------------------>| -- This is a DeclareInterest e.g. for subscriber declarations/undeclarations. /// | | /// | DECL SUBSCRIBER | -/// |<------------------| -- With interest_id field not set +/// |<------------------| -- With interest_id field not set (except for tokens) /// | DECL SUBSCRIBER | -/// |<------------------| -- With interest_id field not set +/// |<------------------| -- With interest_id field not set (except for tokens) /// | DECL SUBSCRIBER | -/// |<------------------| -- With interest_id field not set +/// |<------------------| -- With interest_id field not set (except for tokens) /// | | /// | DECL FINAL | /// |<------------------| -- With interest_id field set diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 2eb7295661..cef275a690 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -2347,60 +2347,61 @@ impl Primitives for WeakSession { zenoh_protocol::network::DeclareBody::UndeclareQueryable(m) => { trace!("recv UndeclareQueryable {:?}", m.id); } + #[cfg(not(feature = "unstable"))] + zenoh_protocol::network::DeclareBody::DeclareToken(_) => {} + #[cfg(feature = "unstable")] zenoh_protocol::network::DeclareBody::DeclareToken(m) => { - trace!("recv DeclareToken {:?}", m.id); - #[cfg(feature = "unstable")] + let mut state = zwrite!(self.state); + match state + .wireexpr_to_keyexpr(&m.wire_expr, false) + .map(|e| e.into_owned()) { - let mut state = zwrite!(self.state); - match state - .wireexpr_to_keyexpr(&m.wire_expr, false) - .map(|e| e.into_owned()) - { - Ok(key_expr) => { - if let Some(interest_id) = msg.interest_id { - if let Some(query) = state.liveliness_queries.get(&interest_id) { - let reply = Reply { - result: Ok(Sample { - key_expr, - payload: ZBytes::empty(), - kind: SampleKind::Put, - encoding: Encoding::default(), - timestamp: None, - qos: QoS::default(), - #[cfg(feature = "unstable")] - reliability: Reliability::Reliable, - #[cfg(feature = "unstable")] - source_info: SourceInfo::empty(), - #[cfg(feature = "unstable")] - attachment: None, - }), + Ok(key_expr) => { + if let Some(interest_id) = msg.interest_id { + if let Some(query) = state.liveliness_queries.get(&interest_id) { + let reply = Reply { + result: Ok(Sample { + key_expr, + payload: ZBytes::empty(), + kind: SampleKind::Put, + encoding: Encoding::default(), + timestamp: None, + qos: QoS::default(), #[cfg(feature = "unstable")] - replier_id: None, - }; - - query.callback.call(reply); - } - } else if let Entry::Vacant(e) = state.remote_tokens.entry(m.id) { - e.insert(key_expr.clone()); - drop(state); - - self.execute_subscriber_callbacks( - false, - &m.wire_expr, - None, - ZBuf::default(), - SubscriberKind::LivelinessSubscriber, - #[cfg(feature = "unstable")] - Reliability::Reliable, + reliability: Reliability::Reliable, + #[cfg(feature = "unstable")] + source_info: SourceInfo::empty(), + #[cfg(feature = "unstable")] + attachment: None, + }), #[cfg(feature = "unstable")] - None, - ); + replier_id: None, + }; + + query.callback.call(reply); + return; } } - Err(err) => { - tracing::error!("Received DeclareToken for unknown wire_expr: {}", err) + if let Entry::Vacant(e) = state.remote_tokens.entry(m.id) { + e.insert(key_expr.clone()); + drop(state); + + self.execute_subscriber_callbacks( + false, + &m.wire_expr, + None, + ZBuf::default(), + SubscriberKind::LivelinessSubscriber, + #[cfg(feature = "unstable")] + Reliability::Reliable, + #[cfg(feature = "unstable")] + None, + ); } } + Err(err) => { + tracing::error!("Received DeclareToken for unknown wire_expr: {}", err) + } } } zenoh_protocol::network::DeclareBody::UndeclareToken(m) => { diff --git a/zenoh/src/net/routing/dispatcher/interests.rs b/zenoh/src/net/routing/dispatcher/interests.rs index d088b1f3f6..6180828f1c 100644 --- a/zenoh/src/net/routing/dispatcher/interests.rs +++ b/zenoh/src/net/routing/dispatcher/interests.rs @@ -46,6 +46,7 @@ static INTEREST_TIMEOUT_MS: u64 = 10000; pub(crate) struct CurrentInterest { pub(crate) src_face: Arc, pub(crate) src_interest_id: InterestId, + pub(crate) mode: InterestMode, } pub(crate) fn declare_final( diff --git a/zenoh/src/net/routing/hat/client/interests.rs b/zenoh/src/net/routing/hat/client/interests.rs index 9347b3f0e5..eab03fc99f 100644 --- a/zenoh/src/net/routing/hat/client/interests.rs +++ b/zenoh/src/net/routing/hat/client/interests.rs @@ -101,6 +101,7 @@ impl HatInterestTrait for HatCode { let interest = Arc::new(CurrentInterest { src_face: face.clone(), src_interest_id: id, + mode, }); for dst_face in tables diff --git a/zenoh/src/net/routing/hat/client/token.rs b/zenoh/src/net/routing/hat/client/token.rs index 18a202cd82..b567e747b3 100644 --- a/zenoh/src/net/routing/hat/client/token.rs +++ b/zenoh/src/net/routing/hat/client/token.rs @@ -118,25 +118,27 @@ fn declare_simple_token( ) { if let Some(interest_id) = interest_id { if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) { - let wire_expr = Resource::get_best_key(res, "", interest.src_face.id); - send_declare( - &interest.src_face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: Some(interest.src_interest_id), - ext_qos: ext::QoSType::default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }), - }, - res.expr(), - ), - ) + if interest.mode == InterestMode::Current { + let wire_expr = Resource::get_best_key(res, "", interest.src_face.id); + send_declare( + &interest.src_face.primitives, + RoutingContext::with_expr( + Declare { + interest_id: Some(interest.src_interest_id), + ext_qos: ext::QoSType::default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }), + }, + res.expr(), + ), + ); + return; + } } - } else { - register_simple_token(tables, face, id, res); - propagate_simple_token(tables, res, face, send_declare); } + register_simple_token(tables, face, id, res); + propagate_simple_token(tables, res, face, send_declare); } #[inline] diff --git a/zenoh/src/net/routing/hat/p2p_peer/interests.rs b/zenoh/src/net/routing/hat/p2p_peer/interests.rs index 37337d2594..a8343baac6 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/interests.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/interests.rs @@ -47,7 +47,7 @@ pub(super) fn interests_new_face(tables: &mut Tables, face: &mut Arc) .collect::>>() { if face.whatami == WhatAmI::Router { - for (res, options) in face_hat_mut!(&mut src_face).remote_interests.values() { + for (res, _, options) in face_hat_mut!(&mut src_face).remote_interests.values() { let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); get_mut_unchecked(face).local_interests.insert( id, @@ -125,11 +125,12 @@ impl HatInterestTrait for HatCode { } face_hat_mut!(face) .remote_interests - .insert(id, (res.as_ref().map(|res| (*res).clone()), options)); + .insert(id, (res.as_ref().map(|res| (*res).clone()), mode, options)); let interest = Arc::new(CurrentInterest { src_face: face.clone(), src_interest_id: id, + mode, }); let propagated_mode = if mode.future() { @@ -220,7 +221,7 @@ impl HatInterestTrait for HatCode { .collect::>() { let local_interest = dst_face.local_interests.get(&id).unwrap(); - if local_interest.res == interest.0 && local_interest.options == interest.1 + if local_interest.res == interest.0 && local_interest.options == interest.2 { dst_face.primitives.send_interest(RoutingContext::with_expr( Interest { diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index e68c2232fc..329ff42453 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -33,7 +33,7 @@ use zenoh_protocol::{ queryable::ext::QueryableInfoType, QueryableId, SubscriberId, TokenId, }, - interest::{InterestId, InterestOptions}, + interest::{InterestId, InterestMode, InterestOptions}, oam::id::OAM_LINKSTATE, Declare, DeclareBody, DeclareFinal, Oam, }, @@ -406,7 +406,7 @@ impl HatContext { struct HatFace { next_id: AtomicU32, // @TODO: manage rollover and uniqueness - remote_interests: HashMap>, InterestOptions)>, + remote_interests: HashMap>, InterestMode, InterestOptions)>, local_subs: HashMap, SubscriberId>, remote_subs: HashMap>, local_tokens: HashMap, TokenId>, diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index f78c1a6e1a..a837214469 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -84,13 +84,13 @@ fn propagate_simple_subscription_to( let matching_interests = face_hat!(dst_face) .remote_interests .values() - .filter(|(r, o)| { + .filter(|(r, _, o)| { o.subscribers() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true) }) .cloned() - .collect::>, InterestOptions)>>(); + .collect::>, InterestMode, InterestOptions)>>(); - for (int_res, options) in matching_interests { + for (int_res, _, options) in matching_interests { let res = if options.aggregate() { int_res.as_ref().unwrap_or(res) } else { diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index 34896a75fe..6872b9ebe7 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -97,7 +97,9 @@ fn propagate_simple_queryable_to( || face_hat!(dst_face) .remote_interests .values() - .any(|(r, o)| o.queryables() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true))) + .any(|(r, _, o)| { + o.queryables() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true) + })) && src_face .as_ref() .map(|src_face| { diff --git a/zenoh/src/net/routing/hat/p2p_peer/token.rs b/zenoh/src/net/routing/hat/p2p_peer/token.rs index 06f3a15c99..2b0323739c 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/token.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/token.rs @@ -53,6 +53,7 @@ fn propagate_simple_token_to( dst_face: &mut Arc, res: &Arc, src_face: &mut Arc, + interest_id: Option, send_declare: &mut SendDeclare, ) { if (src_face.id != dst_face.id || dst_face.zid == tables.zid) @@ -84,11 +85,15 @@ fn propagate_simple_token_to( let matching_interests = face_hat!(dst_face) .remote_interests .values() - .filter(|(r, o)| o.tokens() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true)) + .filter(|(r, m, o)| { + o.tokens() + && r.as_ref().map(|r| r.matches(res)).unwrap_or(true) + && (m.current() || interest_id.is_none()) + }) .cloned() - .collect::>, InterestOptions)>>(); + .collect::>, InterestMode, InterestOptions)>>(); - for (int_res, options) in matching_interests { + for (int_res, _, options) in matching_interests { let res = if options.aggregate() { int_res.as_ref().unwrap_or(res) } else { @@ -125,6 +130,7 @@ fn propagate_simple_token( tables: &mut Tables, res: &Arc, src_face: &mut Arc, + interest_id: Option, send_declare: &mut SendDeclare, ) { for mut dst_face in tables @@ -133,7 +139,14 @@ fn propagate_simple_token( .cloned() .collect::>>() { - propagate_simple_token_to(tables, &mut dst_face, res, src_face, send_declare); + propagate_simple_token_to( + tables, + &mut dst_face, + res, + src_face, + interest_id, + send_declare, + ); } } @@ -174,25 +187,27 @@ fn declare_simple_token( ) { if let Some(interest_id) = interest_id { if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) { - let wire_expr = Resource::get_best_key(res, "", interest.src_face.id); - send_declare( - &interest.src_face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: Some(interest.src_interest_id), - ext_qos: ext::QoSType::default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }), - }, - res.expr(), - ), - ) + if interest.mode == InterestMode::Current { + let wire_expr = Resource::get_best_key(res, "", interest.src_face.id); + send_declare( + &interest.src_face.primitives, + RoutingContext::with_expr( + Declare { + interest_id: Some(interest.src_interest_id), + ext_qos: ext::QoSType::default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }), + }, + res.expr(), + ), + ); + return; + } } - } else { - register_simple_token(tables, face, id, res); - propagate_simple_token(tables, res, face, send_declare); } + register_simple_token(tables, face, id, res); + propagate_simple_token(tables, res, face, interest_id, send_declare); } #[inline] @@ -241,7 +256,7 @@ fn propagate_forget_simple_token( ), ); } else if src_face.id != face.id - && face_hat!(face).remote_interests.values().any(|(r, o)| { + && face_hat!(face).remote_interests.values().any(|(r, _, o)| { o.tokens() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true) && !o.aggregate() }) { @@ -293,7 +308,7 @@ fn propagate_forget_simple_token( res.expr(), ), ); - } else if face_hat!(face).remote_interests.values().any(|(r, o)| { + } else if face_hat!(face).remote_interests.values().any(|(r, _, o)| { o.tokens() && r.as_ref().map(|r| r.matches(&res)).unwrap_or(true) && !o.aggregate() @@ -432,7 +447,7 @@ pub(super) fn token_new_face( .collect::>>() { for token in face_hat!(src_face.clone()).remote_tokens.values() { - propagate_simple_token_to(tables, face, token, &mut src_face, send_declare); + propagate_simple_token_to(tables, face, token, &mut src_face, None, send_declare); } } } @@ -463,7 +478,7 @@ pub(crate) fn declare_token_interest( send_declare: &mut SendDeclare, ) { if mode.current() { - let interest_id = (!mode.future()).then_some(id); + let interest_id = Some(id); if let Some(res) = res.as_ref() { if aggregate { if tables.faces.values().any(|src_face| { diff --git a/zenoh/src/net/routing/hat/router/token.rs b/zenoh/src/net/routing/hat/router/token.rs index e84814ed7e..b506764bfe 100644 --- a/zenoh/src/net/routing/hat/router/token.rs +++ b/zenoh/src/net/routing/hat/router/token.rs @@ -989,7 +989,7 @@ pub(crate) fn declare_token_interest( && (face.whatami == WhatAmI::Client || (face.whatami == WhatAmI::Peer && !hat!(tables).full_net(WhatAmI::Peer))) { - let interest_id = (!mode.future()).then_some(id); + let interest_id = Some(id); if let Some(res) = res.as_ref() { if aggregate { if hat!(tables).router_tokens.iter().any(|token| { From 590186c2ddd894f473e440694b9d3825fc49dd28 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 18 Sep 2024 16:40:47 +0200 Subject: [PATCH 06/12] Peers send initial tokens with the INITIAL_INTEREST_ID --- zenoh/src/net/routing/hat/p2p_peer/token.rs | 22 +++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/zenoh/src/net/routing/hat/p2p_peer/token.rs b/zenoh/src/net/routing/hat/p2p_peer/token.rs index 2b0323739c..9d583d6ba7 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/token.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/token.rs @@ -23,7 +23,7 @@ use zenoh_protocol::network::{ }; use zenoh_sync::get_mut_unchecked; -use super::{face_hat, face_hat_mut, HatCode, HatFace}; +use super::{face_hat, face_hat_mut, HatCode, HatFace, INITIAL_INTEREST_ID}; use crate::net::routing::{ dispatcher::{face::FaceState, tables::Tables}, hat::{CurrentFutureTrait, HatTokenTrait, SendDeclare}, @@ -53,7 +53,8 @@ fn propagate_simple_token_to( dst_face: &mut Arc, res: &Arc, src_face: &mut Arc, - interest_id: Option, + src_interest_id: Option, + dst_interest_id: Option, send_declare: &mut SendDeclare, ) { if (src_face.id != dst_face.id || dst_face.zid == tables.zid) @@ -69,7 +70,7 @@ fn propagate_simple_token_to( &dst_face.primitives, RoutingContext::with_expr( Declare { - interest_id: None, + interest_id: dst_interest_id, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -88,7 +89,7 @@ fn propagate_simple_token_to( .filter(|(r, m, o)| { o.tokens() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true) - && (m.current() || interest_id.is_none()) + && (m.current() || src_interest_id.is_none()) }) .cloned() .collect::>, InterestMode, InterestOptions)>>(); @@ -108,7 +109,7 @@ fn propagate_simple_token_to( &dst_face.primitives, RoutingContext::with_expr( Declare { - interest_id: None, + interest_id: dst_interest_id, ext_qos: ext::QoSType::DECLARE, ext_tstamp: None, ext_nodeid: ext::NodeIdType::DEFAULT, @@ -145,6 +146,7 @@ fn propagate_simple_token( res, src_face, interest_id, + None, send_declare, ); } @@ -447,7 +449,15 @@ pub(super) fn token_new_face( .collect::>>() { for token in face_hat!(src_face.clone()).remote_tokens.values() { - propagate_simple_token_to(tables, face, token, &mut src_face, None, send_declare); + propagate_simple_token_to( + tables, + face, + token, + &mut src_face, + None, + Some(INITIAL_INTEREST_ID), + send_declare, + ); } } } From 30adc854697b70b575f3ede37a385c35cdaec0df Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 18 Sep 2024 17:53:23 +0200 Subject: [PATCH 07/12] Add liveliness tests --- zenoh/tests/liveliness.rs | 385 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 385 insertions(+) diff --git a/zenoh/tests/liveliness.rs b/zenoh/tests/liveliness.rs index df19865ce7..d64a229f83 100644 --- a/zenoh/tests/liveliness.rs +++ b/zenoh/tests/liveliness.rs @@ -4105,3 +4105,388 @@ async fn test_liveliness_subget_router_history_middle() { router_subget.close().await.unwrap(); router.close().await.unwrap(); } + +#[cfg(feature = "unstable")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_liveliness_regression_1() { + use std::time::Duration; + + use zenoh::{config, sample::SampleKind}; + use zenoh_config::WhatAmI; + use zenoh_link::EndPoint; + + const TIMEOUT: Duration = Duration::from_secs(60); + const SLEEP: Duration = Duration::from_secs(1); + const ROUTER_ENDPOINT: &str = "tcp/localhost:47503"; + const PEER_TOK_ENDPOINT: &str = "tcp/localhost:47504"; + const LIVELINESS_KEYEXPR: &str = "test/liveliness/regression/1"; + + zenoh_util::init_log_from_env_or("error"); + + let router = { + let mut c = config::default(); + c.listen + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Router)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Router ZID: {}", s.zid()); + s + }; + + let peer_tok = { + let mut c = config::default(); + c.listen + .endpoints + .set(vec![PEER_TOK_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (token) ZID: {}", s.zid()); + s + }; + + let token = ztimeout!(peer_tok.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); + tokio::time::sleep(SLEEP).await; + + let peer_sub = { + let mut c = config::default(); + c.connect + .endpoints + .set(vec![ + ROUTER_ENDPOINT.parse::().unwrap(), + PEER_TOK_ENDPOINT.parse::().unwrap(), + ]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (sub) ZID: {}", s.zid()); + s + }; + + let sub = ztimeout!(peer_sub.liveliness().declare_subscriber(LIVELINESS_KEYEXPR)).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().is_err()); + + token.undeclare().await.unwrap(); + tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert!(sample.kind() == SampleKind::Delete); + assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + assert!(sub.try_recv().is_err()); + + peer_tok.close().await.unwrap(); + peer_sub.close().await.unwrap(); + router.close().await.unwrap(); +} + +#[cfg(feature = "unstable")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_liveliness_regression_2() { + use std::time::Duration; + + use zenoh::{config, sample::SampleKind}; + use zenoh_config::WhatAmI; + use zenoh_link::EndPoint; + + const TIMEOUT: Duration = Duration::from_secs(60); + const SLEEP: Duration = Duration::from_secs(1); + const PEER_TOK1_ENDPOINT: &str = "tcp/localhost:47505"; + const PEER_SUB_ENDPOINT: &str = "tcp/localhost:47506"; + const LIVELINESS_KEYEXPR: &str = "test/liveliness/regression/2"; + + zenoh_util::init_log_from_env_or("error"); + + let peer_tok1 = { + let mut c = config::default(); + c.listen + .endpoints + .set(vec![PEER_TOK1_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (token 1) ZID: {}", s.zid()); + s + }; + + let token1 = ztimeout!(peer_tok1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); + tokio::time::sleep(SLEEP).await; + + let peer_sub = { + let mut c = config::default(); + c.listen + .endpoints + .set(vec![PEER_SUB_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.connect + .endpoints + .set(vec![PEER_TOK1_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (sub) ZID: {}", s.zid()); + s + }; + + let sub = ztimeout!(peer_sub.liveliness().declare_subscriber(LIVELINESS_KEYEXPR)).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().is_err()); + + let peer_tok2 = { + let mut c = config::default(); + c.connect + .endpoints + .set(vec![ + PEER_TOK1_ENDPOINT.parse::().unwrap(), + PEER_SUB_ENDPOINT.parse::().unwrap(), + ]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (token 2) ZID: {}", s.zid()); + s + }; + + let token2 = ztimeout!(peer_tok2.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().is_err()); + + token1.undeclare().await.unwrap(); + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().is_err()); + + token2.undeclare().await.unwrap(); + tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert!(sample.kind() == SampleKind::Delete); + assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + assert!(sub.try_recv().is_err()); + + peer_tok1.close().await.unwrap(); + peer_tok2.close().await.unwrap(); + peer_sub.close().await.unwrap(); +} + +#[cfg(feature = "unstable")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_liveliness_regression_2_history() { + use std::time::Duration; + + use zenoh::{config, sample::SampleKind}; + use zenoh_config::WhatAmI; + use zenoh_link::EndPoint; + + const TIMEOUT: Duration = Duration::from_secs(60); + const SLEEP: Duration = Duration::from_secs(1); + const PEER_TOK1_ENDPOINT: &str = "tcp/localhost:47506"; + const PEER_SUB_ENDPOINT: &str = "tcp/localhost:47507"; + const LIVELINESS_KEYEXPR: &str = "test/liveliness/regression/2/history"; + + zenoh_util::init_log_from_env_or("error"); + + let peer_tok1 = { + let mut c = config::default(); + c.listen + .endpoints + .set(vec![PEER_TOK1_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (token 1) ZID: {}", s.zid()); + s + }; + + let token1 = ztimeout!(peer_tok1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); + tokio::time::sleep(SLEEP).await; + + let peer_sub = { + let mut c = config::default(); + c.listen + .endpoints + .set(vec![PEER_SUB_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.connect + .endpoints + .set(vec![PEER_TOK1_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (sub) ZID: {}", s.zid()); + s + }; + + let sub = ztimeout!(peer_sub + .liveliness() + .declare_subscriber(LIVELINESS_KEYEXPR) + .history(true)) + .unwrap(); + tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert!(sample.kind() == SampleKind::Put); + assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + assert!(sub.try_recv().is_err()); + + let peer_tok2 = { + let mut c = config::default(); + c.connect + .endpoints + .set(vec![ + PEER_TOK1_ENDPOINT.parse::().unwrap(), + PEER_SUB_ENDPOINT.parse::().unwrap(), + ]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (token 2) ZID: {}", s.zid()); + s + }; + + let token2 = ztimeout!(peer_tok2.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().is_err()); + + token1.undeclare().await.unwrap(); + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().is_err()); + + token2.undeclare().await.unwrap(); + tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert!(sample.kind() == SampleKind::Delete); + assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + assert!(sub.try_recv().is_err()); + + peer_tok1.close().await.unwrap(); + peer_tok2.close().await.unwrap(); + peer_sub.close().await.unwrap(); +} + +#[cfg(feature = "unstable")] +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_liveliness_regression_3() { + use std::time::Duration; + + use zenoh::{config, sample::SampleKind}; + use zenoh_config::WhatAmI; + use zenoh_link::EndPoint; + + const TIMEOUT: Duration = Duration::from_secs(60); + const SLEEP: Duration = Duration::from_secs(1); + const ROUTER_ENDPOINT: &str = "tcp/localhost:47508"; + const PEER_TOK_ENDPOINT: &str = "tcp/localhost:47509"; + const LIVELINESS_KEYEXPR: &str = "test/liveliness/regression/3"; + + zenoh_util::init_log_from_env_or("error"); + + let router = { + let mut c = config::default(); + c.listen + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Router)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Router ZID: {}", s.zid()); + s + }; + + let peer_tok1 = { + let mut c = config::default(); + c.listen + .endpoints + .set(vec![PEER_TOK_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (token 1) ZID: {}", s.zid()); + s + }; + + let token1 = ztimeout!(peer_tok1.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); + tokio::time::sleep(SLEEP).await; + + let client_tok2 = { + let mut c = config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Client (token 2) ZID: {}", s.zid()); + s + }; + + let token2 = ztimeout!(client_tok2.liveliness().declare_token(LIVELINESS_KEYEXPR)).unwrap(); + tokio::time::sleep(SLEEP).await; + + let peer_sub = { + let mut c = config::default(); + c.connect + .endpoints + .set(vec![ + ROUTER_ENDPOINT.parse::().unwrap(), + PEER_TOK_ENDPOINT.parse::().unwrap(), + ]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Peer)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Peer (sub) ZID: {}", s.zid()); + s + }; + + let sub = ztimeout!(peer_sub.liveliness().declare_subscriber(LIVELINESS_KEYEXPR)).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().is_err()); + + token1.undeclare().await.unwrap(); + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().is_err()); + + token2.undeclare().await.unwrap(); + tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert!(sample.kind() == SampleKind::Delete); + assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); + assert!(sub.try_recv().is_err()); + + peer_tok1.close().await.unwrap(); + client_tok2.close().await.unwrap(); + peer_sub.close().await.unwrap(); + router.close().await.unwrap(); +} From 254449bedec56b103e2c2cf7eddb50c7e4106382 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 18 Sep 2024 18:01:27 +0200 Subject: [PATCH 08/12] Also apply changes to linstate peers --- zenoh/src/net/routing/hat/linkstate_peer/token.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh/src/net/routing/hat/linkstate_peer/token.rs b/zenoh/src/net/routing/hat/linkstate_peer/token.rs index 5b0352f830..9741df4741 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/token.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/token.rs @@ -658,7 +658,7 @@ pub(crate) fn declare_token_interest( send_declare: &mut SendDeclare, ) { if mode.current() && face.whatami == WhatAmI::Client { - let interest_id = (!mode.future()).then_some(id); + let interest_id = Some(id); if let Some(res) = res.as_ref() { if aggregate { if hat!(tables).linkstatepeer_tokens.iter().any(|token| { From 58fe7264be32688839f174082932b0c2dcd9c79f Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 18 Sep 2024 18:21:32 +0200 Subject: [PATCH 09/12] Fix merge --- zenoh/tests/liveliness.rs | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/zenoh/tests/liveliness.rs b/zenoh/tests/liveliness.rs index de2179c7bb..9477b69749 100644 --- a/zenoh/tests/liveliness.rs +++ b/zenoh/tests/liveliness.rs @@ -4109,7 +4109,7 @@ async fn test_liveliness_subget_router_history_middle() { async fn test_liveliness_regression_1() { use std::time::Duration; - use zenoh::{config, sample::SampleKind}; + use zenoh::sample::SampleKind; use zenoh_config::WhatAmI; use zenoh_link::EndPoint; @@ -4122,7 +4122,7 @@ async fn test_liveliness_regression_1() { zenoh_util::init_log_from_env_or("error"); let router = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.listen .endpoints .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) @@ -4135,7 +4135,7 @@ async fn test_liveliness_regression_1() { }; let peer_tok = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.listen .endpoints .set(vec![PEER_TOK_ENDPOINT.parse::().unwrap()]) @@ -4155,7 +4155,7 @@ async fn test_liveliness_regression_1() { tokio::time::sleep(SLEEP).await; let peer_sub = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![ @@ -4193,7 +4193,7 @@ async fn test_liveliness_regression_1() { async fn test_liveliness_regression_2() { use std::time::Duration; - use zenoh::{config, sample::SampleKind}; + use zenoh::sample::SampleKind; use zenoh_config::WhatAmI; use zenoh_link::EndPoint; @@ -4206,7 +4206,7 @@ async fn test_liveliness_regression_2() { zenoh_util::init_log_from_env_or("error"); let peer_tok1 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.listen .endpoints .set(vec![PEER_TOK1_ENDPOINT.parse::().unwrap()]) @@ -4222,7 +4222,7 @@ async fn test_liveliness_regression_2() { tokio::time::sleep(SLEEP).await; let peer_sub = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.listen .endpoints .set(vec![PEER_SUB_ENDPOINT.parse::().unwrap()]) @@ -4244,7 +4244,7 @@ async fn test_liveliness_regression_2() { assert!(sub.try_recv().is_err()); let peer_tok2 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![ @@ -4287,7 +4287,7 @@ async fn test_liveliness_regression_2() { async fn test_liveliness_regression_2_history() { use std::time::Duration; - use zenoh::{config, sample::SampleKind}; + use zenoh::sample::SampleKind; use zenoh_config::WhatAmI; use zenoh_link::EndPoint; @@ -4300,7 +4300,7 @@ async fn test_liveliness_regression_2_history() { zenoh_util::init_log_from_env_or("error"); let peer_tok1 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.listen .endpoints .set(vec![PEER_TOK1_ENDPOINT.parse::().unwrap()]) @@ -4316,7 +4316,7 @@ async fn test_liveliness_regression_2_history() { tokio::time::sleep(SLEEP).await; let peer_sub = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.listen .endpoints .set(vec![PEER_SUB_ENDPOINT.parse::().unwrap()]) @@ -4345,7 +4345,7 @@ async fn test_liveliness_regression_2_history() { assert!(sub.try_recv().is_err()); let peer_tok2 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![ @@ -4388,7 +4388,7 @@ async fn test_liveliness_regression_2_history() { async fn test_liveliness_regression_3() { use std::time::Duration; - use zenoh::{config, sample::SampleKind}; + use zenoh::sample::SampleKind; use zenoh_config::WhatAmI; use zenoh_link::EndPoint; @@ -4401,7 +4401,7 @@ async fn test_liveliness_regression_3() { zenoh_util::init_log_from_env_or("error"); let router = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.listen .endpoints .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) @@ -4414,7 +4414,7 @@ async fn test_liveliness_regression_3() { }; let peer_tok1 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.listen .endpoints .set(vec![PEER_TOK_ENDPOINT.parse::().unwrap()]) @@ -4434,7 +4434,7 @@ async fn test_liveliness_regression_3() { tokio::time::sleep(SLEEP).await; let client_tok2 = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) @@ -4450,7 +4450,7 @@ async fn test_liveliness_regression_3() { tokio::time::sleep(SLEEP).await; let peer_sub = { - let mut c = config::default(); + let mut c = zenoh::Config::default(); c.connect .endpoints .set(vec![ From 359d0fccf4823fbe832f2bdacec1a10db8362b47 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 18 Sep 2024 18:59:01 +0200 Subject: [PATCH 10/12] Apply new CurrentFuture interest behavior to pubsub and queries --- commons/zenoh-protocol/src/network/interest.rs | 6 +++--- zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs | 2 +- zenoh/src/net/routing/hat/linkstate_peer/queries.rs | 2 +- zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 2 +- zenoh/src/net/routing/hat/p2p_peer/queries.rs | 2 +- zenoh/src/net/routing/hat/router/pubsub.rs | 2 +- zenoh/src/net/routing/hat/router/queries.rs | 2 +- 7 files changed, 9 insertions(+), 9 deletions(-) diff --git a/commons/zenoh-protocol/src/network/interest.rs b/commons/zenoh-protocol/src/network/interest.rs index 86ff886233..ad4976bc75 100644 --- a/commons/zenoh-protocol/src/network/interest.rs +++ b/commons/zenoh-protocol/src/network/interest.rs @@ -59,11 +59,11 @@ pub mod flag { /// |------------------>| -- This is a DeclareInterest e.g. for subscriber declarations/undeclarations. /// | | /// | DECL SUBSCRIBER | -/// |<------------------| -- With interest_id field not set (except for tokens) +/// |<------------------| -- With interest_id field set /// | DECL SUBSCRIBER | -/// |<------------------| -- With interest_id field not set (except for tokens) +/// |<------------------| -- With interest_id field set /// | DECL SUBSCRIBER | -/// |<------------------| -- With interest_id field not set (except for tokens) +/// |<------------------| -- With interest_id field set /// | | /// | DECL FINAL | /// |<------------------| -- With interest_id field set diff --git a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs index 872b7ded59..8b7cce47fb 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs @@ -697,7 +697,7 @@ pub(super) fn declare_sub_interest( send_declare: &mut SendDeclare, ) { if mode.current() && face.whatami == WhatAmI::Client { - let interest_id = (!mode.future()).then_some(id); + let interest_id = Some(id); if let Some(res) = res.as_ref() { if aggregate { if hat!(tables).linkstatepeer_subs.iter().any(|sub| { diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index 229b61516b..181db04f9f 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -770,7 +770,7 @@ pub(super) fn declare_qabl_interest( send_declare: &mut SendDeclare, ) { if mode.current() && face.whatami == WhatAmI::Client { - let interest_id = (!mode.future()).then_some(id); + let interest_id = Some(id); if let Some(res) = res.as_ref() { if aggregate { if hat!(tables).linkstatepeer_qabls.iter().any(|qabl| { diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index a837214469..56a3419b8b 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -426,7 +426,7 @@ pub(super) fn declare_sub_interest( send_declare: &mut SendDeclare, ) { if mode.current() && face.whatami == WhatAmI::Client { - let interest_id = (!mode.future()).then_some(id); + let interest_id = Some(id); if let Some(res) = res.as_ref() { if aggregate { if tables.faces.values().any(|src_face| { diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index 6872b9ebe7..c75f498ff4 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -414,7 +414,7 @@ pub(super) fn declare_qabl_interest( send_declare: &mut SendDeclare, ) { if mode.current() && face.whatami == WhatAmI::Client { - let interest_id = (!mode.future()).then_some(id); + let interest_id = Some(id); if let Some(res) = res.as_ref() { if aggregate { if tables.faces.values().any(|src_face| { diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index 080b32b1c5..45524f0e2f 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -944,7 +944,7 @@ pub(crate) fn declare_sub_interest( send_declare: &mut SendDeclare, ) { if mode.current() { - let interest_id = (!mode.future()).then_some(id); + let interest_id = Some(id); if let Some(res) = res.as_ref() { if aggregate { if hat!(tables).router_subs.iter().any(|sub| { diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index b4dcf4798b..2be4c1d5f3 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -1154,7 +1154,7 @@ pub(crate) fn declare_qabl_interest( send_declare: &mut SendDeclare, ) { if mode.current() { - let interest_id = (!mode.future()).then_some(id); + let interest_id = Some(id); if let Some(res) = res.as_ref() { if aggregate { if hat!(tables).router_qabls.iter().any(|qabl| { From 1e981734cb6c39cafdfb9da109821318f56d1bcb Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Thu, 19 Sep 2024 14:11:27 +0200 Subject: [PATCH 11/12] Ignore received declares for unknown interests --- zenoh/src/net/routing/hat/client/token.rs | 8 ++++++++ zenoh/src/net/routing/hat/p2p_peer/token.rs | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/zenoh/src/net/routing/hat/client/token.rs b/zenoh/src/net/routing/hat/client/token.rs index b567e747b3..ec8b228c51 100644 --- a/zenoh/src/net/routing/hat/client/token.rs +++ b/zenoh/src/net/routing/hat/client/token.rs @@ -135,6 +135,14 @@ fn declare_simple_token( ); return; } + } else if !face.local_interests.contains_key(&interest_id) { + tracing::debug!( + "Received DeclareToken for {} from {} with unknown interest_id {}. Ignore.", + res.expr(), + face, + interest_id, + ); + return; } } register_simple_token(tables, face, id, res); diff --git a/zenoh/src/net/routing/hat/p2p_peer/token.rs b/zenoh/src/net/routing/hat/p2p_peer/token.rs index 9d583d6ba7..d9f798c18c 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/token.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/token.rs @@ -206,6 +206,14 @@ fn declare_simple_token( ); return; } + } else if !face.local_interests.contains_key(&interest_id) { + println!( + "Received DeclareToken for {} from {} with unknown interest_id {}. Ignore.", + res.expr(), + face, + interest_id, + ); + return; } } register_simple_token(tables, face, id, res); From 97e3f1e6b782f98f33cfb2b27daa3a0cc1a0b931 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Thu, 19 Sep 2024 14:49:14 +0200 Subject: [PATCH 12/12] Fix key_expr mapping bug --- zenoh/src/net/routing/hat/p2p_peer/interests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh/src/net/routing/hat/p2p_peer/interests.rs b/zenoh/src/net/routing/hat/p2p_peer/interests.rs index a8343baac6..860cd7227f 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/interests.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/interests.rs @@ -164,7 +164,7 @@ impl HatInterestTrait for HatCode { } let wire_expr = res .as_ref() - .map(|res| Resource::decl_key(res, dst_face, dst_face.whatami == WhatAmI::Client)); + .map(|res| Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client)); dst_face.primitives.send_interest(RoutingContext::with_expr( Interest { id,