Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: bind callback subscriber/queryable to session lifetime #1334

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions examples/examples/z_sub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ fn main() {
}
})
.wait()
.unwrap()
// Make the subscriber run in background, until the session is closed.
.background();
.unwrap();

println!("Press CTRL-C to quit...");
std::thread::park();
Expand Down
6 changes: 4 additions & 2 deletions zenoh/src/api/key_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,10 @@ impl<'a> KeyExpr<'a> {
}
}

impl<'a> UndeclarableSealed<&'a Session, KeyExprUndeclaration<'a>> for KeyExpr<'a> {
fn undeclare_inner(self, session: &'a Session) -> KeyExprUndeclaration<'a> {
impl<'a> UndeclarableSealed<&'a Session> for KeyExpr<'a> {
type Undeclaration = KeyExprUndeclaration<'a>;
wyfo marked this conversation as resolved.
Show resolved Hide resolved

fn undeclare_inner(self, session: &'a Session) -> Self::Undeclaration {
KeyExprUndeclaration {
session,
expr: self,
Expand Down
77 changes: 41 additions & 36 deletions zenoh/src/api/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
use std::{
convert::TryInto,
future::{IntoFuture, Ready},
mem::size_of,
sync::Arc,
time::Duration,
};

use tracing::error;
use zenoh_config::unwrap_or_default;
use zenoh_core::{Resolvable, Resolve, Result as ZResult, Wait};

Expand All @@ -31,6 +33,7 @@ use super::{
subscriber::{Subscriber, SubscriberInner},
Id,
};
use crate::api::session::MaybeWeakSessionRef;

/// A structure with functions to declare a
/// [`LivelinessToken`](LivelinessToken), query
Expand Down Expand Up @@ -251,12 +254,13 @@ impl Wait for LivelinessTokenBuilder<'_, '_> {
fn wait(self) -> <Self as Resolvable>::To {
let session = self.session;
let key_expr = self.key_expr?.into_owned();
let undeclare_on_drop = true;
session
.declare_liveliness_inner(&key_expr)
.map(|tok_state| LivelinessToken {
session,
session: MaybeWeakSessionRef::new(session, !undeclare_on_drop),
state: tok_state,
undeclare_on_drop: true,
undeclare_on_drop,
})
}
}
Expand Down Expand Up @@ -288,7 +292,7 @@ pub(crate) struct LivelinessTokenState {
/// that declared the token has Zenoh connectivity with the Zenoh application
/// that monitors it.
///
/// `LivelinessTokens` are automatically undeclared when dropped.
/// Liveliness tokens are automatically undeclared when dropped.
///
/// # Examples
/// ```no_run
Expand All @@ -307,12 +311,12 @@ pub(crate) struct LivelinessTokenState {
#[zenoh_macros::unstable]
#[derive(Debug)]
pub struct LivelinessToken<'a> {
pub(crate) session: SessionRef<'a>,
pub(crate) state: Arc<LivelinessTokenState>,
session: MaybeWeakSessionRef<'a>,
state: Arc<LivelinessTokenState>,
undeclare_on_drop: bool,
}

/// A [`Resolvable`] returned when undeclaring a [`LivelinessToken`](LivelinessToken).
/// A [`Resolvable`] returned when undeclaring a [`LivelinessToken`].
///
/// # Examples
/// ```
Expand All @@ -332,9 +336,7 @@ pub struct LivelinessToken<'a> {
/// ```
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[zenoh_macros::unstable]
pub struct LivelinessTokenUndeclaration<'a> {
token: LivelinessToken<'a>,
}
pub struct LivelinessTokenUndeclaration<'a>(LivelinessToken<'a>);

#[zenoh_macros::unstable]
impl Resolvable for LivelinessTokenUndeclaration<'_> {
Expand All @@ -344,9 +346,7 @@ impl Resolvable for LivelinessTokenUndeclaration<'_> {
#[zenoh_macros::unstable]
impl Wait for LivelinessTokenUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
// set the flag first to avoid double panic if this function panic
self.token.undeclare_on_drop = false;
self.token.session.undeclare_liveliness(self.token.state.id)
self.0.undeclare_impl()
}
}

Expand All @@ -362,11 +362,7 @@ impl<'a> IntoFuture for LivelinessTokenUndeclaration<'a> {

#[zenoh_macros::unstable]
impl<'a> LivelinessToken<'a> {
/// Undeclare a [`LivelinessToken`].
///
/// LivelinessTokens are automatically closed when dropped,
/// but you may want to use this function to handle errors or
/// undeclare the LivelinessToken asynchronously.
/// Undeclare the [`LivelinessToken`].
///
/// # Examples
/// ```
Expand All @@ -389,29 +385,32 @@ impl<'a> LivelinessToken<'a> {
UndeclarableSealed::undeclare_inner(self, ())
}

/// Keep this liveliness token in background, until the session is closed.
#[inline]
#[zenoh_macros::unstable]
pub fn background(mut self) {
// It's not necessary to undeclare this resource when session close, as other sessions
// will clean all resources related to the closed one.
// So we can just never undeclare it.
fn undeclare_impl(&mut self) -> ZResult<()> {
// set the flag first to avoid double panic if this function panic
self.undeclare_on_drop = false;
match self.session.upgrade() {
Some(session) => session.undeclare_liveliness(self.state.id),
None => Ok(()),
}
}
}

#[zenoh_macros::unstable]
impl<'a> UndeclarableSealed<(), LivelinessTokenUndeclaration<'a>> for LivelinessToken<'a> {
fn undeclare_inner(self, _: ()) -> LivelinessTokenUndeclaration<'a> {
LivelinessTokenUndeclaration { token: self }
impl<'a> UndeclarableSealed<()> for LivelinessToken<'a> {
type Undeclaration = LivelinessTokenUndeclaration<'a>;

fn undeclare_inner(self, _: ()) -> Self::Undeclaration {
LivelinessTokenUndeclaration(self)
}
}

#[zenoh_macros::unstable]
impl Drop for LivelinessToken<'_> {
fn drop(&mut self) {
if self.undeclare_on_drop {
let _ = self.session.undeclare_liveliness(self.state.id);
if let Err(error) = self.undeclare_impl() {
error!(error);
}
}
}
}
Expand Down Expand Up @@ -575,16 +574,22 @@ where
let key_expr = self.key_expr?;
let session = self.session;
let (callback, handler) = self.handler.into_handler();
let undeclare_on_drop = size_of::<Handler::Handler>() > 0;
session
.declare_liveliness_subscriber_inner(&key_expr, Locality::default(), callback)
.map(|sub_state| Subscriber {
subscriber: SubscriberInner {
session,
state: sub_state,
kind: SubscriberKind::LivelinessSubscriber,
undeclare_on_drop: true,
},
handler,
.map(|sub_state| {
Subscriber {
inner: SubscriberInner {
#[cfg(feature = "unstable")]
session_id: session.zid(),
session: MaybeWeakSessionRef::new(session, !undeclare_on_drop),
state: sub_state,
kind: SubscriberKind::LivelinessSubscriber,
// `size_of::<Handler::Handler>() == 0` means callback-only subscriber
undeclare_on_drop,
},
handler,
}
})
}
}
Expand Down
Loading