diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index 54af7a01ee..df2167b65f 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -150,6 +150,7 @@ impl Primitives for Face { ctrl_lock.as_ref(), &self.tables, &mut self.state.clone(), + m.id, &m.wire_expr, &m.ext_info, msg.ext_nodeid.node_id, @@ -160,6 +161,7 @@ impl Primitives for Face { ctrl_lock.as_ref(), &self.tables, &mut self.state.clone(), + m.id, &m.ext_wire_expr.wire_expr, msg.ext_nodeid.node_id, ); diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index 65193d41ac..f1a93212cf 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -23,6 +23,7 @@ use std::sync::{Arc, Weak}; use zenoh_config::WhatAmI; use zenoh_protocol::core::key_expr::keyexpr; use zenoh_protocol::network::declare::queryable::ext::QueryableInfo; +use zenoh_protocol::network::declare::QueryableId; use zenoh_protocol::{ core::{Encoding, WireExpr}, network::{ @@ -44,6 +45,7 @@ pub(crate) fn declare_queryable( hat_code: &(dyn HatTrait + Send + Sync), tables: &TablesLock, face: &mut Arc, + id: QueryableId, expr: &WireExpr, qabl_info: &QueryableInfo, node_id: NodeId, @@ -81,7 +83,7 @@ pub(crate) fn declare_queryable( (res, wtables) }; - hat_code.declare_queryable(&mut wtables, face, &mut res, qabl_info, node_id); + hat_code.declare_queryable(&mut wtables, face, id, &mut res, qabl_info, node_id); disable_matches_query_routes(&mut wtables, &mut res); drop(wtables); @@ -110,47 +112,57 @@ pub(crate) fn undeclare_queryable( hat_code: &(dyn HatTrait + Send + Sync), tables: &TablesLock, face: &mut Arc, + id: QueryableId, expr: &WireExpr, node_id: NodeId, ) { - let rtables = zread!(tables.tables); - match rtables.get_mapping(face, &expr.scope, expr.mapping) { - Some(prefix) => match Resource::get_resource(prefix, expr.suffix.as_ref()) { - Some(mut res) => { - log::debug!("{} Undeclare queryable ({})", face, res.expr()); - drop(rtables); - let mut wtables = zwrite!(tables.tables); - - hat_code.undeclare_queryable(&mut wtables, face, &mut res, node_id); - - disable_matches_query_routes(&mut wtables, &mut res); - drop(wtables); - - let rtables = zread!(tables.tables); - let matches_query_routes = compute_matches_query_routes(&rtables, &res); - drop(rtables); - - let wtables = zwrite!(tables.tables); - for (mut res, query_routes) in matches_query_routes { - get_mut_unchecked(&mut res) - .context_mut() - .update_query_routes(query_routes); + let res = if expr.is_empty() { + None + } else { + let rtables = zread!(tables.tables); + match rtables.get_mapping(face, &expr.scope, expr.mapping) { + Some(prefix) => match Resource::get_resource(prefix, expr.suffix.as_ref()) { + Some(res) => Some(res), + None => { + log::error!( + "{} Undeclare unknown queryable {}{}!", + face, + prefix.expr(), + expr.suffix + ); + return; } - Resource::clean(&mut res); - drop(wtables); + }, + None => { + log::error!( + "{} Undeclare queryable with unknown scope {}", + face, + expr.scope + ); + return; } - None => log::error!( - "{} Undeclare unknown queryable ({}{})!", - face, - prefix.expr(), - expr.suffix - ), - }, - None => log::error!( - "{} Undeclare queryable with unknown scope {}!", - face, - expr.scope - ), + } + }; + let mut wtables = zwrite!(tables.tables); + if let Some(mut res) = hat_code.undeclare_queryable(&mut wtables, face, id, res, node_id) { + log::debug!("{} Undeclare queryable {} ({})", face, id, res.expr()); + disable_matches_query_routes(&mut wtables, &mut res); + drop(wtables); + + let rtables = zread!(tables.tables); + let matches_query_routes = compute_matches_query_routes(&rtables, &res); + drop(rtables); + + let wtables = zwrite!(tables.tables); + for (mut res, query_routes) in matches_query_routes { + get_mut_unchecked(&mut res) + .context_mut() + .update_query_routes(query_routes); + } + Resource::clean(&mut res); + drop(wtables); + } else { + log::error!("{} Undeclare unknown queryable {}", face, id); } } diff --git a/zenoh/src/net/routing/hat/client/mod.rs b/zenoh/src/net/routing/hat/client/mod.rs index 8caa87d2b8..05210bcaee 100644 --- a/zenoh/src/net/routing/hat/client/mod.rs +++ b/zenoh/src/net/routing/hat/client/mod.rs @@ -40,11 +40,11 @@ use super::{ }; use std::{ any::Any, - collections::{HashMap, HashSet}, + collections::HashMap, sync::{atomic::AtomicU32, Arc}, }; use zenoh_config::WhatAmI; -use zenoh_protocol::network::declare::{queryable::ext::QueryableInfo, SubscriberId}; +use zenoh_protocol::network::declare::{queryable::ext::QueryableInfo, QueryableId, SubscriberId}; use zenoh_protocol::network::Oam; use zenoh_result::ZResult; use zenoh_sync::get_mut_unchecked; @@ -159,7 +159,7 @@ impl HatBaseTrait for HatCode { } let mut qabls_matches = vec![]; - for mut res in face + for (_id, mut res) in face .hat .downcast_mut::() .unwrap() @@ -293,8 +293,8 @@ struct HatFace { next_id: AtomicU32, // @TODO: manage rollover and uniqueness local_subs: HashMap, SubscriberId>, remote_subs: HashMap>, - local_qabls: HashMap, QueryableInfo>, - remote_qabls: HashSet>, + local_qabls: HashMap, (QueryableId, QueryableInfo)>, + remote_qabls: HashMap>, } impl HatFace { @@ -304,7 +304,7 @@ impl HatFace { local_subs: HashMap::new(), remote_subs: HashMap::new(), local_qabls: HashMap::new(), - remote_qabls: HashSet::new(), + remote_qabls: HashMap::new(), } } } diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index 0f5b63fc2e..e8682fc5c7 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -23,10 +23,12 @@ use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use ordered_float::OrderedFloat; use std::borrow::Cow; use std::collections::{HashMap, HashSet}; +use std::sync::atomic::Ordering; use std::sync::Arc; use zenoh_buffers::ZBuf; use zenoh_protocol::core::key_expr::include::{Includer, DEFAULT_INCLUDER}; use zenoh_protocol::core::key_expr::OwnedKeyExpr; +use zenoh_protocol::network::declare::QueryableId; use zenoh_protocol::{ core::{WhatAmI, WireExpr}, network::declare::{ @@ -83,16 +85,19 @@ fn propagate_simple_queryable( let faces = tables.faces.values().cloned(); for mut dst_face in faces { let info = local_qabl_info(tables, res, &dst_face); - let current_info = face_hat!(dst_face).local_qabls.get(res); + let current = face_hat!(dst_face).local_qabls.get(res); if (src_face.is_none() || src_face.as_ref().unwrap().id != dst_face.id) - && (current_info.is_none() || *current_info.unwrap() != info) + && (current.is_none() || current.unwrap().1 != info) && (src_face.is_none() || src_face.as_ref().unwrap().whatami == WhatAmI::Client || dst_face.whatami == WhatAmI::Client) { + let id = current + .map(|c| c.0) + .unwrap_or(face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst)); face_hat_mut!(&mut dst_face) .local_qabls - .insert(res.clone(), info); + .insert(res.clone(), (id, info)); let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -100,7 +105,7 @@ fn propagate_simple_queryable( ext_tstamp: None, ext_nodeid: ext::NodeIdType::default(), body: DeclareBody::DeclareQueryable(DeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) + id, wire_expr: key_expr, ext_info: info, }), @@ -114,6 +119,7 @@ fn propagate_simple_queryable( fn register_client_queryable( _tables: &mut Tables, face: &mut Arc, + id: QueryableId, res: &mut Arc, qabl_info: &QueryableInfo, ) { @@ -132,16 +138,17 @@ fn register_client_queryable( })) .qabl = Some(*qabl_info); } - face_hat_mut!(face).remote_qabls.insert(res.clone()); + face_hat_mut!(face).remote_qabls.insert(id, res.clone()); } fn declare_client_queryable( tables: &mut Tables, face: &mut Arc, + id: QueryableId, res: &mut Arc, qabl_info: &QueryableInfo, ) { - register_client_queryable(tables, face, res, qabl_info); + register_client_queryable(tables, face, id, res, qabl_info); propagate_simple_queryable(tables, res, Some(face)); } @@ -161,22 +168,19 @@ fn client_qabls(res: &Arc) -> Vec> { fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { for face in tables.faces.values_mut() { - if face_hat!(face).local_qabls.contains_key(res) { - let wire_expr = Resource::get_best_key(res, "", face.id); + if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { ext_qos: ext::QoSType::declare_default(), ext_tstamp: None, ext_nodeid: ext::NodeIdType::default(), body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, + id, + ext_wire_expr: WireExprType::null(), }), }, res.expr(), )); - - face_hat_mut!(face).local_qabls.remove(res); } } } @@ -186,37 +190,37 @@ pub(super) fn undeclare_client_queryable( face: &mut Arc, res: &mut Arc, ) { - if let Some(ctx) = get_mut_unchecked(res).session_ctxs.get_mut(&face.id) { - get_mut_unchecked(ctx).qabl = None; - if ctx.qabl.is_none() { - face_hat_mut!(face).remote_qabls.remove(res); + if !face_hat_mut!(face) + .remote_qabls + .values() + .any(|s| *s == *res) + { + if let Some(ctx) = get_mut_unchecked(res).session_ctxs.get_mut(&face.id) { + get_mut_unchecked(ctx).qabl = None; } - } - let mut client_qabls = client_qabls(res); - if client_qabls.is_empty() { - propagate_forget_simple_queryable(tables, res); - } else { - propagate_simple_queryable(tables, res, None); - } - if client_qabls.len() == 1 { - let face = &mut client_qabls[0]; - if face_hat!(face).local_qabls.contains_key(res) { - let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); - - face_hat_mut!(face).local_qabls.remove(res); + let mut client_qabls = client_qabls(res); + if client_qabls.is_empty() { + propagate_forget_simple_queryable(tables, res); + } else { + propagate_simple_queryable(tables, res, None); + } + if client_qabls.len() == 1 { + let face = &mut client_qabls[0]; + if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr(), + )); + } } } } @@ -224,9 +228,14 @@ pub(super) fn undeclare_client_queryable( fn forget_client_queryable( tables: &mut Tables, face: &mut Arc, - res: &mut Arc, -) { - undeclare_client_queryable(tables, face, res); + id: QueryableId, +) -> Option> { + if let Some(mut res) = face_hat_mut!(face).remote_qabls.remove(&id) { + undeclare_client_queryable(tables, face, &mut res); + Some(res) + } else { + None + } } pub(super) fn queries_new_face(tables: &mut Tables, _face: &mut Arc) { @@ -236,7 +245,7 @@ pub(super) fn queries_new_face(tables: &mut Tables, _face: &mut Arc) .cloned() .collect::>>() { - for qabl in face_hat!(face).remote_qabls.iter() { + for qabl in face_hat!(face).remote_qabls.values() { propagate_simple_queryable(tables, qabl, Some(&mut face.clone())); } } @@ -251,27 +260,29 @@ impl HatQueriesTrait for HatCode { &self, tables: &mut Tables, face: &mut Arc, + id: QueryableId, res: &mut Arc, qabl_info: &QueryableInfo, _node_id: NodeId, ) { - declare_client_queryable(tables, face, res, qabl_info); + declare_client_queryable(tables, face, id, res, qabl_info); } fn undeclare_queryable( &self, tables: &mut Tables, face: &mut Arc, - res: &mut Arc, + id: QueryableId, + _res: Option>, _node_id: NodeId, - ) { - forget_client_queryable(tables, face, res); + ) -> Option> { + forget_client_queryable(tables, face, id) } fn get_queryables(&self, tables: &Tables) -> Vec> { let mut qabls = HashSet::new(); for src_face in tables.faces.values() { - for qabl in &face_hat!(src_face).remote_qabls { + for qabl in face_hat!(src_face).remote_qabls.values() { qabls.insert(qabl.clone()); } } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index 8434549682..5591ea3b3e 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -53,7 +53,7 @@ use zenoh_config::{unwrap_or_default, ModeDependent, WhatAmI, WhatAmIMatcher, Ze use zenoh_protocol::{ common::ZExtBody, network::{ - declare::{queryable::ext::QueryableInfo, SubscriberId}, + declare::{queryable::ext::QueryableInfo, QueryableId, SubscriberId}, oam::id::OAM_LINKSTATE, Oam, }, @@ -278,7 +278,7 @@ impl HatBaseTrait for HatCode { } let mut qabls_matches = vec![]; - for mut res in face + for (_, mut res) in face .hat .downcast_mut::() .unwrap() @@ -476,8 +476,8 @@ struct HatFace { next_id: AtomicU32, // @TODO: manage rollover and uniqueness local_subs: HashMap, SubscriberId>, remote_subs: HashMap>, - local_qabls: HashMap, QueryableInfo>, - remote_qabls: HashSet>, + local_qabls: HashMap, (QueryableId, QueryableInfo)>, + remote_qabls: HashMap>, } impl HatFace { @@ -488,7 +488,7 @@ impl HatFace { local_subs: HashMap::new(), remote_subs: HashMap::new(), local_qabls: HashMap::new(), - remote_qabls: HashSet::new(), + remote_qabls: HashMap::new(), } } } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index 50eb00023f..de8790f8c9 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -26,10 +26,12 @@ use ordered_float::OrderedFloat; use petgraph::graph::NodeIndex; use std::borrow::Cow; use std::collections::HashMap; +use std::sync::atomic::Ordering; use std::sync::Arc; use zenoh_buffers::ZBuf; use zenoh_protocol::core::key_expr::include::{Includer, DEFAULT_INCLUDER}; use zenoh_protocol::core::key_expr::OwnedKeyExpr; +use zenoh_protocol::network::declare::QueryableId; use zenoh_protocol::{ core::{WhatAmI, WireExpr, ZenohId}, network::declare::{ @@ -141,7 +143,7 @@ fn send_sourced_queryable_to_net_childs( node_id: routing_context, }, body: DeclareBody::DeclareQueryable(DeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) + id: 0, wire_expr: key_expr, ext_info: *qabl_info, }), @@ -164,14 +166,17 @@ fn propagate_simple_queryable( let faces = tables.faces.values().cloned(); for mut dst_face in faces { let info = local_qabl_info(tables, res, &dst_face); - let current_info = face_hat!(dst_face).local_qabls.get(res); + let current = face_hat!(dst_face).local_qabls.get(res); if (src_face.is_none() || src_face.as_ref().unwrap().id != dst_face.id) - && (current_info.is_none() || *current_info.unwrap() != info) + && (current.is_none() || current.unwrap().1 != info) && dst_face.whatami == WhatAmI::Client { + let id = current + .map(|c| c.0) + .unwrap_or(face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst)); face_hat_mut!(&mut dst_face) .local_qabls - .insert(res.clone(), info); + .insert(res.clone(), (id, info)); let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -179,7 +184,7 @@ fn propagate_simple_queryable( ext_tstamp: None, ext_nodeid: ext::NodeIdType::default(), body: DeclareBody::DeclareQueryable(DeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) + id, wire_expr: key_expr, ext_info: info, }), @@ -266,6 +271,7 @@ fn declare_peer_queryable( fn register_client_queryable( _tables: &mut Tables, face: &mut Arc, + id: QueryableId, res: &mut Arc, qabl_info: &QueryableInfo, ) { @@ -284,17 +290,17 @@ fn register_client_queryable( })) .qabl = Some(*qabl_info); } - face_hat_mut!(face).remote_qabls.insert(res.clone()); + face_hat_mut!(face).remote_qabls.insert(id, res.clone()); } fn declare_client_queryable( tables: &mut Tables, face: &mut Arc, + id: QueryableId, res: &mut Arc, qabl_info: &QueryableInfo, ) { - register_client_queryable(tables, face, res, qabl_info); - + register_client_queryable(tables, face, id, res, qabl_info); let local_details = local_peer_qabl_info(tables, res); let zid = tables.zid; register_peer_queryable(tables, Some(face), res, &local_details, zid); @@ -347,7 +353,7 @@ fn send_forget_sourced_queryable_to_net_childs( node_id: routing_context, }, body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) + id: 0, ext_wire_expr: WireExprType { wire_expr }, }), }, @@ -363,22 +369,19 @@ fn send_forget_sourced_queryable_to_net_childs( fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { for face in tables.faces.values_mut() { - if face_hat!(face).local_qabls.contains_key(res) { - let wire_expr = Resource::get_best_key(res, "", face.id); + if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { ext_qos: ext::QoSType::declare_default(), ext_tstamp: None, ext_nodeid: ext::NodeIdType::default(), body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, + id, + ext_wire_expr: WireExprType::null(), }), }, res.expr(), )); - - face_hat_mut!(face).local_qabls.remove(res); } } } @@ -458,41 +461,41 @@ pub(super) fn undeclare_client_queryable( face: &mut Arc, res: &mut Arc, ) { - if let Some(ctx) = get_mut_unchecked(res).session_ctxs.get_mut(&face.id) { - get_mut_unchecked(ctx).qabl = None; - if ctx.qabl.is_none() { - face_hat_mut!(face).remote_qabls.remove(res); + if !face_hat_mut!(face) + .remote_qabls + .values() + .any(|s| *s == *res) + { + if let Some(ctx) = get_mut_unchecked(res).session_ctxs.get_mut(&face.id) { + get_mut_unchecked(ctx).qabl = None; } - } - let mut client_qabls = client_qabls(res); - let peer_qabls = remote_peer_qabls(tables, res); + let mut client_qabls = client_qabls(res); + let peer_qabls = remote_peer_qabls(tables, res); - if client_qabls.is_empty() { - undeclare_peer_queryable(tables, None, res, &tables.zid.clone()); - } else { - let local_info = local_peer_qabl_info(tables, res); - register_peer_queryable(tables, None, res, &local_info, tables.zid); - } - - if client_qabls.len() == 1 && !peer_qabls { - let face = &mut client_qabls[0]; - if face_hat!(face).local_qabls.contains_key(res) { - let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + if client_qabls.is_empty() { + undeclare_peer_queryable(tables, None, res, &tables.zid.clone()); + } else { + let local_info = local_peer_qabl_info(tables, res); + register_peer_queryable(tables, None, res, &local_info, tables.zid); + } - face_hat_mut!(face).local_qabls.remove(res); + if client_qabls.len() == 1 && !peer_qabls { + let face = &mut client_qabls[0]; + if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr(), + )); + } } } } @@ -500,9 +503,14 @@ pub(super) fn undeclare_client_queryable( fn forget_client_queryable( tables: &mut Tables, face: &mut Arc, - res: &mut Arc, -) { - undeclare_client_queryable(tables, face, res); + id: QueryableId, +) -> Option> { + if let Some(mut res) = face_hat_mut!(face).remote_qabls.remove(&id) { + undeclare_client_queryable(tables, face, &mut res); + Some(res) + } else { + None + } } pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { @@ -510,7 +518,10 @@ pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { for qabl in &hat!(tables).peer_qabls { if qabl.context.is_some() { let info = local_qabl_info(tables, qabl, face); - face_hat_mut!(face).local_qabls.insert(qabl.clone(), info); + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face) + .local_qabls + .insert(qabl.clone(), (id, info)); let key_expr = Resource::decl_key(qabl, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -518,7 +529,7 @@ pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { ext_tstamp: None, ext_nodeid: ext::NodeIdType::default(), body: DeclareBody::DeclareQueryable(DeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) + id, wire_expr: key_expr, ext_info: info, }), @@ -631,6 +642,7 @@ impl HatQueriesTrait for HatCode { &self, tables: &mut Tables, face: &mut Arc, + id: QueryableId, res: &mut Arc, qabl_info: &QueryableInfo, node_id: NodeId, @@ -640,7 +652,7 @@ impl HatQueriesTrait for HatCode { declare_peer_queryable(tables, face, res, qabl_info, peer); } } else { - declare_client_queryable(tables, face, res, qabl_info); + declare_client_queryable(tables, face, id, res, qabl_info); } } @@ -648,15 +660,23 @@ impl HatQueriesTrait for HatCode { &self, tables: &mut Tables, face: &mut Arc, - res: &mut Arc, + id: QueryableId, + res: Option>, node_id: NodeId, - ) { + ) -> Option> { if face.whatami != WhatAmI::Client { - if let Some(peer) = get_peer(tables, face, node_id) { - forget_peer_queryable(tables, face, res, &peer); + if let Some(mut res) = res { + if let Some(peer) = get_peer(tables, face, node_id) { + forget_peer_queryable(tables, face, &mut res, &peer); + Some(res) + } else { + None + } + } else { + None } } else { - forget_client_queryable(tables, face, res); + forget_client_queryable(tables, face, id) } } diff --git a/zenoh/src/net/routing/hat/mod.rs b/zenoh/src/net/routing/hat/mod.rs index 568a036abd..d9feb687f2 100644 --- a/zenoh/src/net/routing/hat/mod.rs +++ b/zenoh/src/net/routing/hat/mod.rs @@ -31,7 +31,10 @@ use zenoh_config::{unwrap_or_default, Config, WhatAmI}; use zenoh_protocol::{ core::WireExpr, network::{ - declare::{queryable::ext::QueryableInfo, subscriber::ext::SubscriberInfo, SubscriberId}, + declare::{ + queryable::ext::QueryableInfo, subscriber::ext::SubscriberInfo, QueryableId, + SubscriberId, + }, Oam, }, }; @@ -149,6 +152,7 @@ pub(crate) trait HatQueriesTrait { &self, tables: &mut Tables, face: &mut Arc, + id: QueryableId, res: &mut Arc, qabl_info: &QueryableInfo, node_id: NodeId, @@ -157,9 +161,10 @@ pub(crate) trait HatQueriesTrait { &self, tables: &mut Tables, face: &mut Arc, - res: &mut Arc, + id: QueryableId, + res: Option>, node_id: NodeId, - ); + ) -> Option>; fn get_queryables(&self, tables: &Tables) -> Vec>; diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 5b4503c51d..1a6c1ba407 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -45,11 +45,14 @@ use super::{ }; use std::{ any::Any, - collections::{HashMap, HashSet}, + collections::HashMap, sync::{atomic::AtomicU32, Arc}, }; use zenoh_config::{unwrap_or_default, ModeDependent, WhatAmI, WhatAmIMatcher}; -use zenoh_protocol::network::{declare::SubscriberId, Oam}; +use zenoh_protocol::network::{ + declare::{QueryableId, SubscriberId}, + Oam, +}; use zenoh_protocol::{ common::ZExtBody, network::{declare::queryable::ext::QueryableInfo, oam::id::OAM_LINKSTATE}, @@ -205,7 +208,7 @@ impl HatBaseTrait for HatCode { } let mut qabls_matches = vec![]; - for mut res in face + for (_id, mut res) in face .hat .downcast_mut::() .unwrap() @@ -366,8 +369,8 @@ struct HatFace { next_id: AtomicU32, // @TODO: manage rollover and uniqueness local_subs: HashMap, SubscriberId>, remote_subs: HashMap>, - local_qabls: HashMap, QueryableInfo>, - remote_qabls: HashSet>, + local_qabls: HashMap, (QueryableId, QueryableInfo)>, + remote_qabls: HashMap>, } impl HatFace { @@ -377,7 +380,7 @@ impl HatFace { local_subs: HashMap::new(), remote_subs: HashMap::new(), local_qabls: HashMap::new(), - remote_qabls: HashSet::new(), + remote_qabls: HashMap::new(), } } } diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index 0f5b63fc2e..e8682fc5c7 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -23,10 +23,12 @@ use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS}; use ordered_float::OrderedFloat; use std::borrow::Cow; use std::collections::{HashMap, HashSet}; +use std::sync::atomic::Ordering; use std::sync::Arc; use zenoh_buffers::ZBuf; use zenoh_protocol::core::key_expr::include::{Includer, DEFAULT_INCLUDER}; use zenoh_protocol::core::key_expr::OwnedKeyExpr; +use zenoh_protocol::network::declare::QueryableId; use zenoh_protocol::{ core::{WhatAmI, WireExpr}, network::declare::{ @@ -83,16 +85,19 @@ fn propagate_simple_queryable( let faces = tables.faces.values().cloned(); for mut dst_face in faces { let info = local_qabl_info(tables, res, &dst_face); - let current_info = face_hat!(dst_face).local_qabls.get(res); + let current = face_hat!(dst_face).local_qabls.get(res); if (src_face.is_none() || src_face.as_ref().unwrap().id != dst_face.id) - && (current_info.is_none() || *current_info.unwrap() != info) + && (current.is_none() || current.unwrap().1 != info) && (src_face.is_none() || src_face.as_ref().unwrap().whatami == WhatAmI::Client || dst_face.whatami == WhatAmI::Client) { + let id = current + .map(|c| c.0) + .unwrap_or(face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst)); face_hat_mut!(&mut dst_face) .local_qabls - .insert(res.clone(), info); + .insert(res.clone(), (id, info)); let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -100,7 +105,7 @@ fn propagate_simple_queryable( ext_tstamp: None, ext_nodeid: ext::NodeIdType::default(), body: DeclareBody::DeclareQueryable(DeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) + id, wire_expr: key_expr, ext_info: info, }), @@ -114,6 +119,7 @@ fn propagate_simple_queryable( fn register_client_queryable( _tables: &mut Tables, face: &mut Arc, + id: QueryableId, res: &mut Arc, qabl_info: &QueryableInfo, ) { @@ -132,16 +138,17 @@ fn register_client_queryable( })) .qabl = Some(*qabl_info); } - face_hat_mut!(face).remote_qabls.insert(res.clone()); + face_hat_mut!(face).remote_qabls.insert(id, res.clone()); } fn declare_client_queryable( tables: &mut Tables, face: &mut Arc, + id: QueryableId, res: &mut Arc, qabl_info: &QueryableInfo, ) { - register_client_queryable(tables, face, res, qabl_info); + register_client_queryable(tables, face, id, res, qabl_info); propagate_simple_queryable(tables, res, Some(face)); } @@ -161,22 +168,19 @@ fn client_qabls(res: &Arc) -> Vec> { fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { for face in tables.faces.values_mut() { - if face_hat!(face).local_qabls.contains_key(res) { - let wire_expr = Resource::get_best_key(res, "", face.id); + if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { ext_qos: ext::QoSType::declare_default(), ext_tstamp: None, ext_nodeid: ext::NodeIdType::default(), body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, + id, + ext_wire_expr: WireExprType::null(), }), }, res.expr(), )); - - face_hat_mut!(face).local_qabls.remove(res); } } } @@ -186,37 +190,37 @@ pub(super) fn undeclare_client_queryable( face: &mut Arc, res: &mut Arc, ) { - if let Some(ctx) = get_mut_unchecked(res).session_ctxs.get_mut(&face.id) { - get_mut_unchecked(ctx).qabl = None; - if ctx.qabl.is_none() { - face_hat_mut!(face).remote_qabls.remove(res); + if !face_hat_mut!(face) + .remote_qabls + .values() + .any(|s| *s == *res) + { + if let Some(ctx) = get_mut_unchecked(res).session_ctxs.get_mut(&face.id) { + get_mut_unchecked(ctx).qabl = None; } - } - let mut client_qabls = client_qabls(res); - if client_qabls.is_empty() { - propagate_forget_simple_queryable(tables, res); - } else { - propagate_simple_queryable(tables, res, None); - } - if client_qabls.len() == 1 { - let face = &mut client_qabls[0]; - if face_hat!(face).local_qabls.contains_key(res) { - let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); - - face_hat_mut!(face).local_qabls.remove(res); + let mut client_qabls = client_qabls(res); + if client_qabls.is_empty() { + propagate_forget_simple_queryable(tables, res); + } else { + propagate_simple_queryable(tables, res, None); + } + if client_qabls.len() == 1 { + let face = &mut client_qabls[0]; + if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr(), + )); + } } } } @@ -224,9 +228,14 @@ pub(super) fn undeclare_client_queryable( fn forget_client_queryable( tables: &mut Tables, face: &mut Arc, - res: &mut Arc, -) { - undeclare_client_queryable(tables, face, res); + id: QueryableId, +) -> Option> { + if let Some(mut res) = face_hat_mut!(face).remote_qabls.remove(&id) { + undeclare_client_queryable(tables, face, &mut res); + Some(res) + } else { + None + } } pub(super) fn queries_new_face(tables: &mut Tables, _face: &mut Arc) { @@ -236,7 +245,7 @@ pub(super) fn queries_new_face(tables: &mut Tables, _face: &mut Arc) .cloned() .collect::>>() { - for qabl in face_hat!(face).remote_qabls.iter() { + for qabl in face_hat!(face).remote_qabls.values() { propagate_simple_queryable(tables, qabl, Some(&mut face.clone())); } } @@ -251,27 +260,29 @@ impl HatQueriesTrait for HatCode { &self, tables: &mut Tables, face: &mut Arc, + id: QueryableId, res: &mut Arc, qabl_info: &QueryableInfo, _node_id: NodeId, ) { - declare_client_queryable(tables, face, res, qabl_info); + declare_client_queryable(tables, face, id, res, qabl_info); } fn undeclare_queryable( &self, tables: &mut Tables, face: &mut Arc, - res: &mut Arc, + id: QueryableId, + _res: Option>, _node_id: NodeId, - ) { - forget_client_queryable(tables, face, res); + ) -> Option> { + forget_client_queryable(tables, face, id) } fn get_queryables(&self, tables: &Tables) -> Vec> { let mut qabls = HashSet::new(); for src_face in tables.faces.values() { - for qabl in &face_hat!(src_face).remote_qabls { + for qabl in face_hat!(src_face).remote_qabls.values() { qabls.insert(qabl.clone()); } } diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index 60df3d8a37..ff576ae271 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -58,7 +58,7 @@ use zenoh_config::{unwrap_or_default, ModeDependent, WhatAmI, WhatAmIMatcher, Ze use zenoh_protocol::{ common::ZExtBody, network::{ - declare::{queryable::ext::QueryableInfo, SubscriberId}, + declare::{queryable::ext::QueryableInfo, QueryableId, SubscriberId}, oam::id::OAM_LINKSTATE, Oam, }, @@ -447,7 +447,7 @@ impl HatBaseTrait for HatCode { } let mut qabls_matches = vec![]; - for mut res in face + for (_, mut res) in face .hat .downcast_mut::() .unwrap() @@ -777,8 +777,8 @@ struct HatFace { next_id: AtomicU32, // @TODO: manage rollover and uniqueness local_subs: HashMap, SubscriberId>, remote_subs: HashMap>, - local_qabls: HashMap, QueryableInfo>, - remote_qabls: HashSet>, + local_qabls: HashMap, (QueryableId, QueryableInfo)>, + remote_qabls: HashMap>, } impl HatFace { @@ -789,7 +789,7 @@ impl HatFace { local_subs: HashMap::new(), remote_subs: HashMap::new(), local_qabls: HashMap::new(), - remote_qabls: HashSet::new(), + remote_qabls: HashMap::new(), } } } diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 877c82f71f..4180585d5b 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -26,10 +26,12 @@ use ordered_float::OrderedFloat; use petgraph::graph::NodeIndex; use std::borrow::Cow; use std::collections::HashMap; +use std::sync::atomic::Ordering; use std::sync::Arc; use zenoh_buffers::ZBuf; use zenoh_protocol::core::key_expr::include::{Includer, DEFAULT_INCLUDER}; use zenoh_protocol::core::key_expr::OwnedKeyExpr; +use zenoh_protocol::network::declare::QueryableId; use zenoh_protocol::{ core::{WhatAmI, WireExpr, ZenohId}, network::declare::{ @@ -212,7 +214,7 @@ fn send_sourced_queryable_to_net_childs( node_id: routing_context, }, body: DeclareBody::DeclareQueryable(DeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) + id: 0, wire_expr: key_expr, ext_info: *qabl_info, }), @@ -236,9 +238,9 @@ fn propagate_simple_queryable( let faces = tables.faces.values().cloned(); for mut dst_face in faces { let info = local_qabl_info(tables, res, &dst_face); - let current_info = face_hat!(dst_face).local_qabls.get(res); + let current = face_hat!(dst_face).local_qabls.get(res); if (src_face.is_none() || src_face.as_ref().unwrap().id != dst_face.id) - && (current_info.is_none() || *current_info.unwrap() != info) + && (current.is_none() || current.unwrap().1 != info) && if full_peers_net { dst_face.whatami == WhatAmI::Client } else { @@ -250,9 +252,12 @@ fn propagate_simple_queryable( .failover_brokering(src_face.as_ref().unwrap().zid, dst_face.zid)) } { + let id = current + .map(|c| c.0) + .unwrap_or(face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst)); face_hat_mut!(&mut dst_face) .local_qabls - .insert(res.clone(), info); + .insert(res.clone(), (id, info)); let key_expr = Resource::decl_key(res, &mut dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -260,7 +265,7 @@ fn propagate_simple_queryable( ext_tstamp: None, ext_nodeid: ext::NodeIdType::default(), body: DeclareBody::DeclareQueryable(DeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) + id, wire_expr: key_expr, ext_info: info, }), @@ -394,6 +399,7 @@ fn declare_peer_queryable( fn register_client_queryable( _tables: &mut Tables, face: &mut Arc, + id: QueryableId, res: &mut Arc, qabl_info: &QueryableInfo, ) { @@ -412,16 +418,17 @@ fn register_client_queryable( })) .qabl = Some(*qabl_info); } - face_hat_mut!(face).remote_qabls.insert(res.clone()); + face_hat_mut!(face).remote_qabls.insert(id, res.clone()); } fn declare_client_queryable( tables: &mut Tables, face: &mut Arc, + id: QueryableId, res: &mut Arc, qabl_info: &QueryableInfo, ) { - register_client_queryable(tables, face, res, qabl_info); + register_client_queryable(tables, face, id, res, qabl_info); let local_details = local_router_qabl_info(tables, res); let zid = tables.zid; register_router_queryable(tables, Some(face), res, &local_details, zid); @@ -483,7 +490,7 @@ fn send_forget_sourced_queryable_to_net_childs( node_id: routing_context, }, body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) + id: 0, ext_wire_expr: WireExprType { wire_expr }, }), }, @@ -499,22 +506,19 @@ fn send_forget_sourced_queryable_to_net_childs( fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { for face in tables.faces.values_mut() { - if face_hat!(face).local_qabls.contains_key(res) { - let wire_expr = Resource::get_best_key(res, "", face.id); + if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { face.primitives.send_declare(RoutingContext::with_expr( Declare { ext_qos: ext::QoSType::declare_default(), ext_tstamp: None, ext_nodeid: ext::NodeIdType::default(), body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, + id, + ext_wire_expr: WireExprType::null(), }), }, res.expr(), )); - - face_hat_mut!(face).local_qabls.remove(res); } } } @@ -540,21 +544,20 @@ fn propagate_forget_simple_queryable_to_peers(tables: &mut Tables, res: &mut Arc && hat!(tables).failover_brokering(s.face.zid, face.zid))) }) { - let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); - - face_hat_mut!(&mut face).local_qabls.remove(res); + if let Some((id, _)) = face_hat_mut!(&mut face).local_qabls.remove(res) { + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr(), + )); + } } } } @@ -680,43 +683,43 @@ pub(super) fn undeclare_client_queryable( face: &mut Arc, res: &mut Arc, ) { - if let Some(ctx) = get_mut_unchecked(res).session_ctxs.get_mut(&face.id) { - get_mut_unchecked(ctx).qabl = None; - if ctx.qabl.is_none() { - face_hat_mut!(face).remote_qabls.remove(res); + if !face_hat_mut!(face) + .remote_qabls + .values() + .any(|s| *s == *res) + { + if let Some(ctx) = get_mut_unchecked(res).session_ctxs.get_mut(&face.id) { + get_mut_unchecked(ctx).qabl = None; } - } - let mut client_qabls = client_qabls(res); - let router_qabls = remote_router_qabls(tables, res); - let peer_qabls = remote_peer_qabls(tables, res); + let mut client_qabls = client_qabls(res); + let router_qabls = remote_router_qabls(tables, res); + let peer_qabls = remote_peer_qabls(tables, res); - if client_qabls.is_empty() && !peer_qabls { - undeclare_router_queryable(tables, None, res, &tables.zid.clone()); - } else { - let local_info = local_router_qabl_info(tables, res); - register_router_queryable(tables, None, res, &local_info, tables.zid); - propagate_forget_simple_queryable_to_peers(tables, res); - } - - if client_qabls.len() == 1 && !router_qabls && !peer_qabls { - let face = &mut client_qabls[0]; - if face_hat!(face).local_qabls.contains_key(res) { - let wire_expr = Resource::get_best_key(res, "", face.id); - face.primitives.send_declare(RoutingContext::with_expr( - Declare { - ext_qos: ext::QoSType::declare_default(), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::default(), - body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, - }), - }, - res.expr(), - )); + if client_qabls.is_empty() && !peer_qabls { + undeclare_router_queryable(tables, None, res, &tables.zid.clone()); + } else { + let local_info = local_router_qabl_info(tables, res); + register_router_queryable(tables, None, res, &local_info, tables.zid); + propagate_forget_simple_queryable_to_peers(tables, res); + } - face_hat_mut!(face).local_qabls.remove(res); + if client_qabls.len() == 1 && !router_qabls && !peer_qabls { + let face = &mut client_qabls[0]; + if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { + face.primitives.send_declare(RoutingContext::with_expr( + Declare { + ext_qos: ext::QoSType::declare_default(), + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::default(), + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr(), + )); + } } } } @@ -724,9 +727,14 @@ pub(super) fn undeclare_client_queryable( fn forget_client_queryable( tables: &mut Tables, face: &mut Arc, - res: &mut Arc, -) { - undeclare_client_queryable(tables, face, res); + id: QueryableId, +) -> Option> { + if let Some(mut res) = face_hat_mut!(face).remote_qabls.remove(&id) { + undeclare_client_queryable(tables, face, &mut res); + Some(res) + } else { + None + } } pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { @@ -734,7 +742,10 @@ pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { for qabl in hat!(tables).router_qabls.iter() { if qabl.context.is_some() { let info = local_qabl_info(tables, qabl, face); - face_hat_mut!(face).local_qabls.insert(qabl.clone(), info); + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face) + .local_qabls + .insert(qabl.clone(), (id, info)); let key_expr = Resource::decl_key(qabl, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -742,7 +753,7 @@ pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { ext_tstamp: None, ext_nodeid: ext::NodeIdType::default(), body: DeclareBody::DeclareQueryable(DeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) + id, wire_expr: key_expr, ext_info: info, }), @@ -763,7 +774,10 @@ pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { })) { let info = local_qabl_info(tables, qabl, face); - face_hat_mut!(face).local_qabls.insert(qabl.clone(), info); + let id = face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst); + face_hat_mut!(face) + .local_qabls + .insert(qabl.clone(), (id, info)); let key_expr = Resource::decl_key(qabl, face); face.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -771,7 +785,7 @@ pub(super) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { ext_tstamp: None, ext_nodeid: ext::NodeIdType::default(), body: DeclareBody::DeclareQueryable(DeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) + id, wire_expr: key_expr, ext_info: info, }), @@ -833,7 +847,7 @@ pub(super) fn queries_remove_node(tables: &mut Tables, node: &ZenohId, net_type: pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: &[ZenohId]) { if let Some(src_face) = tables.get_face(zid) { if hat!(tables).router_peers_failover_brokering && src_face.whatami == WhatAmI::Peer { - for res in &face_hat!(src_face).remote_qabls { + for res in face_hat!(src_face).remote_qabls.values() { let client_qabls = res .session_ctxs .values() @@ -845,7 +859,7 @@ pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links { let dst_face = &mut get_mut_unchecked(ctx).face; if dst_face.whatami == WhatAmI::Peer && src_face.zid != dst_face.zid { - if face_hat!(dst_face).local_qabls.contains_key(res) { + if let Some(id) = face_hat!(dst_face).local_subs.get(res).cloned() { let forget = !HatTables::failover_brokering_to(links, dst_face.zid) && { let ctx_links = hat!(tables) @@ -863,7 +877,6 @@ pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links }) }; if forget { - let wire_expr = Resource::get_best_key(res, "", dst_face.id); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { ext_qos: ext::QoSType::declare_default(), @@ -871,8 +884,8 @@ pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links ext_nodeid: ext::NodeIdType::default(), body: DeclareBody::UndeclareQueryable( UndeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) - ext_wire_expr: WireExprType { wire_expr }, + id, + ext_wire_expr: WireExprType::null(), }, ), }, @@ -884,9 +897,10 @@ pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links } else if HatTables::failover_brokering_to(links, ctx.face.zid) { let dst_face = &mut get_mut_unchecked(ctx).face; let info = local_qabl_info(tables, res, dst_face); + let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst); face_hat_mut!(dst_face) .local_qabls - .insert(res.clone(), info); + .insert(res.clone(), (id, info)); let key_expr = Resource::decl_key(res, dst_face); dst_face.primitives.send_declare(RoutingContext::with_expr( Declare { @@ -894,7 +908,7 @@ pub(super) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links ext_tstamp: None, ext_nodeid: ext::NodeIdType::default(), body: DeclareBody::DeclareQueryable(DeclareQueryable { - id: 0, // @TODO use proper QueryableId (#703) + id, wire_expr: key_expr, ext_info: info, }), @@ -1004,6 +1018,7 @@ impl HatQueriesTrait for HatCode { &self, tables: &mut Tables, face: &mut Arc, + id: QueryableId, res: &mut Arc, qabl_info: &QueryableInfo, node_id: NodeId, @@ -1020,10 +1035,10 @@ impl HatQueriesTrait for HatCode { declare_peer_queryable(tables, face, res, qabl_info, peer) } } else { - declare_client_queryable(tables, face, res, qabl_info) + declare_client_queryable(tables, face, id, res, qabl_info) } } - _ => declare_client_queryable(tables, face, res, qabl_info), + _ => declare_client_queryable(tables, face, id, res, qabl_info), } } @@ -1031,25 +1046,40 @@ impl HatQueriesTrait for HatCode { &self, tables: &mut Tables, face: &mut Arc, - res: &mut Arc, + id: QueryableId, + res: Option>, node_id: NodeId, - ) { + ) -> Option> { match face.whatami { WhatAmI::Router => { - if let Some(router) = get_router(tables, face, node_id) { - forget_router_queryable(tables, face, res, &router) + if let Some(mut res) = res { + if let Some(router) = get_router(tables, face, node_id) { + forget_router_queryable(tables, face, &mut res, &router); + Some(res) + } else { + None + } + } else { + None } } WhatAmI::Peer => { if hat!(tables).full_net(WhatAmI::Peer) { - if let Some(peer) = get_peer(tables, face, node_id) { - forget_peer_queryable(tables, face, res, &peer) + if let Some(mut res) = res { + if let Some(peer) = get_peer(tables, face, node_id) { + forget_peer_queryable(tables, face, &mut res, &peer); + Some(res) + } else { + None + } + } else { + None } } else { - forget_client_queryable(tables, face, res) + forget_client_queryable(tables, face, id) } } - _ => forget_client_queryable(tables, face, res), + _ => forget_client_queryable(tables, face, id), } }