Skip to content

Commit

Permalink
Use RoutingContext type
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Nov 2, 2023
1 parent ec960ea commit 939daa4
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 154 deletions.
4 changes: 2 additions & 2 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ impl Primitives for Face {
&msg.wire_expr,
msg.ext_qos,
msg.payload,
msg.ext_nodeid.node_id as u64,
msg.ext_nodeid.node_id,
);
}

Expand All @@ -411,7 +411,7 @@ impl Primitives for Face {
msg.ext_target,
// consolidation,
msg.payload,
msg.ext_nodeid.node_id as u64,
msg.ext_nodeid.node_id,
);
}
RequestBody::Pull(_) => {
Expand Down
152 changes: 108 additions & 44 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use super::super::hat::pubsub::compute_data_route;
use super::face::FaceState;
use super::resource::{DataRoutes, Direction, PullCaches, Resource, Route};
use super::tables::{RoutingExpr, Tables};
use super::tables::{RoutingContext, RoutingExpr, Tables};
use petgraph::graph::NodeIndex;
use std::borrow::Cow;
use std::collections::HashMap;
Expand Down Expand Up @@ -83,11 +83,20 @@ pub(crate) fn compute_data_routes_(tables: &Tables, res: &Arc<Resource>) -> Data
.resize_with(max_idx.index() + 1, || Arc::new(HashMap::new()));

for idx in &indexes {
routes.routers_data_routes[idx.index()] =
compute_data_route(tables, &mut expr, Some(idx.index()), WhatAmI::Router);
routes.routers_data_routes[idx.index()] = compute_data_route(
tables,
&mut expr,
idx.index() as RoutingContext,
WhatAmI::Router,
);
}

routes.peer_data_route = Some(compute_data_route(tables, &mut expr, None, WhatAmI::Peer));
routes.peer_data_route = Some(compute_data_route(
tables,
&mut expr,
RoutingContext::default(),
WhatAmI::Peer,
));
}
if (tables.whatami == WhatAmI::Router || tables.whatami == WhatAmI::Peer)
&& tables.hat.full_net(WhatAmI::Peer)
Expand All @@ -106,18 +115,35 @@ pub(crate) fn compute_data_routes_(tables: &Tables, res: &Arc<Resource>) -> Data
.resize_with(max_idx.index() + 1, || Arc::new(HashMap::new()));

for idx in &indexes {
routes.peers_data_routes[idx.index()] =
compute_data_route(tables, &mut expr, Some(idx.index()), WhatAmI::Peer);
routes.peers_data_routes[idx.index()] = compute_data_route(
tables,
&mut expr,
idx.index() as RoutingContext,
WhatAmI::Peer,
);
}
}
if tables.whatami == WhatAmI::Peer && !tables.hat.full_net(WhatAmI::Peer) {
routes.client_data_route =
Some(compute_data_route(tables, &mut expr, None, WhatAmI::Client));
routes.peer_data_route = Some(compute_data_route(tables, &mut expr, None, WhatAmI::Peer));
routes.client_data_route = Some(compute_data_route(
tables,
&mut expr,
RoutingContext::default(),
WhatAmI::Client,
));
routes.peer_data_route = Some(compute_data_route(
tables,
&mut expr,
RoutingContext::default(),
WhatAmI::Peer,
));
}
if tables.whatami == WhatAmI::Client {
routes.client_data_route =
Some(compute_data_route(tables, &mut expr, None, WhatAmI::Client));
routes.client_data_route = Some(compute_data_route(
tables,
&mut expr,
RoutingContext::default(),
WhatAmI::Client,
));
}
routes.matching_pulls = Some(compute_matching_pulls(tables, &mut expr));
routes
Expand All @@ -143,12 +169,20 @@ pub(crate) fn compute_data_routes(tables: &mut Tables, res: &mut Arc<Resource>)
routers_data_routes.resize_with(max_idx.index() + 1, || Arc::new(HashMap::new()));

for idx in &indexes {
routers_data_routes[idx.index()] =
compute_data_route(tables, &mut expr, Some(idx.index()), WhatAmI::Router);
routers_data_routes[idx.index()] = compute_data_route(
tables,
&mut expr,
idx.index() as RoutingContext,
WhatAmI::Router,
);
}

res_mut.context_mut().peer_data_route =
Some(compute_data_route(tables, &mut expr, None, WhatAmI::Peer));
res_mut.context_mut().peer_data_route = Some(compute_data_route(
tables,
&mut expr,
RoutingContext::default(),
WhatAmI::Peer,
));
}
if (tables.whatami == WhatAmI::Router || tables.whatami == WhatAmI::Peer)
&& tables.hat.full_net(WhatAmI::Peer)
Expand All @@ -167,19 +201,35 @@ pub(crate) fn compute_data_routes(tables: &mut Tables, res: &mut Arc<Resource>)
peers_data_routes.resize_with(max_idx.index() + 1, || Arc::new(HashMap::new()));

for idx in &indexes {
peers_data_routes[idx.index()] =
compute_data_route(tables, &mut expr, Some(idx.index()), WhatAmI::Peer);
peers_data_routes[idx.index()] = compute_data_route(
tables,
&mut expr,
idx.index() as RoutingContext,
WhatAmI::Peer,
);
}
}
if tables.whatami == WhatAmI::Peer && !tables.hat.full_net(WhatAmI::Peer) {
res_mut.context_mut().client_data_route =
Some(compute_data_route(tables, &mut expr, None, WhatAmI::Client));
res_mut.context_mut().peer_data_route =
Some(compute_data_route(tables, &mut expr, None, WhatAmI::Peer));
res_mut.context_mut().client_data_route = Some(compute_data_route(
tables,
&mut expr,
RoutingContext::default(),
WhatAmI::Client,
));
res_mut.context_mut().peer_data_route = Some(compute_data_route(
tables,
&mut expr,
RoutingContext::default(),
WhatAmI::Peer,
));
}
if tables.whatami == WhatAmI::Client {
res_mut.context_mut().client_data_route =
Some(compute_data_route(tables, &mut expr, None, WhatAmI::Client));
res_mut.context_mut().client_data_route = Some(compute_data_route(
tables,
&mut expr,
RoutingContext::default(),
WhatAmI::Client,
));
}
res_mut.context_mut().matching_pulls = compute_matching_pulls(tables, &mut expr);
}
Expand Down Expand Up @@ -267,7 +317,7 @@ fn get_data_route(
face: &FaceState,
res: &Option<Arc<Resource>>,
expr: &mut RoutingExpr,
routing_context: u64,
routing_context: RoutingContext,
) -> Arc<Route> {
match tables.whatami {
WhatAmI::Router => match face.whatami {
Expand All @@ -277,7 +327,7 @@ fn get_data_route(
res.as_ref()
.and_then(|res| res.routers_data_route(local_context))
.unwrap_or_else(|| {
compute_data_route(tables, expr, Some(local_context), face.whatami)
compute_data_route(tables, expr, local_context, face.whatami)
})
}
WhatAmI::Peer => {
Expand All @@ -287,18 +337,27 @@ fn get_data_route(
res.as_ref()
.and_then(|res| res.peers_data_route(local_context))
.unwrap_or_else(|| {
compute_data_route(tables, expr, Some(local_context), face.whatami)
compute_data_route(tables, expr, local_context, face.whatami)
})
} else {
res.as_ref()
.and_then(|res| res.peer_data_route())
.unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami))
.unwrap_or_else(|| {
compute_data_route(
tables,
expr,
RoutingContext::default(),
face.whatami,
)
})
}
}
_ => res
.as_ref()
.and_then(|res| res.routers_data_route(0))
.unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami)),
.and_then(|res| res.routers_data_route(RoutingContext::default()))
.unwrap_or_else(|| {
compute_data_route(tables, expr, RoutingContext::default(), face.whatami)
}),
},
WhatAmI::Peer => {
if tables.hat.full_net(WhatAmI::Peer) {
Expand All @@ -310,27 +369,38 @@ fn get_data_route(
res.as_ref()
.and_then(|res| res.peers_data_route(local_context))
.unwrap_or_else(|| {
compute_data_route(tables, expr, Some(local_context), face.whatami)
compute_data_route(tables, expr, local_context, face.whatami)
})
}
_ => res
.as_ref()
.and_then(|res| res.peers_data_route(0))
.unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami)),
.and_then(|res| res.peers_data_route(RoutingContext::default()))
.unwrap_or_else(|| {
compute_data_route(
tables,
expr,
RoutingContext::default(),
face.whatami,
)
}),
}
} else {
res.as_ref()
.and_then(|res| match face.whatami {
WhatAmI::Client => res.client_data_route(),
_ => res.peer_data_route(),
})
.unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami))
.unwrap_or_else(|| {
compute_data_route(tables, expr, RoutingContext::default(), face.whatami)
})
}
}
_ => res
.as_ref()
.and_then(|res| res.client_data_route())
.unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami)),
.unwrap_or_else(|| {
compute_data_route(tables, expr, RoutingContext::default(), face.whatami)
}),
}
}

Expand Down Expand Up @@ -424,7 +494,7 @@ pub fn full_reentrant_route_data(
expr: &WireExpr,
ext_qos: ext::QoSType,
mut payload: PushBody,
routing_context: u64,
routing_context: RoutingContext,
) {
let tables = zread!(tables_ref);
match tables.get_mapping(face, &expr.scope, expr.mapping).cloned() {
Expand Down Expand Up @@ -477,9 +547,7 @@ pub fn full_reentrant_route_data(
wire_expr: key_expr.into(),
ext_qos,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType {
node_id: context.unwrap_or(0),
},
ext_nodeid: ext::NodeIdType { node_id: *context },
payload,
})
}
Expand Down Expand Up @@ -512,9 +580,7 @@ pub fn full_reentrant_route_data(
wire_expr: key_expr,
ext_qos,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType {
node_id: context.unwrap_or(0),
},
ext_nodeid: ext::NodeIdType { node_id: context },
payload: payload.clone(),
})
}
Expand All @@ -541,9 +607,7 @@ pub fn full_reentrant_route_data(
wire_expr: key_expr.into(),
ext_qos,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType {
node_id: context.unwrap_or(0),
},
ext_nodeid: ext::NodeIdType { node_id: *context },
payload: payload.clone(),
})
}
Expand Down
Loading

0 comments on commit 939daa4

Please sign in to comment.