Skip to content

Commit

Permalink
Code move
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Oct 31, 2023
1 parent 66f41d8 commit d4b8fbb
Show file tree
Hide file tree
Showing 8 changed files with 698 additions and 624 deletions.
19 changes: 6 additions & 13 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::net::routing::hat::HatFace;

//
// Copyright (c) 2023 ZettaScale Technology
//
Expand All @@ -14,16 +16,13 @@
use super::super::router::*;
use super::tables::{Tables, TablesLock};
use super::{resource::*, tables};
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use zenoh_protocol::zenoh::RequestBody;
use zenoh_protocol::{
core::{ExprId, WhatAmI, ZenohId},
network::{
declare::queryable::ext::QueryableInfo, Mapping, Push, Request, RequestId, Response,
ResponseFinal,
},
network::{Mapping, Push, Request, RequestId, Response, ResponseFinal},
};
#[cfg(feature = "stats")]
use zenoh_transport::stats::TransportStats;
Expand All @@ -39,13 +38,10 @@ pub struct FaceState {
pub(crate) link_id: usize,
pub(crate) local_mappings: HashMap<ExprId, Arc<Resource>>,
pub(crate) remote_mappings: HashMap<ExprId, Arc<Resource>>,
pub(crate) local_subs: HashSet<Arc<Resource>>,
pub(crate) remote_subs: HashSet<Arc<Resource>>,
pub(crate) local_qabls: HashMap<Arc<Resource>, QueryableInfo>,
pub(crate) remote_qabls: HashSet<Arc<Resource>>,
pub(crate) next_qid: RequestId,
pub(crate) pending_queries: HashMap<RequestId, Arc<Query>>,
pub(crate) mcast_group: Option<TransportMulticast>,
pub(crate) hat: HatFace,
}

impl FaceState {
Expand All @@ -68,13 +64,10 @@ impl FaceState {
link_id,
local_mappings: HashMap::new(),
remote_mappings: HashMap::new(),
local_subs: HashSet::new(),
remote_subs: HashSet::new(),
local_qabls: HashMap::new(),
remote_qabls: HashSet::new(),
next_qid: 0,
pending_queries: HashMap::new(),
mcast_group,
hat: HatFace::new(),
})
}

Expand Down
274 changes: 54 additions & 220 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use super::super::hat::network::Network;
use super::super::hat::pubsub::compute_data_route;
use super::face::FaceState;
use super::resource::{DataRoutes, Direction, PullCaches, Resource, Route};
use super::tables::{RoutingExpr, Tables};
use petgraph::graph::NodeIndex;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;
use std::sync::RwLock;
use zenoh_core::zread;
use zenoh_protocol::{
core::{key_expr::OwnedKeyExpr, WhatAmI, WireExpr, ZenohId},
core::{key_expr::OwnedKeyExpr, WhatAmI, WireExpr},
network::{
declare::{ext, Mode},
Push,
Expand All @@ -32,169 +32,6 @@ use zenoh_protocol::{
};
use zenoh_sync::get_mut_unchecked;

#[inline]
fn insert_faces_for_subs(
route: &mut Route,
expr: &RoutingExpr,
tables: &Tables,
net: &Network,
source: usize,
subs: &HashSet<ZenohId>,
) {
if net.trees.len() > source {
for sub in subs {
if let Some(sub_idx) = net.get_idx(sub) {
if net.trees[source].directions.len() > sub_idx.index() {
if let Some(direction) = net.trees[source].directions[sub_idx.index()] {
if net.graph.contains_node(direction) {
if let Some(face) = tables.get_face(&net.graph[direction].zid) {
route.entry(face.id).or_insert_with(|| {
let key_expr =
Resource::get_best_key(expr.prefix, expr.suffix, face.id);
(
face.clone(),
key_expr.to_owned(),
if source != 0 {
Some(source as u16)
} else {
None
},
)
});
}
}
}
}
}
}
} else {
log::trace!("Tree for node sid:{} not yet ready", source);
}
}

fn compute_data_route(
tables: &Tables,
expr: &mut RoutingExpr,
source: Option<usize>,
source_type: WhatAmI,
) -> Arc<Route> {
let mut route = HashMap::new();
let key_expr = expr.full_expr();
if key_expr.ends_with('/') {
return Arc::new(route);
}
log::trace!(
"compute_data_route({}, {:?}, {:?})",
key_expr,
source,
source_type
);
let key_expr = match OwnedKeyExpr::try_from(key_expr) {
Ok(ke) => ke,
Err(e) => {
log::warn!("Invalid KE reached the system: {}", e);
return Arc::new(route);
}
};
let res = Resource::get_resource(expr.prefix, expr.suffix);
let matches = res
.as_ref()
.and_then(|res| res.context.as_ref())
.map(|ctx| Cow::from(&ctx.matches))
.unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr)));

let master = tables.whatami != WhatAmI::Router
|| !tables.hat.full_net(WhatAmI::Peer)
|| *tables
.hat
.elect_router(&tables.zid, &key_expr, tables.hat.shared_nodes.iter())
== tables.zid;

for mres in matches.iter() {
let mres = mres.upgrade().unwrap();
if tables.whatami == WhatAmI::Router {
if master || source_type == WhatAmI::Router {
let net = tables.hat.routers_net.as_ref().unwrap();
let router_source = match source_type {
WhatAmI::Router => source.unwrap(),
_ => net.idx.index(),
};
insert_faces_for_subs(
&mut route,
expr,
tables,
net,
router_source,
&mres.context().router_subs,
);
}

if (master || source_type != WhatAmI::Router) && tables.hat.full_net(WhatAmI::Peer) {
let net = tables.hat.peers_net.as_ref().unwrap();
let peer_source = match source_type {
WhatAmI::Peer => source.unwrap(),
_ => net.idx.index(),
};
insert_faces_for_subs(
&mut route,
expr,
tables,
net,
peer_source,
&mres.context().peer_subs,
);
}
}

if tables.whatami == WhatAmI::Peer && tables.hat.full_net(WhatAmI::Peer) {
let net = tables.hat.peers_net.as_ref().unwrap();
let peer_source = match source_type {
WhatAmI::Router | WhatAmI::Peer => source.unwrap(),
_ => net.idx.index(),
};
insert_faces_for_subs(
&mut route,
expr,
tables,
net,
peer_source,
&mres.context().peer_subs,
);
}

if tables.whatami != WhatAmI::Router || master || source_type == WhatAmI::Router {
for (sid, context) in &mres.session_ctxs {
if let Some(subinfo) = &context.subs {
if match tables.whatami {
WhatAmI::Router => context.face.whatami != WhatAmI::Router,
_ => {
source_type == WhatAmI::Client
|| context.face.whatami == WhatAmI::Client
}
} && subinfo.mode == Mode::Push
{
route.entry(*sid).or_insert_with(|| {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid);
(context.face.clone(), key_expr.to_owned(), None)
});
}
}
}
}
}
for mcast_group in &tables.mcast_groups {
route.insert(
mcast_group.id,
(
mcast_group.clone(),
expr.full_expr().to_string().into(),
None,
),
);
}
Arc::new(route)
}

fn compute_matching_pulls(tables: &Tables, expr: &mut RoutingExpr) -> Arc<PullCaches> {
let mut pull_caches = vec![];
let ke = if let Ok(ke) = OwnedKeyExpr::try_from(expr.full_expr()) {
Expand Down Expand Up @@ -625,10 +462,45 @@ pub fn full_reentrant_route_data(
if !(route.is_empty() && matching_pulls.is_empty()) {
treat_timestamp!(&tables.hlc, payload, tables.drop_future_timestamp);

if route.len() == 1 && matching_pulls.len() == 0 {
let (outface, key_expr, context) = route.values().next().unwrap();
if should_route(&tables, face, outface, &mut expr) {
drop(tables);
// if route.len() == 1 && matching_pulls.len() == 0 {
// let (outface, key_expr, context) = route.values().next().unwrap();
// if should_route(&tables, face, outface, &mut expr) {
// drop(tables);
// #[cfg(feature = "stats")]
// if !admin {
// inc_stats!(face, tx, user, payload)
// } else {
// inc_stats!(face, tx, admin, payload)
// }

// outface.primitives.send_push(Push {
// wire_expr: key_expr.into(),
// ext_qos,
// ext_tstamp: None,
// ext_nodeid: ext::NodeIdType {
// node_id: context.unwrap_or(0),
// },
// payload,
// })
// }
// } else {
if !matching_pulls.is_empty() {
let lock = zlock!(tables.pull_caches_lock);
cache_data!(matching_pulls, expr, payload);
drop(lock);
}

if tables.whatami == WhatAmI::Router {
let route = route
.values()
.filter(|(outface, _key_expr, _context)| {
should_route(&tables, face, outface, &mut expr)
})
.cloned()
.collect::<Vec<Direction>>();

drop(tables);
for (outface, key_expr, context) in route {
#[cfg(feature = "stats")]
if !admin {
inc_stats!(face, tx, user, payload)
Expand All @@ -637,33 +509,24 @@ pub fn full_reentrant_route_data(
}

outface.primitives.send_push(Push {
wire_expr: key_expr.into(),
wire_expr: key_expr,
ext_qos,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType {
node_id: context.unwrap_or(0),
},
payload,
payload: payload.clone(),
})
}
} else {
if !matching_pulls.is_empty() {
let lock = zlock!(tables.pull_caches_lock);
cache_data!(matching_pulls, expr, payload);
drop(lock);
}

if tables.whatami == WhatAmI::Router {
let route = route
.values()
.filter(|(outface, _key_expr, _context)| {
should_route(&tables, face, outface, &mut expr)
})
.cloned()
.collect::<Vec<Direction>>();

drop(tables);
for (outface, key_expr, context) in route {
drop(tables);
for (outface, key_expr, context) in route.values() {
if face.id != outface.id
&& match (face.mcast_group.as_ref(), outface.mcast_group.as_ref()) {
(Some(l), Some(r)) => l != r,
_ => true,
}
{
#[cfg(feature = "stats")]
if !admin {
inc_stats!(face, tx, user, payload)
Expand All @@ -672,7 +535,7 @@ pub fn full_reentrant_route_data(
}

outface.primitives.send_push(Push {
wire_expr: key_expr,
wire_expr: key_expr.into(),
ext_qos,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType {
Expand All @@ -681,38 +544,9 @@ pub fn full_reentrant_route_data(
payload: payload.clone(),
})
}
} else {
drop(tables);
for (outface, key_expr, context) in route.values() {
if face.id != outface.id
&& match (
face.mcast_group.as_ref(),
outface.mcast_group.as_ref(),
) {
(Some(l), Some(r)) => l != r,
_ => true,
}
{
#[cfg(feature = "stats")]
if !admin {
inc_stats!(face, tx, user, payload)
} else {
inc_stats!(face, tx, admin, payload)
}

outface.primitives.send_push(Push {
wire_expr: key_expr.into(),
ext_qos,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType {
node_id: context.unwrap_or(0),
},
payload: payload.clone(),
})
}
}
}
}
// }
}
}
}
Expand Down
Loading

0 comments on commit d4b8fbb

Please sign in to comment.