From 66eb8aa36e15925ca8f140372433da5e0e3eebdf Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Mon, 9 Oct 2023 11:58:05 +0200 Subject: [PATCH] Listener with static lifetime --- zenoh/src/prelude.rs | 2 + zenoh/src/publication.rs | 101 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 98 insertions(+), 5 deletions(-) diff --git a/zenoh/src/prelude.rs b/zenoh/src/prelude.rs index baf7439244..2f2e7650a0 100644 --- a/zenoh/src/prelude.rs +++ b/zenoh/src/prelude.rs @@ -51,6 +51,8 @@ pub(crate) mod common { pub use zenoh_protocol::core::SampleKind; pub use crate::publication::Priority; + #[zenoh_macros::unstable] + pub use crate::publication::PublisherDeclarations; pub use zenoh_protocol::core::{CongestionControl, Reliability, WhatAmI}; /// A [`Locator`] contains a choice of protocol, an address and port, as well as optional additional properties to work with. diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index 1dece07d8d..3cab950678 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -202,6 +202,35 @@ use std::pin::Pin; use std::task::{Context, Poll}; use zenoh_result::Error; +#[zenoh_macros::unstable] +#[derive(Clone)] +pub enum PublisherRef<'a, 'b: 'a> { + Borrow(&'a Publisher<'b>), + Shared(std::sync::Arc>), +} + +#[zenoh_macros::unstable] +impl<'a> std::ops::Deref for PublisherRef<'_, 'a> { + type Target = Publisher<'a>; + + fn deref(&self) -> &Self::Target { + match self { + PublisherRef::Borrow(b) => b, + PublisherRef::Shared(s) => s, + } + } +} + +#[zenoh_macros::unstable] +impl std::fmt::Debug for PublisherRef<'_, '_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + PublisherRef::Borrow(b) => Publisher::fmt(b, f), + PublisherRef::Shared(s) => Publisher::fmt(s, f), + } + } +} + /// A publisher that allows to send data through a stream. /// /// Publishers are automatically undeclared when dropped. @@ -268,6 +297,11 @@ impl<'a> Publisher<'a> { self } + #[zenoh_macros::unstable] + pub fn into_arc(self) -> std::sync::Arc { + std::sync::Arc::new(self) + } + fn _write(&self, kind: SampleKind, value: Value) -> Publication { Publication { publisher: self, @@ -382,9 +416,9 @@ impl<'a> Publisher<'a> { /// # }) /// ``` #[zenoh_macros::unstable] - pub fn matching_listener(&'a self) -> MatchingListenerBuilder<'a, DefaultHandler> { + pub fn matching_listener(&self) -> MatchingListenerBuilder<'_, DefaultHandler> { MatchingListenerBuilder { - publisher: self, + publisher: PublisherRef::Borrow(self), handler: DefaultHandler, } } @@ -406,6 +440,63 @@ impl<'a> Publisher<'a> { } } +#[zenoh_macros::unstable] +pub trait PublisherDeclarations { + /// # Examples + /// ```no_run + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); + /// let publisher = session.declare_publisher("key/expression").res().await.unwrap().into_arc(); + /// let matching_listener = publisher.matching_listener().res().await.unwrap(); + /// + /// async_std::task::spawn(async move { + /// while let Ok(matching_status) = matching_listener.recv_async().await { + /// if matching_status.is_matching() { + /// println!("Publisher has matching subscribers."); + /// } else { + /// println!("Publisher has NO MORE matching subscribers."); + /// } + /// } + /// }).await; + /// # }) + /// ``` + #[zenoh_macros::unstable] + fn matching_listener(&self) -> MatchingListenerBuilder<'static, DefaultHandler>; +} + +#[zenoh_macros::unstable] +impl PublisherDeclarations for std::sync::Arc> { + /// # Examples + /// ```no_run + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); + /// let publisher = session.declare_publisher("key/expression").res().await.unwrap().into_arc(); + /// let matching_listener = publisher.matching_listener().res().await.unwrap(); + /// + /// async_std::task::spawn(async move { + /// while let Ok(matching_status) = matching_listener.recv_async().await { + /// if matching_status.is_matching() { + /// println!("Publisher has matching subscribers."); + /// } else { + /// println!("Publisher has NO MORE matching subscribers."); + /// } + /// } + /// }).await; + /// # }) + /// ``` + #[zenoh_macros::unstable] + fn matching_listener(&self) -> MatchingListenerBuilder<'static, DefaultHandler> { + MatchingListenerBuilder { + publisher: PublisherRef::Shared(self.clone()), + handler: DefaultHandler, + } + } +} + impl<'a> Undeclarable<(), PublisherUndeclaration<'a>> for Publisher<'a> { fn undeclare_inner(self, _: ()) -> PublisherUndeclaration<'a> { PublisherUndeclaration { publisher: self } @@ -773,7 +864,7 @@ impl MatchingStatus { #[zenoh_macros::unstable] #[derive(Debug)] pub struct MatchingListenerBuilder<'a, Handler> { - pub(crate) publisher: &'a Publisher<'a>, + pub(crate) publisher: PublisherRef<'a, 'a>, pub handler: Handler, } @@ -841,7 +932,7 @@ where let (callback, receiver) = self.handler.into_cb_receiver_pair(); self.publisher .session - .declare_matches_subscriber_inner(self.publisher, callback) + .declare_matches_subscriber_inner(&self.publisher, callback) .map(|sub_state| MatchingListener { subscriber: MatchingListenerInner { publisher: self.publisher, @@ -877,7 +968,7 @@ pub(crate) struct MatchingListenerState { #[zenoh_macros::unstable] pub(crate) struct MatchingListenerInner<'a> { - pub(crate) publisher: &'a Publisher<'a>, + pub(crate) publisher: PublisherRef<'a, 'a>, pub(crate) state: std::sync::Arc, pub(crate) alive: bool, }