From d4b8fbb111763fec071033e12fb335b26b5f3ace Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Tue, 31 Oct 2023 17:03:53 +0100 Subject: [PATCH] Code move --- zenoh/src/net/routing/dispatcher/face.rs | 19 +- zenoh/src/net/routing/dispatcher/pubsub.rs | 274 ++++------------- zenoh/src/net/routing/dispatcher/queries.rs | 235 +-------------- zenoh/src/net/routing/dispatcher/resource.rs | 15 +- zenoh/src/net/routing/dispatcher/tables.rs | 86 +----- zenoh/src/net/routing/hat/mod.rs | 136 ++++++++- zenoh/src/net/routing/hat/pubsub.rs | 255 +++++++++++++--- zenoh/src/net/routing/hat/queries.rs | 302 +++++++++++++++++-- 8 files changed, 698 insertions(+), 624 deletions(-) diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index 17bf398bc1..c2cc6dea69 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -1,3 +1,5 @@ +use crate::net::routing::hat::HatFace; + // // Copyright (c) 2023 ZettaScale Technology // @@ -14,16 +16,13 @@ use super::super::router::*; use super::tables::{Tables, TablesLock}; use super::{resource::*, tables}; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::fmt; use std::sync::Arc; use zenoh_protocol::zenoh::RequestBody; use zenoh_protocol::{ core::{ExprId, WhatAmI, ZenohId}, - network::{ - declare::queryable::ext::QueryableInfo, Mapping, Push, Request, RequestId, Response, - ResponseFinal, - }, + network::{Mapping, Push, Request, RequestId, Response, ResponseFinal}, }; #[cfg(feature = "stats")] use zenoh_transport::stats::TransportStats; @@ -39,13 +38,10 @@ pub struct FaceState { pub(crate) link_id: usize, pub(crate) local_mappings: HashMap>, pub(crate) remote_mappings: HashMap>, - pub(crate) local_subs: HashSet>, - pub(crate) remote_subs: HashSet>, - pub(crate) local_qabls: HashMap, QueryableInfo>, - pub(crate) remote_qabls: HashSet>, pub(crate) next_qid: RequestId, pub(crate) pending_queries: HashMap>, pub(crate) mcast_group: Option, + pub(crate) hat: HatFace, } impl FaceState { @@ -68,13 +64,10 @@ impl FaceState { link_id, local_mappings: HashMap::new(), remote_mappings: HashMap::new(), - local_subs: HashSet::new(), - remote_subs: HashSet::new(), - local_qabls: HashMap::new(), - remote_qabls: HashSet::new(), next_qid: 0, pending_queries: HashMap::new(), mcast_group, + hat: HatFace::new(), }) } diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index 277ed45843..ed00b5fc80 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -11,19 +11,19 @@ // Contributors: // ZettaScale Zenoh Team, // -use super::super::hat::network::Network; +use super::super::hat::pubsub::compute_data_route; use super::face::FaceState; use super::resource::{DataRoutes, Direction, PullCaches, Resource, Route}; use super::tables::{RoutingExpr, Tables}; use petgraph::graph::NodeIndex; use std::borrow::Cow; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::convert::TryFrom; use std::sync::Arc; use std::sync::RwLock; use zenoh_core::zread; use zenoh_protocol::{ - core::{key_expr::OwnedKeyExpr, WhatAmI, WireExpr, ZenohId}, + core::{key_expr::OwnedKeyExpr, WhatAmI, WireExpr}, network::{ declare::{ext, Mode}, Push, @@ -32,169 +32,6 @@ use zenoh_protocol::{ }; use zenoh_sync::get_mut_unchecked; -#[inline] -fn insert_faces_for_subs( - route: &mut Route, - expr: &RoutingExpr, - tables: &Tables, - net: &Network, - source: usize, - subs: &HashSet, -) { - if net.trees.len() > source { - for sub in subs { - if let Some(sub_idx) = net.get_idx(sub) { - if net.trees[source].directions.len() > sub_idx.index() { - if let Some(direction) = net.trees[source].directions[sub_idx.index()] { - if net.graph.contains_node(direction) { - if let Some(face) = tables.get_face(&net.graph[direction].zid) { - route.entry(face.id).or_insert_with(|| { - let key_expr = - Resource::get_best_key(expr.prefix, expr.suffix, face.id); - ( - face.clone(), - key_expr.to_owned(), - if source != 0 { - Some(source as u16) - } else { - None - }, - ) - }); - } - } - } - } - } - } - } else { - log::trace!("Tree for node sid:{} not yet ready", source); - } -} - -fn compute_data_route( - tables: &Tables, - expr: &mut RoutingExpr, - source: Option, - source_type: WhatAmI, -) -> Arc { - let mut route = HashMap::new(); - let key_expr = expr.full_expr(); - if key_expr.ends_with('/') { - return Arc::new(route); - } - log::trace!( - "compute_data_route({}, {:?}, {:?})", - key_expr, - source, - source_type - ); - let key_expr = match OwnedKeyExpr::try_from(key_expr) { - Ok(ke) => ke, - Err(e) => { - log::warn!("Invalid KE reached the system: {}", e); - return Arc::new(route); - } - }; - let res = Resource::get_resource(expr.prefix, expr.suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); - - let master = tables.whatami != WhatAmI::Router - || !tables.hat.full_net(WhatAmI::Peer) - || *tables - .hat - .elect_router(&tables.zid, &key_expr, tables.hat.shared_nodes.iter()) - == tables.zid; - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); - if tables.whatami == WhatAmI::Router { - if master || source_type == WhatAmI::Router { - let net = tables.hat.routers_net.as_ref().unwrap(); - let router_source = match source_type { - WhatAmI::Router => source.unwrap(), - _ => net.idx.index(), - }; - insert_faces_for_subs( - &mut route, - expr, - tables, - net, - router_source, - &mres.context().router_subs, - ); - } - - if (master || source_type != WhatAmI::Router) && tables.hat.full_net(WhatAmI::Peer) { - let net = tables.hat.peers_net.as_ref().unwrap(); - let peer_source = match source_type { - WhatAmI::Peer => source.unwrap(), - _ => net.idx.index(), - }; - insert_faces_for_subs( - &mut route, - expr, - tables, - net, - peer_source, - &mres.context().peer_subs, - ); - } - } - - if tables.whatami == WhatAmI::Peer && tables.hat.full_net(WhatAmI::Peer) { - let net = tables.hat.peers_net.as_ref().unwrap(); - let peer_source = match source_type { - WhatAmI::Router | WhatAmI::Peer => source.unwrap(), - _ => net.idx.index(), - }; - insert_faces_for_subs( - &mut route, - expr, - tables, - net, - peer_source, - &mres.context().peer_subs, - ); - } - - if tables.whatami != WhatAmI::Router || master || source_type == WhatAmI::Router { - for (sid, context) in &mres.session_ctxs { - if let Some(subinfo) = &context.subs { - if match tables.whatami { - WhatAmI::Router => context.face.whatami != WhatAmI::Router, - _ => { - source_type == WhatAmI::Client - || context.face.whatami == WhatAmI::Client - } - } && subinfo.mode == Mode::Push - { - route.entry(*sid).or_insert_with(|| { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); - (context.face.clone(), key_expr.to_owned(), None) - }); - } - } - } - } - } - for mcast_group in &tables.mcast_groups { - route.insert( - mcast_group.id, - ( - mcast_group.clone(), - expr.full_expr().to_string().into(), - None, - ), - ); - } - Arc::new(route) -} - fn compute_matching_pulls(tables: &Tables, expr: &mut RoutingExpr) -> Arc { let mut pull_caches = vec![]; let ke = if let Ok(ke) = OwnedKeyExpr::try_from(expr.full_expr()) { @@ -625,10 +462,45 @@ pub fn full_reentrant_route_data( if !(route.is_empty() && matching_pulls.is_empty()) { treat_timestamp!(&tables.hlc, payload, tables.drop_future_timestamp); - if route.len() == 1 && matching_pulls.len() == 0 { - let (outface, key_expr, context) = route.values().next().unwrap(); - if should_route(&tables, face, outface, &mut expr) { - drop(tables); + // if route.len() == 1 && matching_pulls.len() == 0 { + // let (outface, key_expr, context) = route.values().next().unwrap(); + // if should_route(&tables, face, outface, &mut expr) { + // drop(tables); + // #[cfg(feature = "stats")] + // if !admin { + // inc_stats!(face, tx, user, payload) + // } else { + // inc_stats!(face, tx, admin, payload) + // } + + // outface.primitives.send_push(Push { + // wire_expr: key_expr.into(), + // ext_qos, + // ext_tstamp: None, + // ext_nodeid: ext::NodeIdType { + // node_id: context.unwrap_or(0), + // }, + // payload, + // }) + // } + // } else { + if !matching_pulls.is_empty() { + let lock = zlock!(tables.pull_caches_lock); + cache_data!(matching_pulls, expr, payload); + drop(lock); + } + + if tables.whatami == WhatAmI::Router { + let route = route + .values() + .filter(|(outface, _key_expr, _context)| { + should_route(&tables, face, outface, &mut expr) + }) + .cloned() + .collect::>(); + + drop(tables); + for (outface, key_expr, context) in route { #[cfg(feature = "stats")] if !admin { inc_stats!(face, tx, user, payload) @@ -637,33 +509,24 @@ pub fn full_reentrant_route_data( } outface.primitives.send_push(Push { - wire_expr: key_expr.into(), + wire_expr: key_expr, ext_qos, ext_tstamp: None, ext_nodeid: ext::NodeIdType { node_id: context.unwrap_or(0), }, - payload, + payload: payload.clone(), }) } } else { - if !matching_pulls.is_empty() { - let lock = zlock!(tables.pull_caches_lock); - cache_data!(matching_pulls, expr, payload); - drop(lock); - } - - if tables.whatami == WhatAmI::Router { - let route = route - .values() - .filter(|(outface, _key_expr, _context)| { - should_route(&tables, face, outface, &mut expr) - }) - .cloned() - .collect::>(); - - drop(tables); - for (outface, key_expr, context) in route { + drop(tables); + for (outface, key_expr, context) in route.values() { + if face.id != outface.id + && match (face.mcast_group.as_ref(), outface.mcast_group.as_ref()) { + (Some(l), Some(r)) => l != r, + _ => true, + } + { #[cfg(feature = "stats")] if !admin { inc_stats!(face, tx, user, payload) @@ -672,7 +535,7 @@ pub fn full_reentrant_route_data( } outface.primitives.send_push(Push { - wire_expr: key_expr, + wire_expr: key_expr.into(), ext_qos, ext_tstamp: None, ext_nodeid: ext::NodeIdType { @@ -681,38 +544,9 @@ pub fn full_reentrant_route_data( payload: payload.clone(), }) } - } else { - drop(tables); - for (outface, key_expr, context) in route.values() { - if face.id != outface.id - && match ( - face.mcast_group.as_ref(), - outface.mcast_group.as_ref(), - ) { - (Some(l), Some(r)) => l != r, - _ => true, - } - { - #[cfg(feature = "stats")] - if !admin { - inc_stats!(face, tx, user, payload) - } else { - inc_stats!(face, tx, admin, payload) - } - - outface.primitives.send_push(Push { - wire_expr: key_expr.into(), - ext_qos, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType { - node_id: context.unwrap_or(0), - }, - payload: payload.clone(), - }) - } - } } } + // } } } } diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index 3d7264dc12..509ec80300 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -1,5 +1,3 @@ -use crate::net::routing::PREFIX_LIVELINESS; - // // Copyright (c) 2023 ZettaScale Technology // @@ -13,28 +11,19 @@ use crate::net::routing::PREFIX_LIVELINESS; // Contributors: // ZettaScale Zenoh Team, // -use super::super::hat::network::Network; +use super::super::hat::queries::compute_local_replies; +use super::super::hat::queries::compute_query_route; use super::face::FaceState; -use super::resource::{QueryRoute, QueryRoutes, QueryTargetQabl, QueryTargetQablSet, Resource}; +use super::resource::{QueryRoute, QueryRoutes, QueryTargetQablSet, Resource}; use super::tables::{RoutingExpr, Tables, TablesLock}; use async_trait::async_trait; -use ordered_float::OrderedFloat; use petgraph::graph::NodeIndex; -use std::borrow::Cow; use std::collections::HashMap; -use std::convert::TryFrom; use std::sync::{Arc, Weak}; -use zenoh_buffers::ZBuf; use zenoh_protocol::{ - core::{ - key_expr::{ - include::{Includer, DEFAULT_INCLUDER}, - OwnedKeyExpr, - }, - Encoding, WhatAmI, WireExpr, ZenohId, - }, + core::{Encoding, WhatAmI, WireExpr}, network::{ - declare::{ext, queryable::ext::QueryableInfo}, + declare::ext, request::{ext::TargetType, Request, RequestId}, response::{self, ext::ResponderIdType, Response, ResponseFinal}, }, @@ -48,179 +37,6 @@ pub(crate) struct Query { src_qid: RequestId, } -#[inline] -#[allow(clippy::too_many_arguments)] -fn insert_target_for_qabls( - route: &mut QueryTargetQablSet, - expr: &mut RoutingExpr, - tables: &Tables, - net: &Network, - source: usize, - qabls: &HashMap, - complete: bool, -) { - if net.trees.len() > source { - for (qabl, qabl_info) in qabls { - if let Some(qabl_idx) = net.get_idx(qabl) { - if net.trees[source].directions.len() > qabl_idx.index() { - if let Some(direction) = net.trees[source].directions[qabl_idx.index()] { - if net.graph.contains_node(direction) { - if let Some(face) = tables.get_face(&net.graph[direction].zid) { - if net.distances.len() > qabl_idx.index() { - let key_expr = - Resource::get_best_key(expr.prefix, expr.suffix, face.id); - route.push(QueryTargetQabl { - direction: ( - face.clone(), - key_expr.to_owned(), - if source != 0 { - Some(source as u16) - } else { - None - }, - ), - complete: if complete { - qabl_info.complete as u64 - } else { - 0 - }, - distance: net.distances[qabl_idx.index()], - }); - } - } - } - } - } - } - } - } else { - log::trace!("Tree for node sid:{} not yet ready", source); - } -} - -lazy_static::lazy_static! { - static ref EMPTY_ROUTE: Arc = Arc::new(Vec::new()); -} -fn compute_query_route( - tables: &Tables, - expr: &mut RoutingExpr, - source: Option, - source_type: WhatAmI, -) -> Arc { - let mut route = QueryTargetQablSet::new(); - let key_expr = expr.full_expr(); - if key_expr.ends_with('/') { - return EMPTY_ROUTE.clone(); - } - log::trace!( - "compute_query_route({}, {:?}, {:?})", - key_expr, - source, - source_type - ); - let key_expr = match OwnedKeyExpr::try_from(key_expr) { - Ok(ke) => ke, - Err(e) => { - log::warn!("Invalid KE reached the system: {}", e); - return EMPTY_ROUTE.clone(); - } - }; - let res = Resource::get_resource(expr.prefix, expr.suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); - - let master = tables.whatami != WhatAmI::Router - || !tables.hat.full_net(WhatAmI::Peer) - || *tables - .hat - .elect_router(&tables.zid, &key_expr, tables.hat.shared_nodes.iter()) - == tables.zid; - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); - let complete = DEFAULT_INCLUDER.includes(mres.expr().as_bytes(), key_expr.as_bytes()); - if tables.whatami == WhatAmI::Router { - if master || source_type == WhatAmI::Router { - let net = tables.hat.routers_net.as_ref().unwrap(); - let router_source = match source_type { - WhatAmI::Router => source.unwrap(), - _ => net.idx.index(), - }; - insert_target_for_qabls( - &mut route, - expr, - tables, - net, - router_source, - &mres.context().router_qabls, - complete, - ); - } - - if (master || source_type != WhatAmI::Router) && tables.hat.full_net(WhatAmI::Peer) { - let net = tables.hat.peers_net.as_ref().unwrap(); - let peer_source = match source_type { - WhatAmI::Peer => source.unwrap(), - _ => net.idx.index(), - }; - insert_target_for_qabls( - &mut route, - expr, - tables, - net, - peer_source, - &mres.context().peer_qabls, - complete, - ); - } - } - - if tables.whatami == WhatAmI::Peer && tables.hat.full_net(WhatAmI::Peer) { - let net = tables.hat.peers_net.as_ref().unwrap(); - let peer_source = match source_type { - WhatAmI::Router | WhatAmI::Peer => source.unwrap(), - _ => net.idx.index(), - }; - insert_target_for_qabls( - &mut route, - expr, - tables, - net, - peer_source, - &mres.context().peer_qabls, - complete, - ); - } - - if tables.whatami != WhatAmI::Router || master || source_type == WhatAmI::Router { - for (sid, context) in &mres.session_ctxs { - if match tables.whatami { - WhatAmI::Router => context.face.whatami != WhatAmI::Router, - _ => source_type == WhatAmI::Client || context.face.whatami == WhatAmI::Client, - } { - let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); - if let Some(qabl_info) = context.qabl.as_ref() { - route.push(QueryTargetQabl { - direction: (context.face.clone(), key_expr.to_owned(), None), - complete: if complete { - qabl_info.complete as u64 - } else { - 0 - }, - distance: 0.5, - }); - } - } - } - } - } - route.sort_by_key(|qabl| OrderedFloat(qabl.distance)); - Arc::new(route) -} - pub(crate) fn compute_query_routes_(tables: &Tables, res: &Arc) -> QueryRoutes { let mut routes = QueryRoutes { routers_query_routes: vec![], @@ -558,47 +374,6 @@ fn compute_final_route( } } -#[inline] -fn compute_local_replies( - tables: &Tables, - prefix: &Arc, - suffix: &str, - face: &Arc, -) -> Vec<(WireExpr<'static>, ZBuf)> { - let mut result = vec![]; - // Only the first routing point in the query route - // should return the liveliness tokens - if face.whatami == WhatAmI::Client { - let key_expr = prefix.expr() + suffix; - let key_expr = match OwnedKeyExpr::try_from(key_expr) { - Ok(ke) => ke, - Err(e) => { - log::warn!("Invalid KE reached the system: {}", e); - return result; - } - }; - if key_expr.starts_with(PREFIX_LIVELINESS) { - let res = Resource::get_resource(prefix, suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); - if (mres.context.is_some() - && (!mres.context().router_subs.is_empty() - || !mres.context().peer_subs.is_empty())) - || mres.session_ctxs.values().any(|ctx| ctx.subs.is_some()) - { - result.push((Resource::get_best_key(&mres, "", face.id), ZBuf::default())); - } - } - } - } - result -} - #[derive(Clone)] struct QueryCleanup { tables: Arc, diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index 85872e61b6..da36373df0 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -13,7 +13,8 @@ // use super::face::FaceState; use super::tables::{Tables, TablesLock}; -use std::collections::{HashMap, HashSet}; +use crate::net::routing::hat::HatContext; +use std::collections::HashMap; use std::convert::TryInto; use std::hash::{Hash, Hasher}; use std::sync::{Arc, Weak}; @@ -22,7 +23,7 @@ use zenoh_protocol::network::request::ext::TargetType; use zenoh_protocol::network::RequestId; use zenoh_protocol::zenoh::PushBody; use zenoh_protocol::{ - core::{key_expr::keyexpr, ExprId, WireExpr, ZenohId}, + core::{key_expr::keyexpr, ExprId, WireExpr}, network::{ declare::{ ext, queryable::ext::QueryableInfo, subscriber::ext::SubscriberInfo, Declare, @@ -74,12 +75,9 @@ pub(crate) struct QueryRoutes { } pub(crate) struct ResourceContext { - pub(crate) router_subs: HashSet, - pub(crate) peer_subs: HashSet, - pub(crate) router_qabls: HashMap, - pub(crate) peer_qabls: HashMap, pub(crate) matches: Vec>, pub(crate) matching_pulls: Arc, + pub(crate) hat: HatContext, pub(crate) valid_data_routes: bool, pub(crate) routers_data_routes: Vec>, pub(crate) peers_data_routes: Vec>, @@ -95,12 +93,9 @@ pub(crate) struct ResourceContext { impl ResourceContext { fn new() -> ResourceContext { ResourceContext { - router_subs: HashSet::new(), - peer_subs: HashSet::new(), - router_qabls: HashMap::new(), - peer_qabls: HashMap::new(), matches: Vec::new(), matching_pulls: Arc::new(Vec::new()), + hat: HatContext::new(), valid_data_routes: false, routers_data_routes: Vec::new(), peers_data_routes: Vec::new(), diff --git a/zenoh/src/net/routing/dispatcher/tables.rs b/zenoh/src/net/routing/dispatcher/tables.rs index bc2eb520a4..20ec1deec6 100644 --- a/zenoh/src/net/routing/dispatcher/tables.rs +++ b/zenoh/src/net/routing/dispatcher/tables.rs @@ -219,91 +219,7 @@ pub fn close_face(tables: &TablesLock, face: &Weak) { Some(mut face) => { log::debug!("Close {}", face); finalize_pending_queries(tables, &mut face); - - let ctrl_lock = zlock!(tables.ctrl_lock); - let mut wtables = zwrite!(tables.tables); - let mut face_clone = face.clone(); - let face = get_mut_unchecked(&mut face); - for res in face.remote_mappings.values_mut() { - get_mut_unchecked(res).session_ctxs.remove(&face.id); - Resource::clean(res); - } - face.remote_mappings.clear(); - for res in face.local_mappings.values_mut() { - get_mut_unchecked(res).session_ctxs.remove(&face.id); - Resource::clean(res); - } - face.local_mappings.clear(); - - let mut subs_matches = vec![]; - for mut res in face.remote_subs.drain() { - get_mut_unchecked(&mut res).session_ctxs.remove(&face.id); - undeclare_client_subscription(&mut wtables, &mut face_clone, &mut res); - - if res.context.is_some() { - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, &res) { - get_mut_unchecked(&mut match_) - .context_mut() - .valid_data_routes = false; - subs_matches.push(match_); - } - } - get_mut_unchecked(&mut res).context_mut().valid_data_routes = false; - subs_matches.push(res); - } - } - - let mut qabls_matches = vec![]; - for mut res in face.remote_qabls.drain() { - get_mut_unchecked(&mut res).session_ctxs.remove(&face.id); - undeclare_client_queryable(&mut wtables, &mut face_clone, &mut res); - - if res.context.is_some() { - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, &res) { - get_mut_unchecked(&mut match_) - .context_mut() - .valid_query_routes = false; - qabls_matches.push(match_); - } - } - get_mut_unchecked(&mut res).context_mut().valid_query_routes = false; - qabls_matches.push(res); - } - } - drop(wtables); - - let mut matches_data_routes = vec![]; - let mut matches_query_routes = vec![]; - let rtables = zread!(tables.tables); - for _match in subs_matches.drain(..) { - matches_data_routes.push((_match.clone(), compute_data_routes_(&rtables, &_match))); - } - for _match in qabls_matches.drain(..) { - matches_query_routes - .push((_match.clone(), compute_query_routes_(&rtables, &_match))); - } - drop(rtables); - - let mut wtables = zwrite!(tables.tables); - for (mut res, data_routes) in matches_data_routes { - get_mut_unchecked(&mut res) - .context_mut() - .update_data_routes(data_routes); - Resource::clean(&mut res); - } - for (mut res, query_routes) in matches_query_routes { - get_mut_unchecked(&mut res) - .context_mut() - .update_query_routes(query_routes); - Resource::clean(&mut res); - } - wtables.faces.remove(&face.id); - drop(wtables); - drop(ctrl_lock); + super::super::hat::close_face(tables, &mut face); } None => log::error!("Face already closed!"), } diff --git a/zenoh/src/net/routing/hat/mod.rs b/zenoh/src/net/routing/hat/mod.rs index 905c4ff4fd..a36aaa8331 100644 --- a/zenoh/src/net/routing/hat/mod.rs +++ b/zenoh/src/net/routing/hat/mod.rs @@ -17,15 +17,23 @@ //! This module is intended for Zenoh's internal use. //! //! [Click here for Zenoh's documentation](../zenoh/index.html) -use self::network::Network; -use super::dispatcher::tables::{Resource, TablesLock}; +use self::{ + network::Network, pubsub::undeclare_client_subscription, queries::undeclare_client_queryable, +}; +use super::dispatcher::{ + face::FaceState, + queries::compute_query_routes_, + tables::{compute_data_routes_, Resource, TablesLock}, +}; use async_std::task::JoinHandle; use std::{ - collections::{hash_map::DefaultHasher, HashSet}, + collections::{hash_map::DefaultHasher, HashMap, HashSet}, hash::Hasher, sync::Arc, }; use zenoh_config::{WhatAmI, ZenohId}; +use zenoh_protocol::network::declare::queryable::ext::QueryableInfo; +use zenoh_sync::get_mut_unchecked; pub mod network; pub mod pubsub; @@ -193,3 +201,125 @@ impl HatTables { } } } + +pub(crate) struct HatContext { + router_subs: HashSet, + peer_subs: HashSet, + router_qabls: HashMap, + peer_qabls: HashMap, +} + +impl HatContext { + pub fn new() -> Self { + Self { + router_subs: HashSet::new(), + peer_subs: HashSet::new(), + router_qabls: HashMap::new(), + peer_qabls: HashMap::new(), + } + } +} + +pub(crate) struct HatFace { + pub(crate) local_subs: HashSet>, + pub(crate) remote_subs: HashSet>, + pub(crate) local_qabls: HashMap, QueryableInfo>, + pub(crate) remote_qabls: HashSet>, +} + +impl HatFace { + pub fn new() -> Self { + Self { + local_subs: HashSet::new(), + remote_subs: HashSet::new(), + local_qabls: HashMap::new(), + remote_qabls: HashSet::new(), + } + } +} + +pub(crate) fn close_face(tables: &TablesLock, face: &mut Arc) { + let ctrl_lock = zlock!(tables.ctrl_lock); + let mut wtables = zwrite!(tables.tables); + let mut face_clone = face.clone(); + let face = get_mut_unchecked(face); + for res in face.remote_mappings.values_mut() { + get_mut_unchecked(res).session_ctxs.remove(&face.id); + Resource::clean(res); + } + face.remote_mappings.clear(); + for res in face.local_mappings.values_mut() { + get_mut_unchecked(res).session_ctxs.remove(&face.id); + Resource::clean(res); + } + face.local_mappings.clear(); + + let mut subs_matches = vec![]; + for mut res in face.hat.remote_subs.drain() { + get_mut_unchecked(&mut res).session_ctxs.remove(&face.id); + undeclare_client_subscription(&mut wtables, &mut face_clone, &mut res); + + if res.context.is_some() { + for match_ in &res.context().matches { + let mut match_ = match_.upgrade().unwrap(); + if !Arc::ptr_eq(&match_, &res) { + get_mut_unchecked(&mut match_) + .context_mut() + .valid_data_routes = false; + subs_matches.push(match_); + } + } + get_mut_unchecked(&mut res).context_mut().valid_data_routes = false; + subs_matches.push(res); + } + } + + let mut qabls_matches = vec![]; + for mut res in face.hat.remote_qabls.drain() { + get_mut_unchecked(&mut res).session_ctxs.remove(&face.id); + undeclare_client_queryable(&mut wtables, &mut face_clone, &mut res); + + if res.context.is_some() { + for match_ in &res.context().matches { + let mut match_ = match_.upgrade().unwrap(); + if !Arc::ptr_eq(&match_, &res) { + get_mut_unchecked(&mut match_) + .context_mut() + .valid_query_routes = false; + qabls_matches.push(match_); + } + } + get_mut_unchecked(&mut res).context_mut().valid_query_routes = false; + qabls_matches.push(res); + } + } + drop(wtables); + + let mut matches_data_routes = vec![]; + let mut matches_query_routes = vec![]; + let rtables = zread!(tables.tables); + for _match in subs_matches.drain(..) { + matches_data_routes.push((_match.clone(), compute_data_routes_(&rtables, &_match))); + } + for _match in qabls_matches.drain(..) { + matches_query_routes.push((_match.clone(), compute_query_routes_(&rtables, &_match))); + } + drop(rtables); + + let mut wtables = zwrite!(tables.tables); + for (mut res, data_routes) in matches_data_routes { + get_mut_unchecked(&mut res) + .context_mut() + .update_data_routes(data_routes); + Resource::clean(&mut res); + } + for (mut res, query_routes) in matches_query_routes { + get_mut_unchecked(&mut res) + .context_mut() + .update_query_routes(query_routes); + Resource::clean(&mut res); + } + wtables.faces.remove(&face.id); + drop(wtables); + drop(ctrl_lock); +} diff --git a/zenoh/src/net/routing/hat/pubsub.rs b/zenoh/src/net/routing/hat/pubsub.rs index a92d8a4640..650c9fe849 100644 --- a/zenoh/src/net/routing/hat/pubsub.rs +++ b/zenoh/src/net/routing/hat/pubsub.rs @@ -1,3 +1,5 @@ +use crate::net::routing::dispatcher::tables::{Route, RoutingExpr}; + // // Copyright (c) 2023 ZettaScale Technology // @@ -19,9 +21,11 @@ use super::super::PREFIX_LIVELINESS; use super::network::Network; use super::HatTables; use petgraph::graph::NodeIndex; -use std::collections::HashMap; +use std::borrow::Cow; +use std::collections::{HashMap, HashSet}; use std::sync::{Arc, RwLockReadGuard}; use zenoh_core::zread; +use zenoh_protocol::core::key_expr::OwnedKeyExpr; use zenoh_protocol::{ core::{key_expr::keyexpr, Reliability, WhatAmI, WireExpr, ZenohId}, network::declare::{ @@ -80,7 +84,7 @@ fn propagate_simple_subscription_to( full_peer_net: bool, ) { if (src_face.id != dst_face.id || res.expr().starts_with(PREFIX_LIVELINESS)) - && !dst_face.local_subs.contains(res) + && !dst_face.hat.local_subs.contains(res) && match tables.whatami { WhatAmI::Router => { if full_peer_net { @@ -102,7 +106,10 @@ fn propagate_simple_subscription_to( _ => src_face.whatami == WhatAmI::Client || dst_face.whatami == WhatAmI::Client, } { - get_mut_unchecked(dst_face).local_subs.insert(res.clone()); + get_mut_unchecked(dst_face) + .hat + .local_subs + .insert(res.clone()); let key_expr = Resource::decl_key(res, dst_face); dst_face.primitives.send_declare(Declare { ext_qos: ext::QoSType::declare_default(), @@ -186,7 +193,7 @@ fn register_router_subscription( sub_info: &SubscriberInfo, router: ZenohId, ) { - if !res.context().router_subs.contains(&router) { + if !res.context().hat.router_subs.contains(&router) { // Register router subscription { log::debug!( @@ -196,6 +203,7 @@ fn register_router_subscription( ); get_mut_unchecked(res) .context_mut() + .hat .router_subs .insert(router); tables.hat.router_subs.insert(res.clone()); @@ -276,11 +284,15 @@ fn register_peer_subscription( sub_info: &SubscriberInfo, peer: ZenohId, ) { - if !res.context().peer_subs.contains(&peer) { + if !res.context().hat.peer_subs.contains(&peer) { // Register peer subscription { log::debug!("Register peer subscription {} (peer: {})", res.expr(), peer); - get_mut_unchecked(res).context_mut().peer_subs.insert(peer); + get_mut_unchecked(res) + .context_mut() + .hat + .peer_subs + .insert(peer); tables.hat.peer_subs.insert(res.clone()); } @@ -392,7 +404,7 @@ fn register_client_subscription( } } } - get_mut_unchecked(face).remote_subs.insert(res.clone()); + get_mut_unchecked(face).hat.remote_subs.insert(res.clone()); } pub fn declare_client_subscription( @@ -515,6 +527,7 @@ fn remote_router_subs(tables: &Tables, res: &Arc) -> bool { res.context.is_some() && res .context() + .hat .router_subs .iter() .any(|peer| peer != &tables.zid) @@ -525,6 +538,7 @@ fn remote_peer_subs(tables: &Tables, res: &Arc) -> bool { res.context.is_some() && res .context() + .hat .peer_subs .iter() .any(|peer| peer != &tables.zid) @@ -583,7 +597,7 @@ fn send_forget_sourced_subscription_to_net_childs( fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc) { for face in tables.faces.values_mut() { - if face.local_subs.contains(res) { + if face.hat.local_subs.contains(res) { let wire_expr = Resource::get_best_key(res, "", face.id); face.primitives.send_declare(Declare { ext_qos: ext::QoSType::declare_default(), @@ -594,15 +608,15 @@ fn propagate_forget_simple_subscription(tables: &mut Tables, res: &Arc ext_wire_expr: WireExprType { wire_expr }, }), }); - get_mut_unchecked(face).local_subs.remove(res); + get_mut_unchecked(face).hat.local_subs.remove(res); } } } fn propagate_forget_simple_subscription_to_peers(tables: &mut Tables, res: &Arc) { if !tables.hat.full_net(WhatAmI::Peer) - && res.context().router_subs.len() == 1 - && res.context().router_subs.contains(&tables.zid) + && res.context().hat.router_subs.len() == 1 + && res.context().hat.router_subs.contains(&tables.zid) { for mut face in tables .faces @@ -611,7 +625,7 @@ fn propagate_forget_simple_subscription_to_peers(tables: &mut Tables, res: &Arc< .collect::>>() { if face.whatami == WhatAmI::Peer - && face.local_subs.contains(res) + && face.hat.local_subs.contains(res) && !res.session_ctxs.values().any(|s| { face.zid != s.face.zid && s.subs.is_some() @@ -631,7 +645,7 @@ fn propagate_forget_simple_subscription_to_peers(tables: &mut Tables, res: &Arc< }), }); - get_mut_unchecked(&mut face).local_subs.remove(res); + get_mut_unchecked(&mut face).hat.local_subs.remove(res); } } } @@ -681,10 +695,11 @@ fn unregister_router_subscription(tables: &mut Tables, res: &mut Arc, ); get_mut_unchecked(res) .context_mut() + .hat .router_subs .retain(|sub| sub != router); - if res.context().router_subs.is_empty() { + if res.context().hat.router_subs.is_empty() { tables.hat.router_subs.retain(|sub| !Arc::ptr_eq(sub, res)); if tables.hat.full_net(WhatAmI::Peer) { @@ -702,7 +717,7 @@ fn undeclare_router_subscription( res: &mut Arc, router: &ZenohId, ) { - if res.context().router_subs.contains(router) { + if res.context().hat.router_subs.contains(router) { unregister_router_subscription(tables, res, router); propagate_forget_sourced_subscription(tables, res, face, router, WhatAmI::Router); } @@ -750,10 +765,11 @@ fn unregister_peer_subscription(tables: &mut Tables, res: &mut Arc, pe ); get_mut_unchecked(res) .context_mut() + .hat .peer_subs .retain(|sub| sub != peer); - if res.context().peer_subs.is_empty() { + if res.context().hat.peer_subs.is_empty() { tables.hat.peer_subs.retain(|sub| !Arc::ptr_eq(sub, res)); if tables.whatami == WhatAmI::Peer { @@ -768,7 +784,7 @@ fn undeclare_peer_subscription( res: &mut Arc, peer: &ZenohId, ) { - if res.context().peer_subs.contains(peer) { + if res.context().hat.peer_subs.contains(peer) { unregister_peer_subscription(tables, res, peer); propagate_forget_sourced_subscription(tables, res, face, peer, WhatAmI::Peer); } @@ -825,7 +841,7 @@ pub(crate) fn undeclare_client_subscription( if let Some(ctx) = get_mut_unchecked(res).session_ctxs.get_mut(&face.id) { get_mut_unchecked(ctx).subs = None; } - get_mut_unchecked(face).remote_subs.remove(res); + get_mut_unchecked(face).hat.remote_subs.remove(res); let mut client_subs = client_subs(res); let router_subs = remote_router_subs(tables, res); @@ -855,7 +871,7 @@ pub(crate) fn undeclare_client_subscription( } if client_subs.len() == 1 && !router_subs && !peer_subs { let face = &mut client_subs[0]; - if face.local_subs.contains(res) + if face.hat.local_subs.contains(res) && !(face.whatami == WhatAmI::Client && res.expr().starts_with(PREFIX_LIVELINESS)) { let wire_expr = Resource::get_best_key(res, "", face.id); @@ -869,7 +885,7 @@ pub(crate) fn undeclare_client_subscription( }), }); - get_mut_unchecked(face).local_subs.remove(res); + get_mut_unchecked(face).hat.local_subs.remove(res); } } } @@ -917,7 +933,7 @@ pub(crate) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { WhatAmI::Router => { if face.whatami == WhatAmI::Client { for sub in &tables.hat.router_subs { - get_mut_unchecked(face).local_subs.insert(sub.clone()); + get_mut_unchecked(face).hat.local_subs.insert(sub.clone()); let key_expr = Resource::decl_key(sub, face); face.primitives.send_declare(Declare { ext_qos: ext::QoSType::declare_default(), @@ -933,7 +949,12 @@ pub(crate) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { } else if face.whatami == WhatAmI::Peer && !tables.hat.full_net(WhatAmI::Peer) { for sub in &tables.hat.router_subs { if sub.context.is_some() - && (sub.context().router_subs.iter().any(|r| *r != tables.zid) + && (sub + .context() + .hat + .router_subs + .iter() + .any(|r| *r != tables.zid) || sub.session_ctxs.values().any(|s| { s.subs.is_some() && (s.face.whatami == WhatAmI::Client @@ -941,7 +962,7 @@ pub(crate) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { && tables.hat.failover_brokering(s.face.zid, face.zid))) })) { - get_mut_unchecked(face).local_subs.insert(sub.clone()); + get_mut_unchecked(face).hat.local_subs.insert(sub.clone()); let key_expr = Resource::decl_key(sub, face); face.primitives.send_declare(Declare { ext_qos: ext::QoSType::declare_default(), @@ -961,7 +982,7 @@ pub(crate) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { if tables.hat.full_net(WhatAmI::Peer) { if face.whatami == WhatAmI::Client { for sub in &tables.hat.peer_subs { - get_mut_unchecked(face).local_subs.insert(sub.clone()); + get_mut_unchecked(face).hat.local_subs.insert(sub.clone()); let key_expr = Resource::decl_key(sub, face); face.primitives.send_declare(Declare { ext_qos: ext::QoSType::declare_default(), @@ -982,7 +1003,7 @@ pub(crate) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { .cloned() .collect::>>() { - for sub in &src_face.remote_subs { + for sub in &src_face.hat.remote_subs { propagate_simple_subscription_to( tables, face, @@ -1002,7 +1023,7 @@ pub(crate) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc) { .cloned() .collect::>>() { - for sub in &src_face.remote_subs { + for sub in &src_face.hat.remote_subs { propagate_simple_subscription_to( tables, face, @@ -1024,7 +1045,7 @@ pub(crate) fn pubsub_remove_node(tables: &mut Tables, node: &ZenohId, net_type: .hat .router_subs .iter() - .filter(|res| res.context().router_subs.contains(node)) + .filter(|res| res.context().hat.router_subs.contains(node)) .cloned() .collect::>>() { @@ -1044,7 +1065,7 @@ pub(crate) fn pubsub_remove_node(tables: &mut Tables, node: &ZenohId, net_type: .hat .peer_subs .iter() - .filter(|res| res.context().peer_subs.contains(node)) + .filter(|res| res.context().hat.peer_subs.contains(node)) .cloned() .collect::>>() { @@ -1092,8 +1113,8 @@ pub(crate) fn pubsub_tree_change( for res in subs_res { let subs = match net_type { - WhatAmI::Router => &res.context().router_subs, - _ => &res.context().peer_subs, + WhatAmI::Router => &res.context().hat.router_subs, + _ => &res.context().hat.peer_subs, }; for sub in subs { if *sub == tree_id { @@ -1127,7 +1148,7 @@ pub(crate) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: && tables.whatami == WhatAmI::Router && src_face.whatami == WhatAmI::Peer { - for res in &src_face.remote_subs { + for res in &src_face.hat.remote_subs { let client_subs = res .session_ctxs .values() @@ -1139,7 +1160,7 @@ pub(crate) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: { let dst_face = &mut get_mut_unchecked(ctx).face; if dst_face.whatami == WhatAmI::Peer && src_face.zid != dst_face.zid { - if dst_face.local_subs.contains(res) { + if dst_face.hat.local_subs.contains(res) { let forget = !HatTables::failover_brokering_to(links, dst_face.zid) && { let ctx_links = tables @@ -1171,11 +1192,14 @@ pub(crate) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: ), }); - get_mut_unchecked(dst_face).local_subs.remove(res); + get_mut_unchecked(dst_face).hat.local_subs.remove(res); } } else if HatTables::failover_brokering_to(links, ctx.face.zid) { let dst_face = &mut get_mut_unchecked(ctx).face; - get_mut_unchecked(dst_face).local_subs.insert(res.clone()); + get_mut_unchecked(dst_face) + .hat + .local_subs + .insert(res.clone()); let key_expr = Resource::decl_key(res, dst_face); let sub_info = SubscriberInfo { reliability: Reliability::Reliable, // TODO @@ -1199,3 +1223,166 @@ pub(crate) fn pubsub_linkstate_change(tables: &mut Tables, zid: &ZenohId, links: } } } + +#[inline] +fn insert_faces_for_subs( + route: &mut Route, + expr: &RoutingExpr, + tables: &Tables, + net: &Network, + source: usize, + subs: &HashSet, +) { + if net.trees.len() > source { + for sub in subs { + if let Some(sub_idx) = net.get_idx(sub) { + if net.trees[source].directions.len() > sub_idx.index() { + if let Some(direction) = net.trees[source].directions[sub_idx.index()] { + if net.graph.contains_node(direction) { + if let Some(face) = tables.get_face(&net.graph[direction].zid) { + route.entry(face.id).or_insert_with(|| { + let key_expr = + Resource::get_best_key(expr.prefix, expr.suffix, face.id); + ( + face.clone(), + key_expr.to_owned(), + if source != 0 { + Some(source as u16) + } else { + None + }, + ) + }); + } + } + } + } + } + } + } else { + log::trace!("Tree for node sid:{} not yet ready", source); + } +} + +pub(crate) fn compute_data_route( + tables: &Tables, + expr: &mut RoutingExpr, + source: Option, + source_type: WhatAmI, +) -> Arc { + let mut route = HashMap::new(); + let key_expr = expr.full_expr(); + if key_expr.ends_with('/') { + return Arc::new(route); + } + log::trace!( + "compute_data_route({}, {:?}, {:?})", + key_expr, + source, + source_type + ); + let key_expr = match OwnedKeyExpr::try_from(key_expr) { + Ok(ke) => ke, + Err(e) => { + log::warn!("Invalid KE reached the system: {}", e); + return Arc::new(route); + } + }; + let res = Resource::get_resource(expr.prefix, expr.suffix); + let matches = res + .as_ref() + .and_then(|res| res.context.as_ref()) + .map(|ctx| Cow::from(&ctx.matches)) + .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); + + let master = tables.whatami != WhatAmI::Router + || !tables.hat.full_net(WhatAmI::Peer) + || *tables + .hat + .elect_router(&tables.zid, &key_expr, tables.hat.shared_nodes.iter()) + == tables.zid; + + for mres in matches.iter() { + let mres = mres.upgrade().unwrap(); + if tables.whatami == WhatAmI::Router { + if master || source_type == WhatAmI::Router { + let net = tables.hat.routers_net.as_ref().unwrap(); + let router_source = match source_type { + WhatAmI::Router => source.unwrap(), + _ => net.idx.index(), + }; + insert_faces_for_subs( + &mut route, + expr, + tables, + net, + router_source, + &mres.context().hat.router_subs, + ); + } + + if (master || source_type != WhatAmI::Router) && tables.hat.full_net(WhatAmI::Peer) { + let net = tables.hat.peers_net.as_ref().unwrap(); + let peer_source = match source_type { + WhatAmI::Peer => source.unwrap(), + _ => net.idx.index(), + }; + insert_faces_for_subs( + &mut route, + expr, + tables, + net, + peer_source, + &mres.context().hat.peer_subs, + ); + } + } + + if tables.whatami == WhatAmI::Peer && tables.hat.full_net(WhatAmI::Peer) { + let net = tables.hat.peers_net.as_ref().unwrap(); + let peer_source = match source_type { + WhatAmI::Router | WhatAmI::Peer => source.unwrap(), + _ => net.idx.index(), + }; + insert_faces_for_subs( + &mut route, + expr, + tables, + net, + peer_source, + &mres.context().hat.peer_subs, + ); + } + + if tables.whatami != WhatAmI::Router || master || source_type == WhatAmI::Router { + for (sid, context) in &mres.session_ctxs { + if let Some(subinfo) = &context.subs { + if match tables.whatami { + WhatAmI::Router => context.face.whatami != WhatAmI::Router, + _ => { + source_type == WhatAmI::Client + || context.face.whatami == WhatAmI::Client + } + } && subinfo.mode == Mode::Push + { + route.entry(*sid).or_insert_with(|| { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); + (context.face.clone(), key_expr.to_owned(), None) + }); + } + } + } + } + } + for mcast_group in &tables.mcast_groups { + route.insert( + mcast_group.id, + ( + mcast_group.clone(), + expr.full_expr().to_string().into(), + None, + ), + ); + } + Arc::new(route) +} diff --git a/zenoh/src/net/routing/hat/queries.rs b/zenoh/src/net/routing/hat/queries.rs index 3e4da6cab8..514bff6d11 100644 --- a/zenoh/src/net/routing/hat/queries.rs +++ b/zenoh/src/net/routing/hat/queries.rs @@ -1,3 +1,6 @@ +use crate::net::routing::dispatcher::tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr}; +use crate::net::routing::PREFIX_LIVELINESS; + // // Copyright (c) 2023 ZettaScale Technology // @@ -17,9 +20,14 @@ use super::super::dispatcher::resource::{Resource, RoutingContext, SessionContex use super::super::dispatcher::tables::{Tables, TablesLock}; use super::network::Network; use super::HatTables; +use ordered_float::OrderedFloat; use petgraph::graph::NodeIndex; +use std::borrow::Cow; use std::collections::HashMap; use std::sync::{Arc, RwLockReadGuard}; +use zenoh_buffers::ZBuf; +use zenoh_protocol::core::key_expr::include::{Includer, DEFAULT_INCLUDER}; +use zenoh_protocol::core::key_expr::OwnedKeyExpr; use zenoh_protocol::{ core::{key_expr::keyexpr, WhatAmI, WireExpr, ZenohId}, network::declare::{ @@ -48,7 +56,7 @@ fn merge_qabl_infos(mut this: QueryableInfo, info: &QueryableInfo) -> QueryableI fn local_router_qabl_info(tables: &Tables, res: &Arc) -> QueryableInfo { let info = if tables.hat.full_net(WhatAmI::Peer) { res.context.as_ref().and_then(|ctx| { - ctx.peer_qabls.iter().fold(None, |accu, (zid, info)| { + ctx.hat.peer_qabls.iter().fold(None, |accu, (zid, info)| { if *zid != tables.zid { Some(match accu { Some(accu) => merge_qabl_infos(accu, info), @@ -83,6 +91,7 @@ fn local_router_qabl_info(tables: &Tables, res: &Arc) -> QueryableInfo fn local_peer_qabl_info(tables: &Tables, res: &Arc) -> QueryableInfo { let info = if tables.whatami == WhatAmI::Router && res.context.is_some() { res.context() + .hat .router_qabls .iter() .fold(None, |accu, (zid, info)| { @@ -119,6 +128,7 @@ fn local_peer_qabl_info(tables: &Tables, res: &Arc) -> QueryableInfo { fn local_qabl_info(tables: &Tables, res: &Arc, face: &Arc) -> QueryableInfo { let mut info = if tables.whatami == WhatAmI::Router && res.context.is_some() { res.context() + .hat .router_qabls .iter() .fold(None, |accu, (zid, info)| { @@ -137,6 +147,7 @@ fn local_qabl_info(tables: &Tables, res: &Arc, face: &Arc) if res.context.is_some() && tables.hat.full_net(WhatAmI::Peer) { info = res .context() + .hat .peer_qabls .iter() .fold(info, |accu, (zid, info)| { @@ -224,7 +235,7 @@ fn propagate_simple_queryable( let faces = tables.faces.values().cloned(); for mut dst_face in faces { let info = local_qabl_info(tables, res, &dst_face); - let current_info = dst_face.local_qabls.get(res); + let current_info = dst_face.hat.local_qabls.get(res); if (src_face.is_none() || src_face.as_ref().unwrap().id != dst_face.id) && (current_info.is_none() || *current_info.unwrap() != info) && match tables.whatami { @@ -259,6 +270,7 @@ fn propagate_simple_queryable( } { get_mut_unchecked(&mut dst_face) + .hat .local_qabls .insert(res.clone(), info); let key_expr = Resource::decl_key(res, &mut dst_face); @@ -321,7 +333,7 @@ fn register_router_queryable( qabl_info: &QueryableInfo, router: ZenohId, ) { - let current_info = res.context().router_qabls.get(&router); + let current_info = res.context().hat.router_qabls.get(&router); if current_info.is_none() || current_info.unwrap() != qabl_info { // Register router queryable { @@ -332,6 +344,7 @@ fn register_router_queryable( ); get_mut_unchecked(res) .context_mut() + .hat .router_qabls .insert(router, *qabl_info); tables.hat.router_qabls.insert(res.clone()); @@ -421,13 +434,14 @@ fn register_peer_queryable( qabl_info: &QueryableInfo, peer: ZenohId, ) { - let current_info = res.context().peer_qabls.get(&peer); + let current_info = res.context().hat.peer_qabls.get(&peer); if current_info.is_none() || current_info.unwrap() != qabl_info { // Register peer queryable { log::debug!("Register peer queryable {} (peer: {})", res.expr(), peer,); get_mut_unchecked(res) .context_mut() + .hat .peer_qabls .insert(peer, *qabl_info); tables.hat.peer_qabls.insert(res.clone()); @@ -532,7 +546,7 @@ fn register_client_queryable( })) .qabl = Some(*qabl_info); } - get_mut_unchecked(face).remote_qabls.insert(res.clone()); + get_mut_unchecked(face).hat.remote_qabls.insert(res.clone()); } pub fn declare_client_queryable( @@ -626,6 +640,7 @@ fn remote_router_qabls(tables: &Tables, res: &Arc) -> bool { res.context.is_some() && res .context() + .hat .router_qabls .keys() .any(|router| router != &tables.zid) @@ -636,6 +651,7 @@ fn remote_peer_qabls(tables: &Tables, res: &Arc) -> bool { res.context.is_some() && res .context() + .hat .peer_qabls .keys() .any(|peer| peer != &tables.zid) @@ -694,7 +710,7 @@ fn send_forget_sourced_queryable_to_net_childs( fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { for face in tables.faces.values_mut() { - if face.local_qabls.contains_key(res) { + if face.hat.local_qabls.contains_key(res) { let wire_expr = Resource::get_best_key(res, "", face.id); face.primitives.send_declare(Declare { ext_qos: ext::QoSType::declare_default(), @@ -706,15 +722,15 @@ fn propagate_forget_simple_queryable(tables: &mut Tables, res: &mut Arc) { if !tables.hat.full_net(WhatAmI::Peer) - && res.context().router_qabls.len() == 1 - && res.context().router_qabls.contains_key(&tables.zid) + && res.context().hat.router_qabls.len() == 1 + && res.context().hat.router_qabls.contains_key(&tables.zid) { for mut face in tables .faces @@ -723,7 +739,7 @@ fn propagate_forget_simple_queryable_to_peers(tables: &mut Tables, res: &mut Arc .collect::>>() { if face.whatami == WhatAmI::Peer - && face.local_qabls.contains_key(res) + && face.hat.local_qabls.contains_key(res) && !res.session_ctxs.values().any(|s| { face.zid != s.face.zid && s.qabl.is_some() @@ -743,7 +759,7 @@ fn propagate_forget_simple_queryable_to_peers(tables: &mut Tables, res: &mut Arc }), }); - get_mut_unchecked(&mut face).local_qabls.remove(res); + get_mut_unchecked(&mut face).hat.local_qabls.remove(res); } } } @@ -793,10 +809,11 @@ fn unregister_router_queryable(tables: &mut Tables, res: &mut Arc, rou ); get_mut_unchecked(res) .context_mut() + .hat .router_qabls .remove(router); - if res.context().router_qabls.is_empty() { + if res.context().hat.router_qabls.is_empty() { tables .hat .router_qabls @@ -817,7 +834,7 @@ fn undeclare_router_queryable( res: &mut Arc, router: &ZenohId, ) { - if res.context().router_qabls.contains_key(router) { + if res.context().hat.router_qabls.contains_key(router) { unregister_router_queryable(tables, res, router); propagate_forget_sourced_queryable(tables, res, face, router, WhatAmI::Router); } @@ -860,9 +877,13 @@ pub fn forget_router_queryable( fn unregister_peer_queryable(tables: &mut Tables, res: &mut Arc, peer: &ZenohId) { log::debug!("Unregister peer queryable {} (peer: {})", res.expr(), peer,); - get_mut_unchecked(res).context_mut().peer_qabls.remove(peer); + get_mut_unchecked(res) + .context_mut() + .hat + .peer_qabls + .remove(peer); - if res.context().peer_qabls.is_empty() { + if res.context().hat.peer_qabls.is_empty() { tables.hat.peer_qabls.retain(|qabl| !Arc::ptr_eq(qabl, res)); if tables.whatami == WhatAmI::Peer { @@ -877,7 +898,7 @@ fn undeclare_peer_queryable( res: &mut Arc, peer: &ZenohId, ) { - if res.context().peer_qabls.contains_key(peer) { + if res.context().hat.peer_qabls.contains_key(peer) { unregister_peer_queryable(tables, res, peer); propagate_forget_sourced_queryable(tables, res, face, peer, WhatAmI::Peer); } @@ -938,7 +959,7 @@ pub(crate) fn undeclare_client_queryable( if let Some(ctx) = get_mut_unchecked(res).session_ctxs.get_mut(&face.id) { get_mut_unchecked(ctx).qabl = None; if ctx.qabl.is_none() { - get_mut_unchecked(face).remote_qabls.remove(res); + get_mut_unchecked(face).hat.remote_qabls.remove(res); } } @@ -981,7 +1002,7 @@ pub(crate) fn undeclare_client_queryable( if client_qabls.len() == 1 && !router_qabls && !peer_qabls { let face = &mut client_qabls[0]; - if face.local_qabls.contains_key(res) { + if face.hat.local_qabls.contains_key(res) { let wire_expr = Resource::get_best_key(res, "", face.id); face.primitives.send_declare(Declare { ext_qos: ext::QoSType::declare_default(), @@ -993,7 +1014,7 @@ pub(crate) fn undeclare_client_queryable( }), }); - get_mut_unchecked(face).local_qabls.remove(res); + get_mut_unchecked(face).hat.local_qabls.remove(res); } } } @@ -1040,6 +1061,7 @@ pub(crate) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { if qabl.context.is_some() { let info = local_qabl_info(tables, qabl, face); get_mut_unchecked(face) + .hat .local_qabls .insert(qabl.clone(), info); let key_expr = Resource::decl_key(qabl, face); @@ -1058,7 +1080,12 @@ pub(crate) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { } else if face.whatami == WhatAmI::Peer && !tables.hat.full_net(WhatAmI::Peer) { for qabl in tables.hat.router_qabls.iter() { if qabl.context.is_some() - && (qabl.context().router_qabls.keys().any(|r| *r != tables.zid) + && (qabl + .context() + .hat + .router_qabls + .keys() + .any(|r| *r != tables.zid) || qabl.session_ctxs.values().any(|s| { s.qabl.is_some() && (s.face.whatami == WhatAmI::Client @@ -1068,6 +1095,7 @@ pub(crate) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { { let info = local_qabl_info(tables, qabl, face); get_mut_unchecked(face) + .hat .local_qabls .insert(qabl.clone(), info); let key_expr = Resource::decl_key(qabl, face); @@ -1092,6 +1120,7 @@ pub(crate) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { if qabl.context.is_some() { let info = local_qabl_info(tables, qabl, face); get_mut_unchecked(face) + .hat .local_qabls .insert(qabl.clone(), info); let key_expr = Resource::decl_key(qabl, face); @@ -1115,7 +1144,7 @@ pub(crate) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { .cloned() .collect::>>() { - for qabl in face.remote_qabls.iter() { + for qabl in face.hat.remote_qabls.iter() { propagate_simple_queryable(tables, qabl, Some(&mut face.clone())); } } @@ -1128,7 +1157,7 @@ pub(crate) fn queries_new_face(tables: &mut Tables, face: &mut Arc) { .cloned() .collect::>>() { - for qabl in face.remote_qabls.iter() { + for qabl in face.hat.remote_qabls.iter() { propagate_simple_queryable(tables, qabl, Some(&mut face.clone())); } } @@ -1141,7 +1170,7 @@ pub(crate) fn queries_remove_node(tables: &mut Tables, node: &ZenohId, net_type: WhatAmI::Router => { let mut qabls = vec![]; for res in tables.hat.router_qabls.iter() { - for qabl in res.context().router_qabls.keys() { + for qabl in res.context().hat.router_qabls.keys() { if qabl == node { qabls.push(res.clone()); } @@ -1162,7 +1191,7 @@ pub(crate) fn queries_remove_node(tables: &mut Tables, node: &ZenohId, net_type: WhatAmI::Peer => { let mut qabls = vec![]; for res in tables.hat.router_qabls.iter() { - for qabl in res.context().router_qabls.keys() { + for qabl in res.context().hat.router_qabls.keys() { if qabl == node { qabls.push(res.clone()); } @@ -1201,7 +1230,7 @@ pub(crate) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links && tables.whatami == WhatAmI::Router && src_face.whatami == WhatAmI::Peer { - for res in &src_face.remote_qabls { + for res in &src_face.hat.remote_qabls { let client_qabls = res .session_ctxs .values() @@ -1213,7 +1242,7 @@ pub(crate) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links { let dst_face = &mut get_mut_unchecked(ctx).face; if dst_face.whatami == WhatAmI::Peer && src_face.zid != dst_face.zid { - if dst_face.local_qabls.contains_key(res) { + if dst_face.hat.local_qabls.contains_key(res) { let forget = !HatTables::failover_brokering_to(links, dst_face.zid) && { let ctx_links = tables @@ -1243,12 +1272,13 @@ pub(crate) fn queries_linkstate_change(tables: &mut Tables, zid: &ZenohId, links }), }); - get_mut_unchecked(dst_face).local_qabls.remove(res); + get_mut_unchecked(dst_face).hat.local_qabls.remove(res); } } else if HatTables::failover_brokering_to(links, ctx.face.zid) { let dst_face = &mut get_mut_unchecked(ctx).face; let info = local_qabl_info(tables, res, dst_face); get_mut_unchecked(dst_face) + .hat .local_qabls .insert(res.clone(), info); let key_expr = Resource::decl_key(res, dst_face); @@ -1291,8 +1321,8 @@ pub(crate) fn queries_tree_change( for res in qabls_res { let qabls = match net_type { - WhatAmI::Router => &res.context().router_qabls, - _ => &res.context().peer_qabls, + WhatAmI::Router => &res.context().hat.router_qabls, + _ => &res.context().hat.peer_qabls, }; if let Some(qabl_info) = qabls.get(&tree_id) { send_sourced_queryable_to_net_childs( @@ -1313,3 +1343,217 @@ pub(crate) fn queries_tree_change( // recompute routes compute_query_routes_from(tables, &mut tables.root_res.clone()); } + +#[inline] +#[allow(clippy::too_many_arguments)] +fn insert_target_for_qabls( + route: &mut QueryTargetQablSet, + expr: &mut RoutingExpr, + tables: &Tables, + net: &Network, + source: usize, + qabls: &HashMap, + complete: bool, +) { + if net.trees.len() > source { + for (qabl, qabl_info) in qabls { + if let Some(qabl_idx) = net.get_idx(qabl) { + if net.trees[source].directions.len() > qabl_idx.index() { + if let Some(direction) = net.trees[source].directions[qabl_idx.index()] { + if net.graph.contains_node(direction) { + if let Some(face) = tables.get_face(&net.graph[direction].zid) { + if net.distances.len() > qabl_idx.index() { + let key_expr = + Resource::get_best_key(expr.prefix, expr.suffix, face.id); + route.push(QueryTargetQabl { + direction: ( + face.clone(), + key_expr.to_owned(), + if source != 0 { + Some(source as u16) + } else { + None + }, + ), + complete: if complete { + qabl_info.complete as u64 + } else { + 0 + }, + distance: net.distances[qabl_idx.index()], + }); + } + } + } + } + } + } + } + } else { + log::trace!("Tree for node sid:{} not yet ready", source); + } +} + +lazy_static::lazy_static! { + static ref EMPTY_ROUTE: Arc = Arc::new(Vec::new()); +} +pub(crate) fn compute_query_route( + tables: &Tables, + expr: &mut RoutingExpr, + source: Option, + source_type: WhatAmI, +) -> Arc { + let mut route = QueryTargetQablSet::new(); + let key_expr = expr.full_expr(); + if key_expr.ends_with('/') { + return EMPTY_ROUTE.clone(); + } + log::trace!( + "compute_query_route({}, {:?}, {:?})", + key_expr, + source, + source_type + ); + let key_expr = match OwnedKeyExpr::try_from(key_expr) { + Ok(ke) => ke, + Err(e) => { + log::warn!("Invalid KE reached the system: {}", e); + return EMPTY_ROUTE.clone(); + } + }; + let res = Resource::get_resource(expr.prefix, expr.suffix); + let matches = res + .as_ref() + .and_then(|res| res.context.as_ref()) + .map(|ctx| Cow::from(&ctx.matches)) + .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); + + let master = tables.whatami != WhatAmI::Router + || !tables.hat.full_net(WhatAmI::Peer) + || *tables + .hat + .elect_router(&tables.zid, &key_expr, tables.hat.shared_nodes.iter()) + == tables.zid; + + for mres in matches.iter() { + let mres = mres.upgrade().unwrap(); + let complete = DEFAULT_INCLUDER.includes(mres.expr().as_bytes(), key_expr.as_bytes()); + if tables.whatami == WhatAmI::Router { + if master || source_type == WhatAmI::Router { + let net = tables.hat.routers_net.as_ref().unwrap(); + let router_source = match source_type { + WhatAmI::Router => source.unwrap(), + _ => net.idx.index(), + }; + insert_target_for_qabls( + &mut route, + expr, + tables, + net, + router_source, + &mres.context().hat.router_qabls, + complete, + ); + } + + if (master || source_type != WhatAmI::Router) && tables.hat.full_net(WhatAmI::Peer) { + let net = tables.hat.peers_net.as_ref().unwrap(); + let peer_source = match source_type { + WhatAmI::Peer => source.unwrap(), + _ => net.idx.index(), + }; + insert_target_for_qabls( + &mut route, + expr, + tables, + net, + peer_source, + &mres.context().hat.peer_qabls, + complete, + ); + } + } + + if tables.whatami == WhatAmI::Peer && tables.hat.full_net(WhatAmI::Peer) { + let net = tables.hat.peers_net.as_ref().unwrap(); + let peer_source = match source_type { + WhatAmI::Router | WhatAmI::Peer => source.unwrap(), + _ => net.idx.index(), + }; + insert_target_for_qabls( + &mut route, + expr, + tables, + net, + peer_source, + &mres.context().hat.peer_qabls, + complete, + ); + } + + if tables.whatami != WhatAmI::Router || master || source_type == WhatAmI::Router { + for (sid, context) in &mres.session_ctxs { + if match tables.whatami { + WhatAmI::Router => context.face.whatami != WhatAmI::Router, + _ => source_type == WhatAmI::Client || context.face.whatami == WhatAmI::Client, + } { + let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); + if let Some(qabl_info) = context.qabl.as_ref() { + route.push(QueryTargetQabl { + direction: (context.face.clone(), key_expr.to_owned(), None), + complete: if complete { + qabl_info.complete as u64 + } else { + 0 + }, + distance: 0.5, + }); + } + } + } + } + } + route.sort_by_key(|qabl| OrderedFloat(qabl.distance)); + Arc::new(route) +} + +#[inline] +pub(crate) fn compute_local_replies( + tables: &Tables, + prefix: &Arc, + suffix: &str, + face: &Arc, +) -> Vec<(WireExpr<'static>, ZBuf)> { + let mut result = vec![]; + // Only the first routing point in the query route + // should return the liveliness tokens + if face.whatami == WhatAmI::Client { + let key_expr = prefix.expr() + suffix; + let key_expr = match OwnedKeyExpr::try_from(key_expr) { + Ok(ke) => ke, + Err(e) => { + log::warn!("Invalid KE reached the system: {}", e); + return result; + } + }; + if key_expr.starts_with(PREFIX_LIVELINESS) { + let res = Resource::get_resource(prefix, suffix); + let matches = res + .as_ref() + .and_then(|res| res.context.as_ref()) + .map(|ctx| Cow::from(&ctx.matches)) + .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); + for mres in matches.iter() { + let mres = mres.upgrade().unwrap(); + if (mres.context.is_some() + && (!mres.context().hat.router_subs.is_empty() + || !mres.context().hat.peer_subs.is_empty())) + || mres.session_ctxs.values().any(|ctx| ctx.subs.is_some()) + { + result.push((Resource::get_best_key(&mres, "", face.id), ZBuf::default())); + } + } + } + } + result +}