Skip to content

Commit

Permalink
Listener with static lifetime
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Oct 9, 2023
1 parent acad2d7 commit 66eb8aa
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 5 deletions.
2 changes: 2 additions & 0 deletions zenoh/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ pub(crate) mod common {
pub use zenoh_protocol::core::SampleKind;

pub use crate::publication::Priority;
#[zenoh_macros::unstable]
pub use crate::publication::PublisherDeclarations;
pub use zenoh_protocol::core::{CongestionControl, Reliability, WhatAmI};

/// A [`Locator`] contains a choice of protocol, an address and port, as well as optional additional properties to work with.
Expand Down
101 changes: 96 additions & 5 deletions zenoh/src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,35 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use zenoh_result::Error;

#[zenoh_macros::unstable]
#[derive(Clone)]
pub enum PublisherRef<'a, 'b: 'a> {
Borrow(&'a Publisher<'b>),
Shared(std::sync::Arc<Publisher<'static>>),
}

#[zenoh_macros::unstable]
impl<'a> std::ops::Deref for PublisherRef<'_, 'a> {
type Target = Publisher<'a>;

fn deref(&self) -> &Self::Target {
match self {
PublisherRef::Borrow(b) => b,
PublisherRef::Shared(s) => s,
}
}
}

#[zenoh_macros::unstable]
impl std::fmt::Debug for PublisherRef<'_, '_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PublisherRef::Borrow(b) => Publisher::fmt(b, f),
PublisherRef::Shared(s) => Publisher::fmt(s, f),
}
}
}

/// A publisher that allows to send data through a stream.
///
/// Publishers are automatically undeclared when dropped.
Expand Down Expand Up @@ -268,6 +297,11 @@ impl<'a> Publisher<'a> {
self
}

#[zenoh_macros::unstable]
pub fn into_arc(self) -> std::sync::Arc<Self> {
std::sync::Arc::new(self)
}

fn _write(&self, kind: SampleKind, value: Value) -> Publication {
Publication {
publisher: self,
Expand Down Expand Up @@ -382,9 +416,9 @@ impl<'a> Publisher<'a> {
/// # })
/// ```
#[zenoh_macros::unstable]
pub fn matching_listener(&'a self) -> MatchingListenerBuilder<'a, DefaultHandler> {
pub fn matching_listener(&self) -> MatchingListenerBuilder<'_, DefaultHandler> {
MatchingListenerBuilder {
publisher: self,
publisher: PublisherRef::Borrow(self),
handler: DefaultHandler,
}
}
Expand All @@ -406,6 +440,63 @@ impl<'a> Publisher<'a> {
}
}

#[zenoh_macros::unstable]
pub trait PublisherDeclarations {
/// # Examples
/// ```no_run
/// # async_std::task::block_on(async {
/// use zenoh::prelude::r#async::*;
///
/// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc();
/// let publisher = session.declare_publisher("key/expression").res().await.unwrap().into_arc();
/// let matching_listener = publisher.matching_listener().res().await.unwrap();
///
/// async_std::task::spawn(async move {
/// while let Ok(matching_status) = matching_listener.recv_async().await {
/// if matching_status.is_matching() {
/// println!("Publisher has matching subscribers.");
/// } else {
/// println!("Publisher has NO MORE matching subscribers.");
/// }
/// }
/// }).await;
/// # })
/// ```
#[zenoh_macros::unstable]
fn matching_listener(&self) -> MatchingListenerBuilder<'static, DefaultHandler>;
}

#[zenoh_macros::unstable]
impl PublisherDeclarations for std::sync::Arc<Publisher<'static>> {
/// # Examples
/// ```no_run
/// # async_std::task::block_on(async {
/// use zenoh::prelude::r#async::*;
///
/// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc();
/// let publisher = session.declare_publisher("key/expression").res().await.unwrap().into_arc();
/// let matching_listener = publisher.matching_listener().res().await.unwrap();
///
/// async_std::task::spawn(async move {
/// while let Ok(matching_status) = matching_listener.recv_async().await {
/// if matching_status.is_matching() {
/// println!("Publisher has matching subscribers.");
/// } else {
/// println!("Publisher has NO MORE matching subscribers.");
/// }
/// }
/// }).await;
/// # })
/// ```
#[zenoh_macros::unstable]
fn matching_listener(&self) -> MatchingListenerBuilder<'static, DefaultHandler> {
MatchingListenerBuilder {
publisher: PublisherRef::Shared(self.clone()),
handler: DefaultHandler,
}
}
}

impl<'a> Undeclarable<(), PublisherUndeclaration<'a>> for Publisher<'a> {
fn undeclare_inner(self, _: ()) -> PublisherUndeclaration<'a> {
PublisherUndeclaration { publisher: self }
Expand Down Expand Up @@ -773,7 +864,7 @@ impl MatchingStatus {
#[zenoh_macros::unstable]
#[derive(Debug)]
pub struct MatchingListenerBuilder<'a, Handler> {
pub(crate) publisher: &'a Publisher<'a>,
pub(crate) publisher: PublisherRef<'a, 'a>,
pub handler: Handler,
}

Expand Down Expand Up @@ -841,7 +932,7 @@ where
let (callback, receiver) = self.handler.into_cb_receiver_pair();
self.publisher
.session
.declare_matches_subscriber_inner(self.publisher, callback)
.declare_matches_subscriber_inner(&self.publisher, callback)
.map(|sub_state| MatchingListener {
subscriber: MatchingListenerInner {
publisher: self.publisher,
Expand Down Expand Up @@ -877,7 +968,7 @@ pub(crate) struct MatchingListenerState {

#[zenoh_macros::unstable]
pub(crate) struct MatchingListenerInner<'a> {
pub(crate) publisher: &'a Publisher<'a>,
pub(crate) publisher: PublisherRef<'a, 'a>,
pub(crate) state: std::sync::Arc<MatchingListenerState>,
pub(crate) alive: bool,
}
Expand Down

0 comments on commit 66eb8aa

Please sign in to comment.