Skip to content

Commit

Permalink
fix: align MatchingListener undeclaration on drop behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Sep 6, 2024
1 parent 8f159ea commit bb9de5a
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 55 deletions.
90 changes: 42 additions & 48 deletions zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{
convert::TryFrom,
fmt,
future::{IntoFuture, Ready},
mem::size_of,
pin::Pin,
task::{Context, Poll},
};
Expand Down Expand Up @@ -765,18 +766,19 @@ where
{
#[zenoh_macros::unstable]
fn wait(self) -> <Self as Resolvable>::To {
let (callback, receiver) = self.handler.into_handler();
let (callback, handler) = self.handler.into_handler();
let state = self
.publisher
.session
.declare_matches_listener_inner(self.publisher, callback)?;
zlock!(self.publisher.matching_listeners).insert(state.id);
Ok(MatchingListener {
listener: MatchingListenerInner {
inner: MatchingListenerInner {
publisher: self.publisher.clone(),
state,
undeclare_on_drop: size_of::<Handler::Handler>() > 0,
},
receiver,
handler,
})
}
}
Expand Down Expand Up @@ -819,30 +821,15 @@ impl fmt::Debug for MatchingListenerState {
pub(crate) struct MatchingListenerInner<'a> {
pub(crate) publisher: Publisher<'a>,
pub(crate) state: Arc<MatchingListenerState>,
}

#[zenoh_macros::unstable]
impl<'a> MatchingListenerInner<'a> {
#[inline]
pub fn undeclare(self) -> MatchingListenerUndeclaration<'a> {
UndeclarableSealed::undeclare_inner(self, ())
}
}

#[zenoh_macros::unstable]
impl<'a> UndeclarableSealed<()> for MatchingListenerInner<'a> {
type Undeclaration = MatchingListenerUndeclaration<'a>;

fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
MatchingListenerUndeclaration { subscriber: self }
}
pub(crate) undeclare_on_drop: bool,
}

/// A listener that sends notifications when the [`MatchingStatus`] of a
/// publisher changes.
///
/// Matching listeners run in background until the publisher is undeclared.
/// They can be manually undeclared, but will not be undeclared on drop.
/// Callback matching listeners will run in background until the publisher is undeclared,
/// or until it is undeclared.
/// On the other hand, matching listener with a handler are automatically undeclared when dropped.
///
/// # Examples
/// ```no_run
Expand All @@ -863,13 +850,13 @@ impl<'a> UndeclarableSealed<()> for MatchingListenerInner<'a> {
/// # }
/// ```
#[zenoh_macros::unstable]
pub struct MatchingListener<'a, Receiver> {
pub(crate) listener: MatchingListenerInner<'a>,
pub(crate) receiver: Receiver,
pub struct MatchingListener<'a, Handler> {
pub(crate) inner: MatchingListenerInner<'a>,
pub(crate) handler: Handler,
}

#[zenoh_macros::unstable]
impl<'a, Receiver> MatchingListener<'a, Receiver> {
impl<'a, Handler> MatchingListener<'a, Handler> {
/// Undeclare the [`MatchingListener`].
///
/// # Examples
Expand All @@ -885,58 +872,65 @@ impl<'a, Receiver> MatchingListener<'a, Receiver> {
/// # }
/// ```
#[inline]
pub fn undeclare(self) -> MatchingListenerUndeclaration<'a> {
self.listener.undeclare()
pub fn undeclare(self) -> MatchingListenerUndeclaration<'a, Handler>
where
Handler: Send,
{
self.undeclare_inner(())
}

fn undeclare_impl(&mut self) -> ZResult<()> {
// set the flag first to avoid double panic if this function panic
self.inner.undeclare_on_drop = false;
zlock!(self.inner.publisher.matching_listeners).remove(&self.inner.state.id);
self.inner
.publisher
.session
.undeclare_matches_listener_inner(self.inner.state.id)
}
}

#[zenoh_macros::unstable]
impl<'a, T> UndeclarableSealed<()> for MatchingListener<'a, T> {
type Undeclaration = MatchingListenerUndeclaration<'a>;
impl<'a, Handler: Send> UndeclarableSealed<()> for MatchingListener<'a, Handler> {
type Undeclaration = MatchingListenerUndeclaration<'a, Handler>;

fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
UndeclarableSealed::undeclare_inner(self.listener, ())
MatchingListenerUndeclaration(self)
}
}

#[zenoh_macros::unstable]
impl<Receiver> std::ops::Deref for MatchingListener<'_, Receiver> {
type Target = Receiver;
impl<Handler> std::ops::Deref for MatchingListener<'_, Handler> {
type Target = Handler;

fn deref(&self) -> &Self::Target {
&self.receiver
&self.handler
}
}
#[zenoh_macros::unstable]
impl<Receiver> std::ops::DerefMut for MatchingListener<'_, Receiver> {
impl<Handler> std::ops::DerefMut for MatchingListener<'_, Handler> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.receiver
&mut self.handler
}
}

#[zenoh_macros::unstable]
pub struct MatchingListenerUndeclaration<'a> {
subscriber: MatchingListenerInner<'a>,
}
pub struct MatchingListenerUndeclaration<'a, Handler>(MatchingListener<'a, Handler>);

#[zenoh_macros::unstable]
impl Resolvable for MatchingListenerUndeclaration<'_> {
impl<Handler> Resolvable for MatchingListenerUndeclaration<'_, Handler> {
type To = ZResult<()>;
}

#[zenoh_macros::unstable]
impl Wait for MatchingListenerUndeclaration<'_> {
fn wait(self) -> <Self as Resolvable>::To {
zlock!(self.subscriber.publisher.matching_listeners).remove(&self.subscriber.state.id);
self.subscriber
.publisher
.session
.undeclare_matches_listener_inner(self.subscriber.state.id)
impl<Handler> Wait for MatchingListenerUndeclaration<'_, Handler> {
fn wait(mut self) -> <Self as Resolvable>::To {
self.0.undeclare_impl()
}
}

#[zenoh_macros::unstable]
impl IntoFuture for MatchingListenerUndeclaration<'_> {
impl<Handler> IntoFuture for MatchingListenerUndeclaration<'_, Handler> {
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;

Expand Down
4 changes: 1 addition & 3 deletions zenoh/src/api/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,9 +736,7 @@ impl<'a, 'b, Handler> QueryableBuilder<'a, 'b, Handler> {
/// A queryable that provides data through a [`Handler`](crate::handlers::IntoHandler).
///
/// Queryables can be created from a zenoh [`Session`](crate::Session)
/// with the [`declare_queryable`](crate::Session::declare_queryable) function
/// and the [`with`](QueryableBuilder::with) function
/// of the resulting builder.
/// with the [`declare_queryable`](crate::Session::declare_queryable) function.
///
/// Callback queryables will run in background until the session is closed,
/// or until it is undeclared.
Expand Down
6 changes: 2 additions & 4 deletions zenoh/src/api/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,7 @@ where
/// A subscriber that provides data through a [`Handler`](crate::handlers::IntoHandler).
///
/// Subscribers can be created from a zenoh [`Session`](crate::Session)
/// with the [`declare_subscriber`](crate::Session::declare_subscriber) function
/// and the [`with`](SubscriberBuilder::with) function
/// of the resulting builder.
/// with the [`declare_subscriber`](crate::Session::declare_subscriber) function.
///
/// Callback subscribers will run in background until the session is closed,
/// or until it is undeclared.
Expand Down Expand Up @@ -500,7 +498,7 @@ impl<Handler> Drop for Subscriber<Handler> {
}
}

impl<'a, Handler: Send + 'a> UndeclarableSealed<()> for Subscriber<Handler> {
impl<Handler: Send> UndeclarableSealed<()> for Subscriber<Handler> {
type Undeclaration = SubscriberUndeclaration<Handler>;

fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
Expand Down

0 comments on commit bb9de5a

Please sign in to comment.