Skip to content

Commit

Permalink
Matching status (#565)
Browse files Browse the repository at this point in the history
Co-authored-by: Pierre Avital <[email protected]>
Co-authored-by: Julien Enoch <[email protected]>
  • Loading branch information
3 people authored Nov 22, 2023
1 parent 1daaaf5 commit 7295201
Show file tree
Hide file tree
Showing 7 changed files with 1,005 additions and 19 deletions.
11 changes: 10 additions & 1 deletion zenoh/src/net/routing/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct FaceState {
pub(super) id: usize,
pub(super) zid: ZenohId,
pub(super) whatami: WhatAmI,
pub(super) local: bool,
#[cfg(feature = "stats")]
pub(super) stats: Option<Arc<TransportStats>>,
pub(super) primitives: Arc<dyn Primitives + Send + Sync>,
Expand All @@ -47,10 +48,12 @@ pub struct FaceState {
}

impl FaceState {
pub(super) fn new(
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
id: usize,
zid: ZenohId,
whatami: WhatAmI,
local: bool,
#[cfg(feature = "stats")] stats: Option<Arc<TransportStats>>,
primitives: Arc<dyn Primitives + Send + Sync>,
link_id: usize,
Expand All @@ -60,6 +63,7 @@ impl FaceState {
id,
zid,
whatami,
local,
#[cfg(feature = "stats")]
stats,
primitives,
Expand All @@ -76,6 +80,11 @@ impl FaceState {
})
}

#[inline]
pub fn is_local(&self) -> bool {
self.local
}

#[inline]
#[allow(clippy::trivially_copy_pass_by_ref)]
pub(super) fn get_mapping(
Expand Down
43 changes: 25 additions & 18 deletions zenoh/src/net/routing/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1598,75 +1598,75 @@ macro_rules! treat_timestamp {
}

#[inline]
fn get_data_route(
pub(crate) fn get_data_route(
tables: &Tables,
face: &FaceState,
whatami: WhatAmI,
link_id: usize,
res: &Option<Arc<Resource>>,
expr: &mut RoutingExpr,
routing_context: u64,
) -> Arc<Route> {
match tables.whatami {
WhatAmI::Router => match face.whatami {
WhatAmI::Router => match whatami {
WhatAmI::Router => {
let routers_net = tables.routers_net.as_ref().unwrap();
let local_context = routers_net.get_local_context(routing_context, face.link_id);
let local_context = routers_net.get_local_context(routing_context, link_id);
res.as_ref()
.and_then(|res| res.routers_data_route(local_context))
.unwrap_or_else(|| {
compute_data_route(tables, expr, Some(local_context), face.whatami)
compute_data_route(tables, expr, Some(local_context), whatami)
})
}
WhatAmI::Peer => {
if tables.full_net(WhatAmI::Peer) {
let peers_net = tables.peers_net.as_ref().unwrap();
let local_context = peers_net.get_local_context(routing_context, face.link_id);
let local_context = peers_net.get_local_context(routing_context, link_id);
res.as_ref()
.and_then(|res| res.peers_data_route(local_context))
.unwrap_or_else(|| {
compute_data_route(tables, expr, Some(local_context), face.whatami)
compute_data_route(tables, expr, Some(local_context), whatami)
})
} else {
res.as_ref()
.and_then(|res| res.peer_data_route())
.unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami))
.unwrap_or_else(|| compute_data_route(tables, expr, None, whatami))
}
}
_ => res
.as_ref()
.and_then(|res| res.routers_data_route(0))
.unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami)),
.unwrap_or_else(|| compute_data_route(tables, expr, None, whatami)),
},
WhatAmI::Peer => {
if tables.full_net(WhatAmI::Peer) {
match face.whatami {
match whatami {
WhatAmI::Router | WhatAmI::Peer => {
let peers_net = tables.peers_net.as_ref().unwrap();
let local_context =
peers_net.get_local_context(routing_context, face.link_id);
let local_context = peers_net.get_local_context(routing_context, link_id);
res.as_ref()
.and_then(|res| res.peers_data_route(local_context))
.unwrap_or_else(|| {
compute_data_route(tables, expr, Some(local_context), face.whatami)
compute_data_route(tables, expr, Some(local_context), whatami)
})
}
_ => res
.as_ref()
.and_then(|res| res.peers_data_route(0))
.unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami)),
.unwrap_or_else(|| compute_data_route(tables, expr, None, whatami)),
}
} else {
res.as_ref()
.and_then(|res| match face.whatami {
.and_then(|res| match whatami {
WhatAmI::Client => res.client_data_route(),
_ => res.peer_data_route(),
})
.unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami))
.unwrap_or_else(|| compute_data_route(tables, expr, None, whatami))
}
}
_ => res
.as_ref()
.and_then(|res| res.client_data_route())
.unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami)),
.unwrap_or_else(|| compute_data_route(tables, expr, None, whatami)),
}
}

Expand Down Expand Up @@ -1784,7 +1784,14 @@ pub fn full_reentrant_route_data(
== *tables.elect_router(expr.full_expr(), tables.get_router_links(face.zid))
{
let res = Resource::get_resource(&prefix, expr.suffix);
let route = get_data_route(&tables, face, &res, &mut expr, routing_context);
let route = get_data_route(
&tables,
face.whatami,
face.link_id,
&res,
&mut expr,
routing_context,
);
let matching_pulls = get_matching_pulls(&tables, &res, &mut expr);

if !(route.is_empty() && matching_pulls.is_empty()) {
Expand Down
4 changes: 4 additions & 0 deletions zenoh/src/net/routing/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ impl Tables {
fid,
zid,
whatami,
false,
#[cfg(feature = "stats")]
Some(stats),
primitives.clone(),
Expand Down Expand Up @@ -304,6 +305,7 @@ impl Tables {
fid,
zid,
whatami,
true,
#[cfg(feature = "stats")]
None,
primitives.clone(),
Expand Down Expand Up @@ -649,6 +651,7 @@ impl Router {
fid,
ZenohId::from_str("1").unwrap(),
WhatAmI::Peer,
false,
#[cfg(feature = "stats")]
None,
Arc::new(McastMux::new(transport.clone())),
Expand All @@ -674,6 +677,7 @@ impl Router {
fid,
peer.zid,
WhatAmI::Client, // Quick hack
false,
#[cfg(feature = "stats")]
Some(transport.get_stats().unwrap()),
Arc::new(DummyPrimitives),
Expand Down
2 changes: 2 additions & 0 deletions zenoh/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ pub(crate) mod common {
pub use zenoh_protocol::core::SampleKind;

pub use crate::publication::Priority;
#[zenoh_macros::unstable]
pub use crate::publication::PublisherDeclarations;
pub use zenoh_protocol::core::{CongestionControl, Reliability, WhatAmI};

/// A [`Locator`] contains a choice of protocol, an address and port, as well as optional additional properties to work with.
Expand Down
Loading

0 comments on commit 7295201

Please sign in to comment.