diff --git a/zenoh/src/net/routing/face.rs b/zenoh/src/net/routing/face.rs index 4562c73e69..cb01f3ea6e 100644 --- a/zenoh/src/net/routing/face.rs +++ b/zenoh/src/net/routing/face.rs @@ -31,6 +31,7 @@ pub struct FaceState { pub(super) id: usize, pub(super) zid: ZenohId, pub(super) whatami: WhatAmI, + pub(super) local: bool, #[cfg(feature = "stats")] pub(super) stats: Option>, pub(super) primitives: Arc, @@ -47,10 +48,12 @@ pub struct FaceState { } impl FaceState { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( id: usize, zid: ZenohId, whatami: WhatAmI, + local: bool, #[cfg(feature = "stats")] stats: Option>, primitives: Arc, link_id: usize, @@ -60,6 +63,7 @@ impl FaceState { id, zid, whatami, + local, #[cfg(feature = "stats")] stats, primitives, @@ -76,6 +80,11 @@ impl FaceState { }) } + #[inline] + pub fn is_local(&self) -> bool { + self.local + } + #[inline] #[allow(clippy::trivially_copy_pass_by_ref)] pub(super) fn get_mapping( diff --git a/zenoh/src/net/routing/router.rs b/zenoh/src/net/routing/router.rs index dbf687ba79..60012a48eb 100644 --- a/zenoh/src/net/routing/router.rs +++ b/zenoh/src/net/routing/router.rs @@ -272,6 +272,7 @@ impl Tables { fid, zid, whatami, + false, #[cfg(feature = "stats")] Some(stats), primitives.clone(), @@ -304,6 +305,7 @@ impl Tables { fid, zid, whatami, + true, #[cfg(feature = "stats")] None, primitives.clone(), @@ -649,6 +651,7 @@ impl Router { fid, ZenohId::from_str("1").unwrap(), WhatAmI::Peer, + false, #[cfg(feature = "stats")] None, Arc::new(McastMux::new(transport.clone())), @@ -674,6 +677,7 @@ impl Router { fid, peer.zid, WhatAmI::Client, // Quick hack + false, #[cfg(feature = "stats")] Some(transport.get_stats().unwrap()), Arc::new(DummyPrimitives), diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index 0beb36ab04..dd16849640 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -412,7 +412,10 @@ impl<'a> Publisher<'a> { /// ``` #[zenoh_macros::unstable] pub fn matching_status(&self) -> impl Resolve> + '_ { - zenoh_core::ResolveFuture::new(async move { self.session.matching_status(self.key_expr()) }) + zenoh_core::ResolveFuture::new(async move { + self.session + .matching_status(self.key_expr(), self.destination) + }) } /// Return a [`MatchingListener`] for this Publisher. @@ -1081,6 +1084,7 @@ pub(crate) struct MatchingListenerState { pub(crate) id: Id, pub(crate) current: std::sync::Mutex, pub(crate) key_expr: KeyExpr<'static>, + pub(crate) destination: Locality, pub(crate) callback: Callback<'static, MatchingStatus>, } diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 352bc9e9ff..67094fc5d3 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -1458,6 +1458,7 @@ impl Session { let sub_state = Arc::new(MatchingListenerState { id, current: std::sync::Mutex::new(false), + destination: publisher.destination, key_expr: publisher.key_expr.clone().into_owned(), callback, }); @@ -1466,7 +1467,7 @@ impl Session { match sub_state.current.lock() { Ok(mut current) => { if self - .matching_status(&publisher.key_expr) + .matching_status(&publisher.key_expr, sub_state.destination) .map(|s| s.is_matching()) .unwrap_or(true) { @@ -1480,7 +1481,11 @@ impl Session { } #[zenoh_macros::unstable] - pub(crate) fn matching_status(&self, key_expr: &KeyExpr) -> ZResult { + pub(crate) fn matching_status( + &self, + key_expr: &KeyExpr, + destination: Locality, + ) -> ZResult { use crate::net::routing::router::RoutingExpr; use zenoh_protocol::core::WhatAmI; let tables = zread!(self.runtime.router.tables.tables); @@ -1488,16 +1493,20 @@ impl Session { &tables.root_res, key_expr.as_str(), ); - let matching = !crate::net::routing::pubsub::get_data_route( + + let route = crate::net::routing::pubsub::get_data_route( &tables, WhatAmI::Client, 0, &res, &mut RoutingExpr::new(&tables.root_res, key_expr.as_str()), 0, - ) - .is_empty(); - + ); + let matching = match destination { + Locality::Any => !route.is_empty(), + Locality::Remote => route.values().any(|dir| !dir.0.is_local()), + Locality::SessionLocal => route.values().any(|dir| dir.0.is_local()), + }; Ok(MatchingStatus { matching }) } @@ -1513,7 +1522,9 @@ impl Session { match msub.current.lock() { Ok(mut current) => { if !*current { - if let Ok(status) = session.matching_status(&msub.key_expr) { + if let Ok(status) = + session.matching_status(&msub.key_expr, msub.destination) + { if status.is_matching() { *current = true; let callback = msub.callback.clone(); @@ -1544,7 +1555,9 @@ impl Session { match msub.current.lock() { Ok(mut current) => { if *current { - if let Ok(status) = session.matching_status(&msub.key_expr) { + if let Ok(status) = + session.matching_status(&msub.key_expr, msub.destination) + { if !status.is_matching() { *current = false; let callback = msub.callback.clone();