From 2cbe20daf82d4f4fd8de92dd1de0b2b938f5da0a Mon Sep 17 00:00:00 2001 From: Mahmoud Mazouz Date: Wed, 25 Sep 2024 17:10:41 +0000 Subject: [PATCH] wip --- .../zenoh-protocol/src/network/interest.rs | 19 + zenoh/src/api/session.rs | 39 +- zenoh/src/net/routing/dispatcher/face.rs | 40 +- zenoh/src/net/routing/dispatcher/interests.rs | 46 +++ zenoh/src/net/routing/dispatcher/tables.rs | 4 + zenoh/src/net/routing/dispatcher/token.rs | 11 + .../src/net/routing/hat/p2p_peer/interests.rs | 19 +- zenoh/src/net/routing/hat/p2p_peer/mod.rs | 9 +- zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 15 +- zenoh/src/net/routing/hat/p2p_peer/queries.rs | 4 +- zenoh/src/net/routing/hat/p2p_peer/token.rs | 354 ++++++++++-------- zenoh/tests/liveliness.rs | 11 +- 12 files changed, 391 insertions(+), 180 deletions(-) diff --git a/commons/zenoh-protocol/src/network/interest.rs b/commons/zenoh-protocol/src/network/interest.rs index ad4976bc75..461ff5a5a9 100644 --- a/commons/zenoh-protocol/src/network/interest.rs +++ b/commons/zenoh-protocol/src/network/interest.rs @@ -80,6 +80,25 @@ pub mod flag { /// | | This stops the transmission of subscriber declarations/undeclarations. /// | | /// +/// And the message flow is the following for an [`Interest`] with mode [`InterestMode::Future`]: +/// +/// ```text +/// A B +/// | INTEREST | +/// |------------------>| -- This is a DeclareInterest e.g. for subscriber declarations/undeclarations. +/// | | +/// | DECL SUBSCRIBER | +/// |<------------------| -- With interest_id field not set +/// | UNDECL SUBSCRIBER | +/// |<------------------| -- With interest_id field not set +/// | | +/// | ... | +/// | | +/// | INTEREST FINAL | +/// |------------------>| -- Mode: Final +/// | | This stops the transmission of subscriber declarations/undeclarations. +/// | | +/// /// Flags: /// - |: Mode The mode of the interest* /// -/ diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 083d1d474a..a68cbdbf5f 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1596,10 +1596,15 @@ impl SessionInner { let known_tokens = if history { state .remote_tokens - .values() - .filter(|token| key_expr.intersects(token)) - .cloned() - .collect::>>() + .iter() + .filter_map(|(id, token)| { + if key_expr.intersects(token) { + Some((*id, token.clone())) + } else { + None + } + }) + .collect::)>>() } else { vec![] }; @@ -1607,10 +1612,19 @@ impl SessionInner { let primitives = state.primitives()?; drop(state); + let zid = self.zid(); + if !known_tokens.is_empty() { self.task_controller .spawn_with_rt(zenoh_runtime::ZRuntime::Net, async move { - for token in known_tokens { + for (id, token) in known_tokens { + dbg!(); + eprintln!( + "{zid} -> {zid}: DeclareToken(id={}, expr={}, interest_id=n/a) (session local)", + id, + &token, + ); + eprintln!(); callback.call(Sample { key_expr: token, payload: ZBytes::empty(), @@ -2368,13 +2382,26 @@ impl Primitives for WeakSession { if state.primitives.is_none() { return; // Session closing or closed } + match state .wireexpr_to_keyexpr(&m.wire_expr, false) .map(|e| e.into_owned()) { Ok(key_expr) => { + dbg!(); + eprintln!( + "{} -> {}: DeclareToken(id={}, expr={}, interest_id={:?})", + self.zid(), + self.zid(), + m.id, + key_expr, + msg.interest_id + ); + eprintln!(); + if let Some(interest_id) = msg.interest_id { if let Some(query) = state.liveliness_queries.get(&interest_id) { + // dbg!(&key_expr); let reply = Reply { result: Ok(Sample { key_expr, @@ -2398,10 +2425,10 @@ impl Primitives for WeakSession { return; } } + // NOTE: the token id here will never be 0 because if we're here it means we are in Future or CurrentFuture mode if let Entry::Vacant(e) = state.remote_tokens.entry(m.id) { e.insert(key_expr.clone()); drop(state); - self.execute_subscriber_callbacks( false, &m.wire_expr, diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index 654f5cf9c3..3596d63ee8 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -224,7 +224,25 @@ impl Primitives for Face { msg.wire_expr.as_ref(), msg.mode, msg.options, - &mut |p, m| declares.push((p.clone(), m)), + &mut |p, m| { + // if let DeclareBody::DeclareToken(tok) = &m.msg.body { + // // println!("{}", std::backtrace::Backtrace::force_capture()); + // if let Some(mux) = p.as_any().downcast_ref::() { + // dbg!(( + // tok, + // self.state.zid, + // mux.face.get().unwrap().state.upgrade().unwrap().zid + // )); + // } else { + // if let Some(sess) = p.as_any().downcast_ref::() { + // dbg!((tok, self.state.zid, sess.runtime.zid())); + // } else { + // dbg!((tok, self.state.zid)); + // } + // } + // } + declares.push((p.clone(), m)) + }, ); drop(ctrl_lock); for (p, m) in declares { @@ -325,7 +343,25 @@ impl Primitives for Face { &m.wire_expr, msg.ext_nodeid.node_id, msg.interest_id, - &mut |p, m| declares.push((p.clone(), m)), + &mut |p, m| { + // if let DeclareBody::DeclareToken(tok) = &m.msg.body { + // // println!("{}", std::backtrace::Backtrace::force_capture()); + // if let Some(mux) = p.as_any().downcast_ref::() { + // dbg!(( + // tok, + // self.state.zid, + // mux.face.get().unwrap().state.upgrade().unwrap().zid + // )); + // } else { + // if let Some(sess) = p.as_any().downcast_ref::() { + // dbg!((tok, self.state.zid, sess.runtime.zid())); + // } else { + // dbg!((tok, self.state.zid)); + // } + // } + // } + declares.push((p.clone(), m)) + }, ); drop(ctrl_lock); for (p, m) in declares { diff --git a/zenoh/src/net/routing/dispatcher/interests.rs b/zenoh/src/net/routing/dispatcher/interests.rs index 6180828f1c..f69835ebd2 100644 --- a/zenoh/src/net/routing/dispatcher/interests.rs +++ b/zenoh/src/net/routing/dispatcher/interests.rs @@ -49,6 +49,29 @@ pub(crate) struct CurrentInterest { pub(crate) mode: InterestMode, } +#[derive(PartialEq, Clone)] +pub(crate) struct RemoteInterest { + pub(crate) res: Option>, + pub(crate) options: InterestOptions, +} + +impl RemoteInterest { + pub(crate) fn matches(&self, res: &Arc) -> bool { + self.res.as_ref().map(|r| r.matches(res)).unwrap_or(true) + } + + pub(crate) fn resource<'c, 'a: 'c, 'b: 'c>( + &'a self, + default: &'b Arc, + ) -> &'c Arc { + if self.options.aggregate() { + self.res.as_ref().unwrap_or(default) + } else { + default + } + } +} + pub(crate) fn declare_final( face: &mut Arc, id: InterestId, @@ -204,6 +227,18 @@ pub(crate) fn declare_interest( (res, wtables) }; + dbg!(); + eprintln!( + "{} -> {}: Interest(id={}, expr={}, mode={:?}, tokens={})", + face.zid, + wtables.zid, + id, + res.expr(), + mode, + options.tokens(), + ); + eprintln!(); + hat_code.declare_interest( &mut wtables, tables_ref, @@ -224,6 +259,17 @@ pub(crate) fn declare_interest( } } else { let mut wtables = zwrite!(tables_ref.tables); + + dbg!(); + eprintln!( + "{} -> {}: Interest(id={}, expr=, mode={:?}, tokens={})", + face.zid, + wtables.zid, + id, + mode, + options.tokens(), + ); + hat_code.declare_interest( &mut wtables, tables_ref, diff --git a/zenoh/src/net/routing/dispatcher/tables.rs b/zenoh/src/net/routing/dispatcher/tables.rs index 2c5cfffffb..4d87f37398 100644 --- a/zenoh/src/net/routing/dispatcher/tables.rs +++ b/zenoh/src/net/routing/dispatcher/tables.rs @@ -167,6 +167,10 @@ impl Tables { } } } + + pub(crate) fn faces(&self) -> Vec> { + self.faces.values().cloned().collect::>() + } } pub fn close_face(tables: &TablesLock, face: &Weak) { diff --git a/zenoh/src/net/routing/dispatcher/token.rs b/zenoh/src/net/routing/dispatcher/token.rs index a34e35af68..a87d3ef9fe 100644 --- a/zenoh/src/net/routing/dispatcher/token.rs +++ b/zenoh/src/net/routing/dispatcher/token.rs @@ -77,6 +77,17 @@ pub(crate) fn declare_token( (res, wtables) }; + dbg!(); + eprintln!( + "{} -> {}: DeclareToken(id={}, expr={}, interest_id={:?})", + face.zid, + wtables.zid, + id, + res.expr(), + interest_id + ); + eprintln!(); + hat_code.declare_token( &mut wtables, face, diff --git a/zenoh/src/net/routing/hat/p2p_peer/interests.rs b/zenoh/src/net/routing/hat/p2p_peer/interests.rs index 860cd7227f..7a60cc654e 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/interests.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/interests.rs @@ -30,7 +30,7 @@ use super::{ use crate::net::routing::{ dispatcher::{ face::{FaceState, InterestState}, - interests::{CurrentInterest, CurrentInterestCleanup}, + interests::{CurrentInterest, CurrentInterestCleanup, RemoteInterest}, resource::Resource, tables::{Tables, TablesLock}, }, @@ -47,7 +47,9 @@ pub(super) fn interests_new_face(tables: &mut Tables, face: &mut Arc) .collect::>>() { if face.whatami == WhatAmI::Router { - for (res, _, options) in face_hat_mut!(&mut src_face).remote_interests.values() { + for RemoteInterest { res, options } in + face_hat_mut!(&mut src_face).remote_interests.values() + { let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); get_mut_unchecked(face).local_interests.insert( id, @@ -123,9 +125,13 @@ impl HatInterestTrait for HatCode { send_declare, ) } - face_hat_mut!(face) - .remote_interests - .insert(id, (res.as_ref().map(|res| (*res).clone()), mode, options)); + face_hat_mut!(face).remote_interests.insert( + id, + RemoteInterest { + res: res.as_ref().map(|res| (*res).clone()), + options, + }, + ); let interest = Arc::new(CurrentInterest { src_face: face.clone(), @@ -221,7 +227,8 @@ impl HatInterestTrait for HatCode { .collect::>() { let local_interest = dst_face.local_interests.get(&id).unwrap(); - if local_interest.res == interest.0 && local_interest.options == interest.2 + if local_interest.res == interest.res + && local_interest.options == interest.options { dst_face.primitives.send_interest(RoutingContext::with_expr( Interest { diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 4d20204c19..a5ccf64dee 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -33,7 +33,7 @@ use zenoh_protocol::{ queryable::ext::QueryableInfoType, QueryableId, SubscriberId, TokenId, }, - interest::{InterestId, InterestMode, InterestOptions}, + interest::{InterestId, InterestOptions}, oam::id::OAM_LINKSTATE, Declare, DeclareBody, DeclareFinal, Oam, }, @@ -59,7 +59,10 @@ use crate::net::{ codec::Zenoh080Routing, protocol::linkstate::LinkStateList, routing::{ - dispatcher::face::{Face, InterestState}, + dispatcher::{ + face::{Face, InterestState}, + interests::RemoteInterest, + }, router::{compute_data_routes, compute_query_routes, RoutesIndexes}, RoutingContext, }, @@ -407,7 +410,7 @@ impl HatContext { struct HatFace { next_id: AtomicU32, // @TODO: manage rollover and uniqueness - remote_interests: HashMap>, InterestMode, InterestOptions)>, + remote_interests: HashMap, local_subs: HashMap, SubscriberId>, remote_subs: HashMap>, local_tokens: HashMap, TokenId>, diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index 56a3419b8b..2e4595da83 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -24,7 +24,7 @@ use zenoh_protocol::{ common::ext::WireExprType, ext, Declare, DeclareBody, DeclareSubscriber, SubscriberId, UndeclareSubscriber, }, - interest::{InterestId, InterestMode, InterestOptions}, + interest::{InterestId, InterestMode}, }, }; use zenoh_sync::get_mut_unchecked; @@ -35,6 +35,7 @@ use crate::{ net::routing::{ dispatcher::{ face::FaceState, + interests::RemoteInterest, pubsub::SubscriberInfo, resource::{NodeId, Resource, SessionContext}, tables::{Route, RoutingExpr, Tables}, @@ -84,13 +85,15 @@ fn propagate_simple_subscription_to( let matching_interests = face_hat!(dst_face) .remote_interests .values() - .filter(|(r, _, o)| { - o.subscribers() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true) - }) + .filter(|interest| interest.options.subscribers() && interest.matches(res)) .cloned() - .collect::>, InterestMode, InterestOptions)>>(); + .collect::>(); - for (int_res, _, options) in matching_interests { + for RemoteInterest { + res: int_res, + options, + } in matching_interests + { let res = if options.aggregate() { int_res.as_ref().unwrap_or(res) } else { diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index c75f498ff4..739c815b5d 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -97,9 +97,7 @@ fn propagate_simple_queryable_to( || face_hat!(dst_face) .remote_interests .values() - .any(|(r, _, o)| { - o.queryables() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true) - })) + .any(|interest| interest.options.queryables() && interest.matches(res))) && src_face .as_ref() .map(|src_face| { diff --git a/zenoh/src/net/routing/hat/p2p_peer/token.rs b/zenoh/src/net/routing/hat/p2p_peer/token.rs index d9f798c18c..7dad61d5ba 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/token.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/token.rs @@ -18,7 +18,7 @@ use zenoh_config::WhatAmI; use zenoh_protocol::network::{ declare::{common::ext::WireExprType, TokenId}, ext, - interest::{InterestId, InterestMode, InterestOptions}, + interest::{InterestId, InterestMode}, Declare, DeclareBody, DeclareToken, UndeclareToken, }; use zenoh_sync::get_mut_unchecked; @@ -31,124 +31,101 @@ use crate::net::routing::{ RoutingContext, }; -fn new_token( +fn should_route_between(src: &FaceState, dst: &FaceState) -> bool { + src.whatami == WhatAmI::Client || dst.whatami == WhatAmI::Client +} + +fn is_new_token( tables: &Tables, - res: &Arc, - src_face: &Arc, - dst_face: &mut Arc, + res: &Resource, + src_face: &FaceState, + dst_face: &FaceState, ) -> bool { // Is there any face that !res.session_ctxs.values().any(|ctx| { ctx.token // declared the token && (ctx.face.id != src_face.id) // is not the face that just registered it && (ctx.face.id != dst_face.id || dst_face.zid == tables.zid) // is not the face we are propagating to (except for local) - && (ctx.face.whatami == WhatAmI::Client || dst_face.whatami == WhatAmI::Client) + && should_route_between(&ctx.face, dst_face) // don't forward from/to router/peers }) } -#[inline] -fn propagate_simple_token_to( - tables: &mut Tables, +fn send_declare_token( + send_declare: &mut SendDeclare, dst_face: &mut Arc, res: &Arc, - src_face: &mut Arc, - src_interest_id: Option, - dst_interest_id: Option, - send_declare: &mut SendDeclare, + interest_id: Option, + id: TokenId, ) { - if (src_face.id != dst_face.id || dst_face.zid == tables.zid) - && !face_hat!(dst_face).local_tokens.contains_key(res) - && (src_face.whatami == WhatAmI::Client || dst_face.whatami == WhatAmI::Client) - && new_token(tables, res, src_face, dst_face) - { - if dst_face.whatami != WhatAmI::Client { - let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(dst_face).local_tokens.insert(res.clone(), id); - let key_expr = Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client); - send_declare( - &dst_face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: dst_interest_id, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareToken(DeclareToken { - id, - wire_expr: key_expr, - }), - }, - res.expr(), - ), - ); - } else { - let matching_interests = face_hat!(dst_face) - .remote_interests - .values() - .filter(|(r, m, o)| { - o.tokens() - && r.as_ref().map(|r| r.matches(res)).unwrap_or(true) - && (m.current() || src_interest_id.is_none()) - }) - .cloned() - .collect::>, InterestMode, InterestOptions)>>(); - - for (int_res, _, options) in matching_interests { - let res = if options.aggregate() { - int_res.as_ref().unwrap_or(res) - } else { - res - }; - if !face_hat!(dst_face).local_tokens.contains_key(res) { - let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); - face_hat_mut!(dst_face).local_tokens.insert(res.clone(), id); - let key_expr = - Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client); - send_declare( - &dst_face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: dst_interest_id, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::DeclareToken(DeclareToken { - id, - wire_expr: key_expr, - }), - }, - res.expr(), - ), - ); - } - } - } - } + let wire_expr = Resource::decl_key(res, dst_face, dst_face.whatami != WhatAmI::Client); + send_declare( + &dst_face.primitives, + RoutingContext::with_expr( + Declare { + interest_id, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }), + }, + res.expr(), + ), + ); } -fn propagate_simple_token( +/// Handle known [`InterestMode::Future`] interests in tokens. +fn handle_future_interest_in_tokens( tables: &mut Tables, - res: &Arc, + src_res: &Arc, src_face: &mut Arc, - interest_id: Option, send_declare: &mut SendDeclare, ) { - for mut dst_face in tables - .faces - .values() - .cloned() - .collect::>>() - { - propagate_simple_token_to( - tables, - &mut dst_face, - res, - src_face, - interest_id, - None, - send_declare, - ); + for dst_face in tables.faces().iter_mut() { + if (src_face.id != dst_face.id || dst_face.zid == tables.zid) + && !face_hat!(dst_face).local_tokens.contains_key(src_res) + && should_route_between(src_face, dst_face) + && is_new_token(tables, src_res, src_face, dst_face) + { + if dst_face.whatami == WhatAmI::Client { + let matching_interests = face_hat!(dst_face) + .remote_interests + .values() + .filter(|interest| interest.options.tokens() && interest.matches(src_res)) + .cloned() + .collect::>(); + + for interest in matching_interests { + let res = interest.resource(src_res); + if !face_hat!(dst_face).local_tokens.contains_key(res) { + let id = make_token_id2(res, dst_face); + dbg!(); + eprintln!( + "{} -> {}: DeclareToken(id={}, expr={}, interest_id={:?})", + tables.zid, + dst_face.zid, + id, + src_res.expr(), + Option::::None + ); + eprintln!(); + send_declare_token(send_declare, dst_face, res, None, id) + } + } + } else { + let id = make_token_id2(src_res, dst_face); + dbg!(); + eprintln!( + "{} -> {}: DeclareToken(id={}, expr={}, interest_id={:?})", + tables.zid, + dst_face.zid, + id, + src_res.expr(), + Option::::None + ); + send_declare_token(send_declare, dst_face, src_res, None, id) + } + } } } @@ -187,37 +164,63 @@ fn declare_simple_token( interest_id: Option, send_declare: &mut SendDeclare, ) { + register_simple_token(tables, face, id, res); + if let Some(interest_id) = interest_id { - if let Some((interest, _)) = face.pending_current_interests.get(&interest_id) { - if interest.mode == InterestMode::Current { - let wire_expr = Resource::get_best_key(res, "", interest.src_face.id); - send_declare( - &interest.src_face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: Some(interest.src_interest_id), - ext_qos: ext::QoSType::default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }), - }, - res.expr(), - ), - ); - return; - } - } else if !face.local_interests.contains_key(&interest_id) { - println!( - "Received DeclareToken for {} from {} with unknown interest_id {}. Ignore.", - res.expr(), - face, - interest_id, - ); - return; - } + handle_current_interest_in_tokens(tables, face, id, res, interest_id, send_declare); + } else { + handle_future_interest_in_tokens(tables, res, face, send_declare); } - register_simple_token(tables, face, id, res); - propagate_simple_token(tables, res, face, interest_id, send_declare); +} + +/// Handle known [`InterestMode::Current`] and [`InterestMode::CurrentFuture`] interests in tokens. +fn handle_current_interest_in_tokens( + _tables: &mut Tables, + face: &mut Arc, + id: TokenId, + res: &mut Arc, + interest_id: InterestId, + send_declare: &mut SendDeclare, +) { + let Some((interest, _)) = face.pending_current_interests.get(&interest_id) else { + // TODO(fuzzypixelz): restore error trace + return; + }; + + debug_assert_eq!(id, 0); + debug_assert!(matches!( + interest.mode, + InterestMode::Current | InterestMode::CurrentFuture + )); + + let mut dst_face = interest.src_face.clone(); + let id = make_token_id2(res, &mut dst_face); + + dbg!(); + eprintln!( + "{} -> {}: DeclareToken(id={}, expr={}, interest_id={:?})", + _tables.zid, + interest.src_face.zid, + id, + res.expr(), + Some(interest.src_interest_id) + ); + eprintln!(); + + let wire_expr = Resource::get_best_key(res, "", interest.src_face.id); + send_declare( + &dst_face.primitives, + RoutingContext::with_expr( + Declare { + interest_id: Some(interest.src_interest_id), + ext_qos: ext::QoSType::default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::DeclareToken(DeclareToken { id, wire_expr }), + }, + res.expr(), + ), + ); } #[inline] @@ -266,8 +269,8 @@ fn propagate_forget_simple_token( ), ); } else if src_face.id != face.id - && face_hat!(face).remote_interests.values().any(|(r, _, o)| { - o.tokens() && r.as_ref().map(|r| r.matches(res)).unwrap_or(true) && !o.aggregate() + && face_hat!(face).remote_interests.values().any(|interest| { + interest.options.tokens() && interest.matches(res) && !interest.options.aggregate() }) { // Token has never been declared on this face. @@ -318,10 +321,10 @@ fn propagate_forget_simple_token( res.expr(), ), ); - } else if face_hat!(face).remote_interests.values().any(|(r, _, o)| { - o.tokens() - && r.as_ref().map(|r| r.matches(&res)).unwrap_or(true) - && !o.aggregate() + } else if face_hat!(face).remote_interests.values().any(|interest| { + interest.options.tokens() + && interest.matches(&res) + && !interest.options.aggregate() }) { // Token has never been declared on this face. // Send an Undeclare with a one shot generated id and a WireExpr ext. @@ -446,25 +449,39 @@ fn forget_simple_token( pub(super) fn token_new_face( tables: &mut Tables, - face: &mut Arc, + dst_face: &mut Arc, send_declare: &mut SendDeclare, ) { - if face.whatami != WhatAmI::Client { - for mut src_face in tables - .faces - .values() - .cloned() - .collect::>>() - { - for token in face_hat!(src_face.clone()).remote_tokens.values() { - propagate_simple_token_to( - tables, - face, - token, - &mut src_face, - None, - Some(INITIAL_INTEREST_ID), + if dst_face.whatami == WhatAmI::Client { + return; + } + + for src_face in tables.faces().iter_mut() { + for src_res in face_hat!(src_face.clone()).remote_tokens.values() { + if (src_face.id != dst_face.id || dst_face.zid == tables.zid) + && !face_hat!(dst_face).local_tokens.contains_key(src_res) + && should_route_between(src_face, dst_face) + && is_new_token(tables, src_res, src_face, dst_face) + { + let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(dst_face) + .local_tokens + .insert(src_res.clone(), id); + dbg!(); + eprintln!( + "{} -> {}: DeclareToken(id={}, expr={}, interest_id={:?})", + tables.zid, + dst_face.zid, + id, + src_res.expr(), + Some(INITIAL_INTEREST_ID) + ); + send_declare_token( send_declare, + dst_face, + src_res, + Some(INITIAL_INTEREST_ID), + id, ); } } @@ -486,6 +503,17 @@ fn make_token_id(res: &Arc, face: &mut Arc, mode: InterestM } } +#[inline] +fn make_token_id2(res: &Arc, face: &mut Arc) -> TokenId { + if let Some(id) = face_hat!(face).local_tokens.get(res) { + *id + } else { + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face).local_tokens.insert(res.clone(), id); + id + } +} + pub(crate) fn declare_token_interest( tables: &mut Tables, face: &mut Arc, @@ -507,6 +535,16 @@ pub(crate) fn declare_token_interest( }) { let id = make_token_id(res, face, mode); let wire_expr = Resource::decl_key(res, face, face.whatami != WhatAmI::Client); + dbg!(); + eprintln!( + "{} -> {}: DeclareToken(id={}, expr={}, interest_id={:?})", + tables.zid, + face.zid, + id, + res.expr(), + interest_id + ); + eprintln!(); send_declare( &face.primitives, RoutingContext::with_expr( @@ -534,6 +572,14 @@ pub(crate) fn declare_token_interest( let id = make_token_id(token, face, mode); let wire_expr = Resource::decl_key(token, face, face.whatami != WhatAmI::Client); + dbg!(); + eprintln!( + "{} -> {}: DeclareToken(id={}, expr={})", + tables.zid, + face.zid, + id, + res.expr() + ); send_declare( &face.primitives, RoutingContext::with_expr( @@ -566,6 +612,14 @@ pub(crate) fn declare_token_interest( let id = make_token_id(token, face, mode); let wire_expr = Resource::decl_key(token, face, face.whatami != WhatAmI::Client); + dbg!(); + eprintln!( + "{} -> {}: DeclareToken(id={}, expr={})", + tables.zid, + face.zid, + id, + token.expr() + ); send_declare( &face.primitives, RoutingContext::with_expr( diff --git a/zenoh/tests/liveliness.rs b/zenoh/tests/liveliness.rs index ac42951769..74b6ae4ea0 100644 --- a/zenoh/tests/liveliness.rs +++ b/zenoh/tests/liveliness.rs @@ -371,7 +371,7 @@ async fn test_liveliness_after_close() { use zenoh_config::EndPoint; const TIMEOUT: Duration = Duration::from_secs(60); const SLEEP: Duration = Duration::from_secs(1); - const PEER1_ENDPOINT: &str = "tcp/localhost:47447"; + const PEER1_ENDPOINT: &str = "tcp/localhost:47510"; const LIVELINESS_KEYEXPR: &str = "test/liveliness/subscriber/clique"; zenoh_util::init_log_from_env_or("error"); @@ -1482,10 +1482,13 @@ async fn test_liveliness_subscriber_double_peer_history_middle() { .unwrap(); tokio::time::sleep(SLEEP).await; - let sample = ztimeout!(sub2.recv_async()).unwrap(); + let sample = dbg!(ztimeout!(sub2.recv_async())).unwrap(); assert!(sample.kind() == SampleKind::Put); assert!(sample.key_expr().as_str() == LIVELINESS_KEYEXPR); - assert!(sub2.try_recv().is_err()); + // assert!(dbg!(sub2.try_recv()).is_err()); + dbg!(ztimeout!(sub2.recv_async())); + dbg!(ztimeout!(sub2.recv_async())); + dbg!(ztimeout!(sub2.recv_async())); token.undeclare().await.unwrap(); tokio::time::sleep(SLEEP).await; @@ -4256,7 +4259,7 @@ async fn test_liveliness_regression_2() { const TIMEOUT: Duration = Duration::from_secs(60); const SLEEP: Duration = Duration::from_secs(1); const PEER_TOK1_ENDPOINT: &str = "tcp/localhost:47505"; - const PEER_SUB_ENDPOINT: &str = "tcp/localhost:47506"; + const PEER_SUB_ENDPOINT: &str = "tcp/localhost:47511"; const LIVELINESS_KEYEXPR: &str = "test/liveliness/regression/2"; zenoh_util::init_log_from_env_or("error");