Skip to content

Commit

Permalink
MatchingStatus depends on Publisher allowed_destination
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Oct 11, 2023
1 parent 8058488 commit 819f53e
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 9 deletions.
9 changes: 9 additions & 0 deletions zenoh/src/net/routing/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct FaceState {
pub(super) id: usize,
pub(super) zid: ZenohId,
pub(super) whatami: WhatAmI,
pub(super) local: bool,
#[cfg(feature = "stats")]
pub(super) stats: Option<Arc<TransportStats>>,
pub(super) primitives: Arc<dyn Primitives + Send + Sync>,
Expand All @@ -47,10 +48,12 @@ pub struct FaceState {
}

impl FaceState {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
id: usize,
zid: ZenohId,
whatami: WhatAmI,
local: bool,
#[cfg(feature = "stats")] stats: Option<Arc<TransportStats>>,
primitives: Arc<dyn Primitives + Send + Sync>,
link_id: usize,
Expand All @@ -60,6 +63,7 @@ impl FaceState {
id,
zid,
whatami,
local,
#[cfg(feature = "stats")]
stats,
primitives,
Expand All @@ -76,6 +80,11 @@ impl FaceState {
})
}

#[inline]
pub fn is_local(&self) -> bool {
self.local
}

#[inline]
#[allow(clippy::trivially_copy_pass_by_ref)]
pub(super) fn get_mapping(
Expand Down
4 changes: 4 additions & 0 deletions zenoh/src/net/routing/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ impl Tables {
fid,
zid,
whatami,
false,
#[cfg(feature = "stats")]
Some(stats),
primitives.clone(),
Expand Down Expand Up @@ -304,6 +305,7 @@ impl Tables {
fid,
zid,
whatami,
true,
#[cfg(feature = "stats")]
None,
primitives.clone(),
Expand Down Expand Up @@ -649,6 +651,7 @@ impl Router {
fid,
ZenohId::from_str("1").unwrap(),
WhatAmI::Peer,
false,
#[cfg(feature = "stats")]
None,
Arc::new(McastMux::new(transport.clone())),
Expand All @@ -674,6 +677,7 @@ impl Router {
fid,
peer.zid,
WhatAmI::Client, // Quick hack
false,
#[cfg(feature = "stats")]
Some(transport.get_stats().unwrap()),
Arc::new(DummyPrimitives),
Expand Down
6 changes: 5 additions & 1 deletion zenoh/src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,10 @@ impl<'a> Publisher<'a> {
/// ```
#[zenoh_macros::unstable]
pub fn matching_status(&self) -> impl Resolve<ZResult<MatchingStatus>> + '_ {
zenoh_core::ResolveFuture::new(async move { self.session.matching_status(self.key_expr()) })
zenoh_core::ResolveFuture::new(async move {
self.session
.matching_status(self.key_expr(), self.destination)
})
}

/// Return a [`MatchingListener`] for this Publisher.
Expand Down Expand Up @@ -1081,6 +1084,7 @@ pub(crate) struct MatchingListenerState {
pub(crate) id: Id,
pub(crate) current: std::sync::Mutex<bool>,
pub(crate) key_expr: KeyExpr<'static>,
pub(crate) destination: Locality,
pub(crate) callback: Callback<'static, MatchingStatus>,
}

Expand Down
29 changes: 21 additions & 8 deletions zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1458,6 +1458,7 @@ impl Session {
let sub_state = Arc::new(MatchingListenerState {
id,
current: std::sync::Mutex::new(false),
destination: publisher.destination,
key_expr: publisher.key_expr.clone().into_owned(),
callback,
});
Expand All @@ -1466,7 +1467,7 @@ impl Session {
match sub_state.current.lock() {
Ok(mut current) => {
if self
.matching_status(&publisher.key_expr)
.matching_status(&publisher.key_expr, sub_state.destination)
.map(|s| s.is_matching())
.unwrap_or(true)
{
Expand All @@ -1480,24 +1481,32 @@ impl Session {
}

#[zenoh_macros::unstable]
pub(crate) fn matching_status(&self, key_expr: &KeyExpr) -> ZResult<MatchingStatus> {
pub(crate) fn matching_status(
&self,
key_expr: &KeyExpr,
destination: Locality,
) -> ZResult<MatchingStatus> {
use crate::net::routing::router::RoutingExpr;
use zenoh_protocol::core::WhatAmI;
let tables = zread!(self.runtime.router.tables.tables);
let res = crate::net::routing::resource::Resource::get_resource(
&tables.root_res,
key_expr.as_str(),
);
let matching = !crate::net::routing::pubsub::get_data_route(

let route = crate::net::routing::pubsub::get_data_route(
&tables,
WhatAmI::Client,
0,
&res,
&mut RoutingExpr::new(&tables.root_res, key_expr.as_str()),
0,
)
.is_empty();

);
let matching = match destination {
Locality::Any => !route.is_empty(),
Locality::Remote => route.values().any(|dir| !dir.0.is_local()),
Locality::SessionLocal => route.values().any(|dir| dir.0.is_local()),
};
Ok(MatchingStatus { matching })
}

Expand All @@ -1513,7 +1522,9 @@ impl Session {
match msub.current.lock() {
Ok(mut current) => {
if !*current {
if let Ok(status) = session.matching_status(&msub.key_expr) {
if let Ok(status) =
session.matching_status(&msub.key_expr, msub.destination)
{
if status.is_matching() {
*current = true;
let callback = msub.callback.clone();
Expand Down Expand Up @@ -1544,7 +1555,9 @@ impl Session {
match msub.current.lock() {
Ok(mut current) => {
if *current {
if let Ok(status) = session.matching_status(&msub.key_expr) {
if let Ok(status) =
session.matching_status(&msub.key_expr, msub.destination)
{
if !status.is_matching() {
*current = false;
let callback = msub.callback.clone();
Expand Down

0 comments on commit 819f53e

Please sign in to comment.