Skip to content

Commit

Permalink
Code move
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Nov 2, 2023
1 parent 66f41d8 commit ec960ea
Show file tree
Hide file tree
Showing 8 changed files with 647 additions and 570 deletions.
19 changes: 6 additions & 13 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::net::routing::hat::HatFace;

//
// Copyright (c) 2023 ZettaScale Technology
//
Expand All @@ -14,16 +16,13 @@
use super::super::router::*;
use super::tables::{Tables, TablesLock};
use super::{resource::*, tables};
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use zenoh_protocol::zenoh::RequestBody;
use zenoh_protocol::{
core::{ExprId, WhatAmI, ZenohId},
network::{
declare::queryable::ext::QueryableInfo, Mapping, Push, Request, RequestId, Response,
ResponseFinal,
},
network::{Mapping, Push, Request, RequestId, Response, ResponseFinal},
};
#[cfg(feature = "stats")]
use zenoh_transport::stats::TransportStats;
Expand All @@ -39,13 +38,10 @@ pub struct FaceState {
pub(crate) link_id: usize,
pub(crate) local_mappings: HashMap<ExprId, Arc<Resource>>,
pub(crate) remote_mappings: HashMap<ExprId, Arc<Resource>>,
pub(crate) local_subs: HashSet<Arc<Resource>>,
pub(crate) remote_subs: HashSet<Arc<Resource>>,
pub(crate) local_qabls: HashMap<Arc<Resource>, QueryableInfo>,
pub(crate) remote_qabls: HashSet<Arc<Resource>>,
pub(crate) next_qid: RequestId,
pub(crate) pending_queries: HashMap<RequestId, Arc<Query>>,
pub(crate) mcast_group: Option<TransportMulticast>,
pub(crate) hat: HatFace,
}

impl FaceState {
Expand All @@ -68,13 +64,10 @@ impl FaceState {
link_id,
local_mappings: HashMap::new(),
remote_mappings: HashMap::new(),
local_subs: HashSet::new(),
remote_subs: HashSet::new(),
local_qabls: HashMap::new(),
remote_qabls: HashSet::new(),
next_qid: 0,
pending_queries: HashMap::new(),
mcast_group,
hat: HatFace::new(),
})
}

Expand Down
169 changes: 3 additions & 166 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use super::super::hat::network::Network;
use super::super::hat::pubsub::compute_data_route;
use super::face::FaceState;
use super::resource::{DataRoutes, Direction, PullCaches, Resource, Route};
use super::tables::{RoutingExpr, Tables};
use petgraph::graph::NodeIndex;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;
use std::sync::RwLock;
use zenoh_core::zread;
use zenoh_protocol::{
core::{key_expr::OwnedKeyExpr, WhatAmI, WireExpr, ZenohId},
core::{key_expr::OwnedKeyExpr, WhatAmI, WireExpr},
network::{
declare::{ext, Mode},
Push,
Expand All @@ -32,169 +32,6 @@ use zenoh_protocol::{
};
use zenoh_sync::get_mut_unchecked;

#[inline]
fn insert_faces_for_subs(
route: &mut Route,
expr: &RoutingExpr,
tables: &Tables,
net: &Network,
source: usize,
subs: &HashSet<ZenohId>,
) {
if net.trees.len() > source {
for sub in subs {
if let Some(sub_idx) = net.get_idx(sub) {
if net.trees[source].directions.len() > sub_idx.index() {
if let Some(direction) = net.trees[source].directions[sub_idx.index()] {
if net.graph.contains_node(direction) {
if let Some(face) = tables.get_face(&net.graph[direction].zid) {
route.entry(face.id).or_insert_with(|| {
let key_expr =
Resource::get_best_key(expr.prefix, expr.suffix, face.id);
(
face.clone(),
key_expr.to_owned(),
if source != 0 {
Some(source as u16)
} else {
None
},
)
});
}
}
}
}
}
}
} else {
log::trace!("Tree for node sid:{} not yet ready", source);
}
}

fn compute_data_route(
tables: &Tables,
expr: &mut RoutingExpr,
source: Option<usize>,
source_type: WhatAmI,
) -> Arc<Route> {
let mut route = HashMap::new();
let key_expr = expr.full_expr();
if key_expr.ends_with('/') {
return Arc::new(route);
}
log::trace!(
"compute_data_route({}, {:?}, {:?})",
key_expr,
source,
source_type
);
let key_expr = match OwnedKeyExpr::try_from(key_expr) {
Ok(ke) => ke,
Err(e) => {
log::warn!("Invalid KE reached the system: {}", e);
return Arc::new(route);
}
};
let res = Resource::get_resource(expr.prefix, expr.suffix);
let matches = res
.as_ref()
.and_then(|res| res.context.as_ref())
.map(|ctx| Cow::from(&ctx.matches))
.unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr)));

let master = tables.whatami != WhatAmI::Router
|| !tables.hat.full_net(WhatAmI::Peer)
|| *tables
.hat
.elect_router(&tables.zid, &key_expr, tables.hat.shared_nodes.iter())
== tables.zid;

for mres in matches.iter() {
let mres = mres.upgrade().unwrap();
if tables.whatami == WhatAmI::Router {
if master || source_type == WhatAmI::Router {
let net = tables.hat.routers_net.as_ref().unwrap();
let router_source = match source_type {
WhatAmI::Router => source.unwrap(),
_ => net.idx.index(),
};
insert_faces_for_subs(
&mut route,
expr,
tables,
net,
router_source,
&mres.context().router_subs,
);
}

if (master || source_type != WhatAmI::Router) && tables.hat.full_net(WhatAmI::Peer) {
let net = tables.hat.peers_net.as_ref().unwrap();
let peer_source = match source_type {
WhatAmI::Peer => source.unwrap(),
_ => net.idx.index(),
};
insert_faces_for_subs(
&mut route,
expr,
tables,
net,
peer_source,
&mres.context().peer_subs,
);
}
}

if tables.whatami == WhatAmI::Peer && tables.hat.full_net(WhatAmI::Peer) {
let net = tables.hat.peers_net.as_ref().unwrap();
let peer_source = match source_type {
WhatAmI::Router | WhatAmI::Peer => source.unwrap(),
_ => net.idx.index(),
};
insert_faces_for_subs(
&mut route,
expr,
tables,
net,
peer_source,
&mres.context().peer_subs,
);
}

if tables.whatami != WhatAmI::Router || master || source_type == WhatAmI::Router {
for (sid, context) in &mres.session_ctxs {
if let Some(subinfo) = &context.subs {
if match tables.whatami {
WhatAmI::Router => context.face.whatami != WhatAmI::Router,
_ => {
source_type == WhatAmI::Client
|| context.face.whatami == WhatAmI::Client
}
} && subinfo.mode == Mode::Push
{
route.entry(*sid).or_insert_with(|| {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid);
(context.face.clone(), key_expr.to_owned(), None)
});
}
}
}
}
}
for mcast_group in &tables.mcast_groups {
route.insert(
mcast_group.id,
(
mcast_group.clone(),
expr.full_expr().to_string().into(),
None,
),
);
}
Arc::new(route)
}

fn compute_matching_pulls(tables: &Tables, expr: &mut RoutingExpr) -> Arc<PullCaches> {
let mut pull_caches = vec![];
let ke = if let Ok(ke) = OwnedKeyExpr::try_from(expr.full_expr()) {
Expand Down
Loading

0 comments on commit ec960ea

Please sign in to comment.