From 7295201a547c894e86bc0d412c936a90bc67bd5e Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Wed, 22 Nov 2023 16:54:22 +0100 Subject: [PATCH] Matching status (#565) Co-authored-by: Pierre Avital Co-authored-by: Julien Enoch --- zenoh/src/net/routing/face.rs | 11 +- zenoh/src/net/routing/pubsub.rs | 43 ++- zenoh/src/net/routing/router.rs | 4 + zenoh/src/prelude.rs | 2 + zenoh/src/publication.rs | 571 ++++++++++++++++++++++++++++++++ zenoh/src/session.rs | 168 ++++++++++ zenoh/tests/matching.rs | 225 +++++++++++++ 7 files changed, 1005 insertions(+), 19 deletions(-) create mode 100644 zenoh/tests/matching.rs diff --git a/zenoh/src/net/routing/face.rs b/zenoh/src/net/routing/face.rs index d84f173d2..cb01f3ea6 100644 --- a/zenoh/src/net/routing/face.rs +++ b/zenoh/src/net/routing/face.rs @@ -31,6 +31,7 @@ pub struct FaceState { pub(super) id: usize, pub(super) zid: ZenohId, pub(super) whatami: WhatAmI, + pub(super) local: bool, #[cfg(feature = "stats")] pub(super) stats: Option>, pub(super) primitives: Arc, @@ -47,10 +48,12 @@ pub struct FaceState { } impl FaceState { - pub(super) fn new( + #[allow(clippy::too_many_arguments)] + pub(crate) fn new( id: usize, zid: ZenohId, whatami: WhatAmI, + local: bool, #[cfg(feature = "stats")] stats: Option>, primitives: Arc, link_id: usize, @@ -60,6 +63,7 @@ impl FaceState { id, zid, whatami, + local, #[cfg(feature = "stats")] stats, primitives, @@ -76,6 +80,11 @@ impl FaceState { }) } + #[inline] + pub fn is_local(&self) -> bool { + self.local + } + #[inline] #[allow(clippy::trivially_copy_pass_by_ref)] pub(super) fn get_mapping( diff --git a/zenoh/src/net/routing/pubsub.rs b/zenoh/src/net/routing/pubsub.rs index 0cad9cac8..75a49f113 100644 --- a/zenoh/src/net/routing/pubsub.rs +++ b/zenoh/src/net/routing/pubsub.rs @@ -1598,75 +1598,75 @@ macro_rules! treat_timestamp { } #[inline] -fn get_data_route( +pub(crate) fn get_data_route( tables: &Tables, - face: &FaceState, + whatami: WhatAmI, + link_id: usize, res: &Option>, expr: &mut RoutingExpr, routing_context: u64, ) -> Arc { match tables.whatami { - WhatAmI::Router => match face.whatami { + WhatAmI::Router => match whatami { WhatAmI::Router => { let routers_net = tables.routers_net.as_ref().unwrap(); - let local_context = routers_net.get_local_context(routing_context, face.link_id); + let local_context = routers_net.get_local_context(routing_context, link_id); res.as_ref() .and_then(|res| res.routers_data_route(local_context)) .unwrap_or_else(|| { - compute_data_route(tables, expr, Some(local_context), face.whatami) + compute_data_route(tables, expr, Some(local_context), whatami) }) } WhatAmI::Peer => { if tables.full_net(WhatAmI::Peer) { let peers_net = tables.peers_net.as_ref().unwrap(); - let local_context = peers_net.get_local_context(routing_context, face.link_id); + let local_context = peers_net.get_local_context(routing_context, link_id); res.as_ref() .and_then(|res| res.peers_data_route(local_context)) .unwrap_or_else(|| { - compute_data_route(tables, expr, Some(local_context), face.whatami) + compute_data_route(tables, expr, Some(local_context), whatami) }) } else { res.as_ref() .and_then(|res| res.peer_data_route()) - .unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami)) + .unwrap_or_else(|| compute_data_route(tables, expr, None, whatami)) } } _ => res .as_ref() .and_then(|res| res.routers_data_route(0)) - .unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami)), + .unwrap_or_else(|| compute_data_route(tables, expr, None, whatami)), }, WhatAmI::Peer => { if tables.full_net(WhatAmI::Peer) { - match face.whatami { + match whatami { WhatAmI::Router | WhatAmI::Peer => { let peers_net = tables.peers_net.as_ref().unwrap(); - let local_context = - peers_net.get_local_context(routing_context, face.link_id); + let local_context = peers_net.get_local_context(routing_context, link_id); res.as_ref() .and_then(|res| res.peers_data_route(local_context)) .unwrap_or_else(|| { - compute_data_route(tables, expr, Some(local_context), face.whatami) + compute_data_route(tables, expr, Some(local_context), whatami) }) } _ => res .as_ref() .and_then(|res| res.peers_data_route(0)) - .unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami)), + .unwrap_or_else(|| compute_data_route(tables, expr, None, whatami)), } } else { res.as_ref() - .and_then(|res| match face.whatami { + .and_then(|res| match whatami { WhatAmI::Client => res.client_data_route(), _ => res.peer_data_route(), }) - .unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami)) + .unwrap_or_else(|| compute_data_route(tables, expr, None, whatami)) } } _ => res .as_ref() .and_then(|res| res.client_data_route()) - .unwrap_or_else(|| compute_data_route(tables, expr, None, face.whatami)), + .unwrap_or_else(|| compute_data_route(tables, expr, None, whatami)), } } @@ -1784,7 +1784,14 @@ pub fn full_reentrant_route_data( == *tables.elect_router(expr.full_expr(), tables.get_router_links(face.zid)) { let res = Resource::get_resource(&prefix, expr.suffix); - let route = get_data_route(&tables, face, &res, &mut expr, routing_context); + let route = get_data_route( + &tables, + face.whatami, + face.link_id, + &res, + &mut expr, + routing_context, + ); let matching_pulls = get_matching_pulls(&tables, &res, &mut expr); if !(route.is_empty() && matching_pulls.is_empty()) { diff --git a/zenoh/src/net/routing/router.rs b/zenoh/src/net/routing/router.rs index dbf687ba7..60012a48e 100644 --- a/zenoh/src/net/routing/router.rs +++ b/zenoh/src/net/routing/router.rs @@ -272,6 +272,7 @@ impl Tables { fid, zid, whatami, + false, #[cfg(feature = "stats")] Some(stats), primitives.clone(), @@ -304,6 +305,7 @@ impl Tables { fid, zid, whatami, + true, #[cfg(feature = "stats")] None, primitives.clone(), @@ -649,6 +651,7 @@ impl Router { fid, ZenohId::from_str("1").unwrap(), WhatAmI::Peer, + false, #[cfg(feature = "stats")] None, Arc::new(McastMux::new(transport.clone())), @@ -674,6 +677,7 @@ impl Router { fid, peer.zid, WhatAmI::Client, // Quick hack + false, #[cfg(feature = "stats")] Some(transport.get_stats().unwrap()), Arc::new(DummyPrimitives), diff --git a/zenoh/src/prelude.rs b/zenoh/src/prelude.rs index baf743924..2f2e7650a 100644 --- a/zenoh/src/prelude.rs +++ b/zenoh/src/prelude.rs @@ -51,6 +51,8 @@ pub(crate) mod common { pub use zenoh_protocol::core::SampleKind; pub use crate::publication::Priority; + #[zenoh_macros::unstable] + pub use crate::publication::PublisherDeclarations; pub use zenoh_protocol::core::{CongestionControl, Reliability, WhatAmI}; /// A [`Locator`] contains a choice of protocol, an address and port, as well as optional additional properties to work with. diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index 3a69c19f8..ac1d6bf55 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -14,6 +14,10 @@ //! Publishing primitives. +#[zenoh_macros::unstable] +use crate::handlers::Callback; +#[zenoh_macros::unstable] +use crate::handlers::DefaultHandler; use crate::net::transport::Primitives; use crate::prelude::*; use crate::sample::DataInfo; @@ -199,6 +203,35 @@ use std::pin::Pin; use std::task::{Context, Poll}; use zenoh_result::Error; +#[zenoh_macros::unstable] +#[derive(Clone)] +pub enum PublisherRef<'a> { + Borrow(&'a Publisher<'a>), + Shared(std::sync::Arc>), +} + +#[zenoh_macros::unstable] +impl<'a> std::ops::Deref for PublisherRef<'a> { + type Target = Publisher<'a>; + + fn deref(&self) -> &Self::Target { + match self { + PublisherRef::Borrow(b) => b, + PublisherRef::Shared(s) => s, + } + } +} + +#[zenoh_macros::unstable] +impl std::fmt::Debug for PublisherRef<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PublisherRef::Borrow(b) => Publisher::fmt(b, f), + PublisherRef::Shared(s) => Publisher::fmt(s, f), + } + } +} + /// A publisher that allows to send data through a stream. /// /// Publishers are automatically undeclared when dropped. @@ -265,6 +298,41 @@ impl<'a> Publisher<'a> { self } + /// Consumes the given `Publisher`, returning a thread-safe reference-counting + /// pointer to it (`Arc`). This is equivalent to `Arc::new(Publisher)`. + /// + /// This is useful to share ownership of the `Publisher` between several threads + /// and tasks. It also alows to create [`MatchingListener`] with static + /// lifetime that can be moved to several threads and tasks. + /// + /// Note: the given zenoh `Publisher` will be undeclared when the last reference to + /// it is dropped. + /// + /// # Examples + /// ```no_run + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); + /// let publisher = session.declare_publisher("key/expression").res().await.unwrap().into_arc(); + /// let matching_listener = publisher.matching_listener().res().await.unwrap(); + /// + /// async_std::task::spawn(async move { + /// while let Ok(matching_status) = matching_listener.recv_async().await { + /// if matching_status.matching_subscribers() { + /// println!("Publisher has matching subscribers."); + /// } else { + /// println!("Publisher has NO MORE matching subscribers."); + /// } + /// } + /// }).await; + /// # }) + /// ``` + #[zenoh_macros::unstable] + pub fn into_arc(self) -> std::sync::Arc { + std::sync::Arc::new(self) + } + fn _write(&self, kind: SampleKind, value: Value) -> Publication { Publication { publisher: self, @@ -328,6 +396,64 @@ impl<'a> Publisher<'a> { self._write(SampleKind::Delete, Value::empty()) } + /// Return the [`MatchingStatus`] of the publisher. + /// + /// [`MatchingStatus::matching_subscribers`] will return true if there exist Subscribers + /// matching the Publisher's key expression and false otherwise. + /// + /// # Examples + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); + /// let publisher = session.declare_publisher("key/expression").res().await.unwrap(); + /// let matching_subscribers: bool = publisher + /// .matching_status() + /// .res() + /// .await + /// .unwrap() + /// .matching_subscribers(); + /// # }) + /// ``` + #[zenoh_macros::unstable] + pub fn matching_status(&self) -> impl Resolve> + '_ { + zenoh_core::ResolveFuture::new(async move { + self.session + .matching_status(self.key_expr(), self.destination) + }) + } + + /// Return a [`MatchingListener`] for this Publisher. + /// + /// The [`MatchingListener`] that will send a notification each time the [`MatchingStatus`] of + /// the Publisher changes. + /// + /// # Examples + /// ```no_run + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").res().await.unwrap(); + /// let matching_listener = publisher.matching_listener().res().await.unwrap(); + /// while let Ok(matching_status) = matching_listener.recv_async().await { + /// if matching_status.matching_subscribers() { + /// println!("Publisher has matching subscribers."); + /// } else { + /// println!("Publisher has NO MORE matching subscribers."); + /// } + /// } + /// # }) + /// ``` + #[zenoh_macros::unstable] + pub fn matching_listener(&self) -> MatchingListenerBuilder<'_, DefaultHandler> { + MatchingListenerBuilder { + publisher: PublisherRef::Borrow(self), + handler: DefaultHandler, + } + } + /// Undeclares the [`Publisher`], informing the network that it needn't optimize publications for its key expression anymore. /// /// # Examples @@ -345,6 +471,91 @@ impl<'a> Publisher<'a> { } } +/// Functions to create zenoh entities with `'static` lifetime. +/// +/// This trait contains functions to create zenoh entities like +/// [`MatchingListener`] with a `'static` lifetime. +/// This is useful to move zenoh entities to several threads and tasks. +/// +/// This trait is implemented for `Arc`. +/// +/// # Examples +/// ```no_run +/// # async_std::task::block_on(async { +/// use zenoh::prelude::r#async::*; +/// +/// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); +/// let publisher = session.declare_publisher("key/expression").res().await.unwrap().into_arc(); +/// let matching_listener = publisher.matching_listener().res().await.unwrap(); +/// +/// async_std::task::spawn(async move { +/// while let Ok(matching_status) = matching_listener.recv_async().await { +/// if matching_status.matching_subscribers() { +/// println!("Publisher has matching subscribers."); +/// } else { +/// println!("Publisher has NO MORE matching subscribers."); +/// } +/// } +/// }).await; +/// # }) +/// ``` +#[zenoh_macros::unstable] +pub trait PublisherDeclarations { + /// # Examples + /// ```no_run + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); + /// let publisher = session.declare_publisher("key/expression").res().await.unwrap().into_arc(); + /// let matching_listener = publisher.matching_listener().res().await.unwrap(); + /// + /// async_std::task::spawn(async move { + /// while let Ok(matching_status) = matching_listener.recv_async().await { + /// if matching_status.matching_subscribers() { + /// println!("Publisher has matching subscribers."); + /// } else { + /// println!("Publisher has NO MORE matching subscribers."); + /// } + /// } + /// }).await; + /// # }) + /// ``` + #[zenoh_macros::unstable] + fn matching_listener(&self) -> MatchingListenerBuilder<'static, DefaultHandler>; +} + +#[zenoh_macros::unstable] +impl PublisherDeclarations for std::sync::Arc> { + /// # Examples + /// ```no_run + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); + /// let publisher = session.declare_publisher("key/expression").res().await.unwrap().into_arc(); + /// let matching_listener = publisher.matching_listener().res().await.unwrap(); + /// + /// async_std::task::spawn(async move { + /// while let Ok(matching_status) = matching_listener.recv_async().await { + /// if matching_status.matching_subscribers() { + /// println!("Publisher has matching subscribers."); + /// } else { + /// println!("Publisher has NO MORE matching subscribers."); + /// } + /// } + /// }).await; + /// # }) + /// ``` + #[zenoh_macros::unstable] + fn matching_listener(&self) -> MatchingListenerBuilder<'static, DefaultHandler> { + MatchingListenerBuilder { + publisher: PublisherRef::Shared(self.clone()), + handler: DefaultHandler, + } + } +} + impl<'a> Undeclarable<(), PublisherUndeclaration<'a>> for Publisher<'a> { fn undeclare_inner(self, _: ()) -> PublisherUndeclaration<'a> { PublisherUndeclaration { publisher: self } @@ -700,6 +911,366 @@ impl From for zenoh_protocol::core::Priority { } } +/// A struct that indicates if there exist Subscribers matching the Publisher's key expression. +/// +/// # Examples +/// ``` +/// # async_std::task::block_on(async { +/// use zenoh::prelude::r#async::*; +/// +/// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); +/// let publisher = session.declare_publisher("key/expression").res().await.unwrap(); +/// let matching_status = publisher.matching_status().res().await.unwrap(); +/// # }) +/// ``` +#[zenoh_macros::unstable] +#[derive(Copy, Clone, Debug)] +pub struct MatchingStatus { + pub(crate) matching: bool, +} + +#[zenoh_macros::unstable] +impl MatchingStatus { + /// Return true if there exist Subscribers matching the Publisher's key expression. + /// + /// # Examples + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); + /// let publisher = session.declare_publisher("key/expression").res().await.unwrap(); + /// let matching_subscribers: bool = publisher + /// .matching_status() + /// .res() + /// .await + /// .unwrap() + /// .matching_subscribers(); + /// # }) + /// ``` + pub fn matching_subscribers(&self) -> bool { + self.matching + } +} + +/// A builder for initializing a [`MatchingListener`]. +#[zenoh_macros::unstable] +#[derive(Debug)] +pub struct MatchingListenerBuilder<'a, Handler> { + pub(crate) publisher: PublisherRef<'a>, + pub handler: Handler, +} + +#[zenoh_macros::unstable] +impl<'a> MatchingListenerBuilder<'a, DefaultHandler> { + /// Receive the MatchingStatuses for this listener with a callback. + /// + /// # Examples + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").res().await.unwrap(); + /// let matching_listener = publisher + /// .matching_listener() + /// .callback(|matching_status| { + /// if matching_status.matching_subscribers() { + /// println!("Publisher has matching subscribers."); + /// } else { + /// println!("Publisher has NO MORE matching subscribers."); + /// } + /// }) + /// .res() + /// .await + /// .unwrap(); + /// # }) + /// ``` + #[inline] + #[zenoh_macros::unstable] + pub fn callback(self, callback: Callback) -> MatchingListenerBuilder<'a, Callback> + where + Callback: Fn(MatchingStatus) + Send + Sync + 'static, + { + let MatchingListenerBuilder { + publisher, + handler: _, + } = self; + MatchingListenerBuilder { + publisher, + handler: callback, + } + } + + /// Receive the MatchingStatuses for this listener with a mutable callback. + /// + /// # Examples + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let mut n = 0; + /// let session = zenoh::open(config::peer()).res().await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").res().await.unwrap(); + /// let matching_listener = publisher + /// .matching_listener() + /// .callback_mut(move |_matching_status| { n += 1; }) + /// .res() + /// .await + /// .unwrap(); + /// # }) + /// ``` + #[inline] + #[zenoh_macros::unstable] + pub fn callback_mut( + self, + callback: CallbackMut, + ) -> MatchingListenerBuilder<'a, impl Fn(MatchingStatus) + Send + Sync + 'static> + where + CallbackMut: FnMut(MatchingStatus) + Send + Sync + 'static, + { + self.callback(crate::handlers::locked(callback)) + } + + /// Receive the MatchingStatuses for this listener with a [`Handler`](crate::prelude::IntoCallbackReceiverPair). + /// + /// # Examples + /// ```no_run + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").res().await.unwrap(); + /// let matching_listener = publisher + /// .matching_listener() + /// .with(flume::bounded(32)) + /// .res() + /// .await + /// .unwrap(); + /// while let Ok(matching_status) = matching_listener.recv_async().await { + /// if matching_status.matching_subscribers() { + /// println!("Publisher has matching subscribers."); + /// } else { + /// println!("Publisher has NO MORE matching subscribers."); + /// } + /// } + /// # }) + /// ``` + #[inline] + #[zenoh_macros::unstable] + pub fn with(self, handler: Handler) -> MatchingListenerBuilder<'a, Handler> + where + Handler: crate::prelude::IntoCallbackReceiverPair<'static, MatchingStatus>, + { + let MatchingListenerBuilder { + publisher, + handler: _, + } = self; + MatchingListenerBuilder { publisher, handler } + } +} + +#[zenoh_macros::unstable] +impl<'a, Handler> Resolvable for MatchingListenerBuilder<'a, Handler> +where + Handler: IntoCallbackReceiverPair<'static, MatchingStatus> + Send, + Handler::Receiver: Send, +{ + type To = ZResult>; +} + +#[zenoh_macros::unstable] +impl<'a, Handler> SyncResolve for MatchingListenerBuilder<'a, Handler> +where + Handler: IntoCallbackReceiverPair<'static, MatchingStatus> + Send, + Handler::Receiver: Send, +{ + #[zenoh_macros::unstable] + fn res_sync(self) -> ::To { + let (callback, receiver) = self.handler.into_cb_receiver_pair(); + self.publisher + .session + .declare_matches_listener_inner(&self.publisher, callback) + .map(|listener_state| MatchingListener { + listener: MatchingListenerInner { + publisher: self.publisher, + state: listener_state, + alive: true, + }, + receiver, + }) + } +} + +#[zenoh_macros::unstable] +impl<'a, Handler> AsyncResolve for MatchingListenerBuilder<'a, Handler> +where + Handler: IntoCallbackReceiverPair<'static, MatchingStatus> + Send, + Handler::Receiver: Send, +{ + type Future = Ready; + + #[zenoh_macros::unstable] + fn res_async(self) -> Self::Future { + std::future::ready(self.res_sync()) + } +} + +#[zenoh_macros::unstable] +pub(crate) struct MatchingListenerState { + pub(crate) id: Id, + pub(crate) current: std::sync::Mutex, + pub(crate) key_expr: KeyExpr<'static>, + pub(crate) destination: Locality, + pub(crate) callback: Callback<'static, MatchingStatus>, +} + +#[zenoh_macros::unstable] +impl std::fmt::Debug for MatchingListenerState { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("MatchingListener") + .field("id", &self.id) + .field("key_expr", &self.key_expr) + .finish() + } +} + +#[zenoh_macros::unstable] +pub(crate) struct MatchingListenerInner<'a> { + pub(crate) publisher: PublisherRef<'a>, + pub(crate) state: std::sync::Arc, + pub(crate) alive: bool, +} + +#[zenoh_macros::unstable] +impl<'a> MatchingListenerInner<'a> { + #[inline] + pub fn undeclare(self) -> MatchingListenerUndeclaration<'a> { + Undeclarable::undeclare_inner(self, ()) + } +} + +#[zenoh_macros::unstable] +impl<'a> Undeclarable<(), MatchingListenerUndeclaration<'a>> for MatchingListenerInner<'a> { + fn undeclare_inner(self, _: ()) -> MatchingListenerUndeclaration<'a> { + MatchingListenerUndeclaration { subscriber: self } + } +} + +/// A listener that sends notifications when the [`MatchingStatus`] of a +/// publisher changes. +/// +/// # Examples +/// ```no_run +/// # async_std::task::block_on(async { +/// use zenoh::prelude::r#async::*; +/// +/// let session = zenoh::open(config::peer()).res().await.unwrap(); +/// let publisher = session.declare_publisher("key/expression").res().await.unwrap(); +/// let matching_listener = publisher.matching_listener().res().await.unwrap(); +/// while let Ok(matching_status) = matching_listener.recv_async().await { +/// if matching_status.matching_subscribers() { +/// println!("Publisher has matching subscribers."); +/// } else { +/// println!("Publisher has NO MORE matching subscribers."); +/// } +/// } +/// # }) +/// ``` +#[zenoh_macros::unstable] +pub struct MatchingListener<'a, Receiver> { + pub(crate) listener: MatchingListenerInner<'a>, + pub receiver: Receiver, +} + +#[zenoh_macros::unstable] +impl<'a, Receiver> MatchingListener<'a, Receiver> { + /// Close a [`MatchingListener`]. + /// + /// MatchingListeners are automatically closed when dropped, but you may want to use this function to handle errors or + /// close the MatchingListener asynchronously. + /// + /// # Examples + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap(); + /// let publisher = session.declare_publisher("key/expression").res().await.unwrap(); + /// let matching_listener = publisher.matching_listener().res().await.unwrap(); + /// matching_listener.undeclare().res().await.unwrap(); + /// # }) + /// ``` + #[inline] + pub fn undeclare(self) -> MatchingListenerUndeclaration<'a> { + self.listener.undeclare() + } +} + +#[zenoh_macros::unstable] +impl<'a, T> Undeclarable<(), MatchingListenerUndeclaration<'a>> for MatchingListener<'a, T> { + fn undeclare_inner(self, _: ()) -> MatchingListenerUndeclaration<'a> { + Undeclarable::undeclare_inner(self.listener, ()) + } +} + +#[zenoh_macros::unstable] +impl std::ops::Deref for MatchingListener<'_, Receiver> { + type Target = Receiver; + + fn deref(&self) -> &Self::Target { + &self.receiver + } +} +#[zenoh_macros::unstable] +impl std::ops::DerefMut for MatchingListener<'_, Receiver> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.receiver + } +} + +#[zenoh_macros::unstable] +pub struct MatchingListenerUndeclaration<'a> { + subscriber: MatchingListenerInner<'a>, +} + +#[zenoh_macros::unstable] +impl Resolvable for MatchingListenerUndeclaration<'_> { + type To = ZResult<()>; +} + +#[zenoh_macros::unstable] +impl SyncResolve for MatchingListenerUndeclaration<'_> { + fn res_sync(mut self) -> ::To { + self.subscriber.alive = false; + self.subscriber + .publisher + .session + .undeclare_matches_listener_inner(self.subscriber.state.id) + } +} + +#[zenoh_macros::unstable] +impl AsyncResolve for MatchingListenerUndeclaration<'_> { + type Future = Ready; + + fn res_async(self) -> Self::Future { + std::future::ready(self.res_sync()) + } +} + +#[zenoh_macros::unstable] +impl Drop for MatchingListenerInner<'_> { + fn drop(&mut self) { + if self.alive { + let _ = self + .publisher + .session + .undeclare_matches_listener_inner(self.state.id); + } + } +} + mod tests { #[test] fn priority_from() { diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 806041b6d..e8314c9cc 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -101,6 +101,8 @@ pub(crate) struct SessionState { pub(crate) queryables: HashMap>, #[cfg(feature = "unstable")] pub(crate) tokens: HashMap>, + #[cfg(feature = "unstable")] + pub(crate) matching_listeners: HashMap>, pub(crate) queries: HashMap, pub(crate) aggregated_subscribers: Vec, //pub(crate) aggregated_publishers: Vec, @@ -123,6 +125,8 @@ impl SessionState { queryables: HashMap::new(), #[cfg(feature = "unstable")] tokens: HashMap::new(), + #[cfg(feature = "unstable")] + matching_listeners: HashMap::new(), queries: HashMap::new(), aggregated_subscribers, //aggregated_publishers, @@ -1099,6 +1103,12 @@ impl Session { ext_info: *info, }), }); + + #[cfg(feature = "unstable")] + { + let state = zread!(self.state); + self.update_status_up(&state, &key_expr) + } } Ok(sub_state) @@ -1157,6 +1167,12 @@ impl Session { ext_wire_expr: WireExprType { wire_expr }, }), }); + + #[cfg(feature = "unstable")] + { + let state = zread!(self.state); + self.update_status_down(&state, &sub_state.key_expr) + } } } None => { @@ -1178,6 +1194,12 @@ impl Session { }, }), }); + + #[cfg(feature = "unstable")] + { + let state = zread!(self.state); + self.update_status_down(&state, &sub_state.key_expr) + } } } }; @@ -1423,6 +1445,148 @@ impl Session { } } + #[zenoh_macros::unstable] + pub(crate) fn declare_matches_listener_inner( + &self, + publisher: &Publisher, + callback: Callback<'static, MatchingStatus>, + ) -> ZResult> { + let mut state = zwrite!(self.state); + + let id = state.decl_id_counter.fetch_add(1, Ordering::SeqCst); + log::trace!("matches_listener({:?}) => {id}", publisher.key_expr); + let listener_state = Arc::new(MatchingListenerState { + id, + current: std::sync::Mutex::new(false), + destination: publisher.destination, + key_expr: publisher.key_expr.clone().into_owned(), + callback, + }); + state.matching_listeners.insert(id, listener_state.clone()); + drop(state); + match listener_state.current.lock() { + Ok(mut current) => { + if self + .matching_status(&publisher.key_expr, listener_state.destination) + .map(|s| s.matching_subscribers()) + .unwrap_or(true) + { + *current = true; + (listener_state.callback)(MatchingStatus { matching: true }); + } + } + Err(e) => log::error!("Error trying to acquire MathginListener lock: {}", e), + } + Ok(listener_state) + } + + #[zenoh_macros::unstable] + pub(crate) fn matching_status( + &self, + key_expr: &KeyExpr, + destination: Locality, + ) -> ZResult { + use crate::net::routing::router::RoutingExpr; + use zenoh_protocol::core::WhatAmI; + let tables = zread!(self.runtime.router.tables.tables); + let res = crate::net::routing::resource::Resource::get_resource( + &tables.root_res, + key_expr.as_str(), + ); + + let route = crate::net::routing::pubsub::get_data_route( + &tables, + WhatAmI::Client, + 0, + &res, + &mut RoutingExpr::new(&tables.root_res, key_expr.as_str()), + 0, + ); + let matching = match destination { + Locality::Any => !route.is_empty(), + Locality::Remote => route.values().any(|dir| !dir.0.is_local()), + Locality::SessionLocal => route.values().any(|dir| dir.0.is_local()), + }; + Ok(MatchingStatus { matching }) + } + + #[zenoh_macros::unstable] + pub(crate) fn update_status_up(&self, state: &SessionState, key_expr: &KeyExpr) { + for msub in state.matching_listeners.values() { + if key_expr.intersects(&msub.key_expr) { + // Cannot hold session lock when calling tables (matching_status()) + async_std::task::spawn({ + let session = self.clone(); + let msub = msub.clone(); + async move { + match msub.current.lock() { + Ok(mut current) => { + if !*current { + if let Ok(status) = + session.matching_status(&msub.key_expr, msub.destination) + { + if status.matching_subscribers() { + *current = true; + let callback = msub.callback.clone(); + (callback)(status) + } + } + } + } + Err(e) => { + log::error!("Error trying to acquire MathginListener lock: {}", e); + } + } + } + }); + } + } + } + + #[zenoh_macros::unstable] + pub(crate) fn update_status_down(&self, state: &SessionState, key_expr: &KeyExpr) { + for msub in state.matching_listeners.values() { + if key_expr.intersects(&msub.key_expr) { + // Cannot hold session lock when calling tables (matching_status()) + async_std::task::spawn({ + let session = self.clone(); + let msub = msub.clone(); + async move { + match msub.current.lock() { + Ok(mut current) => { + if *current { + if let Ok(status) = + session.matching_status(&msub.key_expr, msub.destination) + { + if !status.matching_subscribers() { + *current = false; + let callback = msub.callback.clone(); + (callback)(status) + } + } + } + } + Err(e) => { + log::error!("Error trying to acquire MathginListener lock: {}", e); + } + } + } + }); + } + } + } + + #[zenoh_macros::unstable] + pub(crate) fn undeclare_matches_listener_inner(&self, sid: usize) -> ZResult<()> { + let mut state = zwrite!(self.state); + if let Some(state) = state.matching_listeners.remove(&sid) { + trace!("undeclare_matches_listener_inner({:?})", state); + Ok(()) + } else { + Err(zerror!("Unable to find MatchingListener").into()) + } + } + pub(crate) fn handle_data( &self, local: bool, @@ -1939,6 +2103,8 @@ impl Primitives for Session { let state = zread!(self.state); match state.wireexpr_to_keyexpr(&m.wire_expr, false) { Ok(expr) => { + self.update_status_up(&state, &expr); + if expr .as_str() .starts_with(crate::liveliness::PREFIX_LIVELINESS) @@ -1960,6 +2126,8 @@ impl Primitives for Session { let state = zread!(self.state); match state.wireexpr_to_keyexpr(&m.ext_wire_expr.wire_expr, false) { Ok(expr) => { + self.update_status_down(&state, &expr); + if expr .as_str() .starts_with(crate::liveliness::PREFIX_LIVELINESS) diff --git a/zenoh/tests/matching.rs b/zenoh/tests/matching.rs new file mode 100644 index 000000000..cf637ee62 --- /dev/null +++ b/zenoh/tests/matching.rs @@ -0,0 +1,225 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use async_std::prelude::FutureExt; +use async_std::task; +use std::time::Duration; +use zenoh::prelude::r#async::*; +use zenoh_core::zasync_executor_init; + +const TIMEOUT: Duration = Duration::from_secs(60); +const RECV_TIMEOUT: Duration = Duration::from_secs(1); + +macro_rules! ztimeout { + ($f:expr) => { + $f.timeout(TIMEOUT).await.unwrap() + }; +} + +#[cfg(feature = "unstable")] +#[test] +fn zenoh_matching_status_any() { + use flume::RecvTimeoutError; + + task::block_on(async { + zasync_executor_init!(); + + let session1 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap(); + + let session2 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap(); + + let publisher1 = ztimeout!(session1 + .declare_publisher("zenoh_matching_status_any_test") + .allowed_destination(Locality::Any) + .res_async()) + .unwrap(); + + let matching_listener = ztimeout!(publisher1.matching_listener().res_async()).unwrap(); + + let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); + assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); + + let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + assert!(!matching_status.matching_subscribers()); + + let sub = ztimeout!(session1 + .declare_subscriber("zenoh_matching_status_any_test") + .res_async()) + .unwrap(); + + let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); + assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true)); + + let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + assert!(matching_status.matching_subscribers()); + + ztimeout!(sub.undeclare().res_async()).unwrap(); + + let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); + assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false)); + + let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + assert!(!matching_status.matching_subscribers()); + + let sub = ztimeout!(session2 + .declare_subscriber("zenoh_matching_status_any_test") + .res_async()) + .unwrap(); + + let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); + assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true)); + + let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + assert!(matching_status.matching_subscribers()); + + ztimeout!(sub.undeclare().res_async()).unwrap(); + + let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); + assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false)); + + let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + assert!(!matching_status.matching_subscribers()); + }); +} + +#[cfg(feature = "unstable")] +#[test] +fn zenoh_matching_status_remote() { + use flume::RecvTimeoutError; + + task::block_on(async { + zasync_executor_init!(); + + let session1 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap(); + + let session2 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap(); + + let publisher1 = ztimeout!(session1 + .declare_publisher("zenoh_matching_status_remote_test") + .allowed_destination(Locality::Remote) + .res_async()) + .unwrap(); + + let matching_listener = ztimeout!(publisher1.matching_listener().res_async()).unwrap(); + + let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); + assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); + + let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + assert!(!matching_status.matching_subscribers()); + + let sub = ztimeout!(session1 + .declare_subscriber("zenoh_matching_status_remote_test") + .res_async()) + .unwrap(); + + let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); + assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); + + let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + assert!(!matching_status.matching_subscribers()); + + ztimeout!(sub.undeclare().res_async()).unwrap(); + + let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); + assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); + + let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + assert!(!matching_status.matching_subscribers()); + + let sub = ztimeout!(session2 + .declare_subscriber("zenoh_matching_status_remote_test") + .res_async()) + .unwrap(); + + let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); + assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true)); + + let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + assert!(matching_status.matching_subscribers()); + + ztimeout!(sub.undeclare().res_async()).unwrap(); + + let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); + assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false)); + + let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + assert!(!matching_status.matching_subscribers()); + }); +} + +#[cfg(feature = "unstable")] +#[test] +fn zenoh_matching_status_local() { + use flume::RecvTimeoutError; + + task::block_on(async { + zasync_executor_init!(); + + let session1 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap(); + + let session2 = ztimeout!(zenoh::open(config::peer()).res_async()).unwrap(); + + let publisher1 = ztimeout!(session1 + .declare_publisher("zenoh_matching_status_local_test") + .allowed_destination(Locality::SessionLocal) + .res_async()) + .unwrap(); + + let matching_listener = ztimeout!(publisher1.matching_listener().res_async()).unwrap(); + + let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); + assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); + + let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + assert!(!matching_status.matching_subscribers()); + + let sub = ztimeout!(session1 + .declare_subscriber("zenoh_matching_status_local_test") + .res_async()) + .unwrap(); + + let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); + assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(true)); + + let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + assert!(matching_status.matching_subscribers()); + + ztimeout!(sub.undeclare().res_async()).unwrap(); + + let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); + assert!(received_status.ok().map(|s| s.matching_subscribers()) == Some(false)); + + let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + assert!(!matching_status.matching_subscribers()); + + let sub = ztimeout!(session2 + .declare_subscriber("zenoh_matching_status_local_test") + .res_async()) + .unwrap(); + + let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); + assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); + + let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + assert!(!matching_status.matching_subscribers()); + + ztimeout!(sub.undeclare().res_async()).unwrap(); + + let received_status = matching_listener.recv_timeout(RECV_TIMEOUT); + assert!(received_status.err() == Some(RecvTimeoutError::Timeout)); + + let matching_status = ztimeout!(publisher1.matching_status().res_async()).unwrap(); + assert!(!matching_status.matching_subscribers()); + }); +}