Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: don't undeclare objects on drop #1295

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions examples/examples/z_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ async fn main() {
println!("Press CTRL-C to undeclare LivelinessToken and quit...");
std::thread::park();

// LivelinessTokens are automatically closed when dropped
// Use the code below to manually undeclare it if needed
token.undeclare().await.unwrap();
}

Expand Down
4 changes: 1 addition & 3 deletions examples/examples/z_sub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ fn main() {
}
})
.wait()
.unwrap()
// Make the subscriber run in background, until the session is closed.
.background();
.unwrap();

println!("Press CTRL-C to quit...");
std::thread::park();
Expand Down
2 changes: 0 additions & 2 deletions zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,6 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
destination: self.destination,
#[cfg(feature = "unstable")]
matching_listeners: Default::default(),
undeclare_on_drop: true,
})
}
}
Expand Down Expand Up @@ -362,7 +361,6 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> {
destination: self.destination,
#[cfg(feature = "unstable")]
matching_listeners: Default::default(),
undeclare_on_drop: true,
})
}
}
Expand Down
5 changes: 3 additions & 2 deletions zenoh/src/api/key_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,9 @@ impl<'a> KeyExpr<'a> {
}
}

impl<'a> UndeclarableSealed<&'a Session, KeyExprUndeclaration<'a>> for KeyExpr<'a> {
fn undeclare_inner(self, session: &'a Session) -> KeyExprUndeclaration<'a> {
impl<'a> UndeclarableSealed<&'a Session> for KeyExpr<'a> {
type Res = KeyExprUndeclaration<'a>;
fn undeclare_inner(self, session: &'a Session) -> Self::Res {
KeyExprUndeclaration {
session,
expr: self,
Expand Down
56 changes: 17 additions & 39 deletions zenoh/src/api/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use super::{
subscriber::{Subscriber, SubscriberInner},
Id,
};
use crate::api::session::WeakSessionRef;

/// A structure with functions to declare a
/// [`LivelinessToken`](LivelinessToken), query
Expand Down Expand Up @@ -254,9 +255,8 @@ impl Wait for LivelinessTokenBuilder<'_, '_> {
session
.declare_liveliness_inner(&key_expr)
.map(|tok_state| LivelinessToken {
session,
session: session.into(),
state: tok_state,
undeclare_on_drop: true,
})
}
}
Expand All @@ -283,13 +283,11 @@ pub(crate) struct LivelinessTokenState {
///
/// A declared liveliness token will be seen as alive by any other Zenoh
/// application in the system that monitors it while the liveliness token
/// is not undeclared or dropped, while the Zenoh application that declared
/// is not undeclared, while the Zenoh application that declared
/// it is alive (didn't stop or crashed) and while the Zenoh application
/// that declared the token has Zenoh connectivity with the Zenoh application
/// that monitors it.
///
/// `LivelinessTokens` are automatically undeclared when dropped.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
Expand All @@ -307,9 +305,8 @@ pub(crate) struct LivelinessTokenState {
#[zenoh_macros::unstable]
#[derive(Debug)]
pub struct LivelinessToken<'a> {
pub(crate) session: SessionRef<'a>,
pub(crate) session: WeakSessionRef<'a>,
pub(crate) state: Arc<LivelinessTokenState>,
undeclare_on_drop: bool,
}

/// A [`Resolvable`] returned when undeclaring a [`LivelinessToken`](LivelinessToken).
Expand Down Expand Up @@ -343,10 +340,11 @@ impl Resolvable for LivelinessTokenUndeclaration<'_> {

#[zenoh_macros::unstable]
impl Wait for LivelinessTokenUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
// set the flag first to avoid double panic if this function panic
self.token.undeclare_on_drop = false;
self.token.session.undeclare_liveliness(self.token.state.id)
fn wait(self) -> <Self as Resolvable>::To {
let Some(session) = self.token.session.upgrade() else {
return Ok(());
};
session.undeclare_liveliness(self.token.state.id)
}
}

Expand All @@ -362,11 +360,7 @@ impl<'a> IntoFuture for LivelinessTokenUndeclaration<'a> {

#[zenoh_macros::unstable]
impl<'a> LivelinessToken<'a> {
/// Undeclare a [`LivelinessToken`].
///
/// LivelinessTokens are automatically closed when dropped,
/// but you may want to use this function to handle errors or
/// undeclare the LivelinessToken asynchronously.
/// Undeclare the [`LivelinessToken`].
///
/// # Examples
/// ```
Expand All @@ -388,31 +382,14 @@ impl<'a> LivelinessToken<'a> {
pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
UndeclarableSealed::undeclare_inner(self, ())
}

/// Keep this liveliness token in background, until the session is closed.
#[inline]
#[zenoh_macros::unstable]
pub fn background(mut self) {
// It's not necessary to undeclare this resource when session close, as other sessions
// will clean all resources related to the closed one.
// So we can just never undeclare it.
self.undeclare_on_drop = false;
}
}

#[zenoh_macros::unstable]
impl<'a> UndeclarableSealed<(), LivelinessTokenUndeclaration<'a>> for LivelinessToken<'a> {
fn undeclare_inner(self, _: ()) -> LivelinessTokenUndeclaration<'a> {
LivelinessTokenUndeclaration { token: self }
}
}
impl<'a> UndeclarableSealed<()> for LivelinessToken<'a> {
type Res = LivelinessTokenUndeclaration<'a>;

#[zenoh_macros::unstable]
impl Drop for LivelinessToken<'_> {
fn drop(&mut self) {
if self.undeclare_on_drop {
let _ = self.session.undeclare_liveliness(self.state.id);
}
fn undeclare_inner(self, _: ()) -> Self::Res {
LivelinessTokenUndeclaration { token: self }
}
}

Expand Down Expand Up @@ -579,10 +556,11 @@ where
.declare_liveliness_subscriber_inner(&key_expr, Locality::default(), callback)
.map(|sub_state| Subscriber {
subscriber: SubscriberInner {
session,
#[cfg(feature = "unstable")]
session_id: session.zid(),
session: session.into(),
state: sub_state,
kind: SubscriberKind::LivelinessSubscriber,
undeclare_on_drop: true,
},
handler,
})
Expand Down
75 changes: 24 additions & 51 deletions zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl fmt::Debug for PublisherState {
#[derive(Clone)]
pub enum PublisherRef<'a> {
Borrow(&'a Publisher<'a>),
Shared(std::sync::Arc<Publisher<'static>>),
Shared(Arc<Publisher<'static>>),
}

#[zenoh_macros::unstable]
Expand All @@ -105,8 +105,6 @@ impl std::fmt::Debug for PublisherRef<'_> {

/// A publisher that allows to send data through a stream.
///
/// Publishers are automatically undeclared when dropped.
///
/// # Examples
/// ```
/// # #[tokio::main]
Expand Down Expand Up @@ -146,7 +144,6 @@ pub struct Publisher<'a> {
pub(crate) destination: Locality,
#[cfg(feature = "unstable")]
pub(crate) matching_listeners: Arc<Mutex<HashSet<Id>>>,
pub(crate) undeclare_on_drop: bool,
}

impl<'a> Publisher<'a> {
Expand Down Expand Up @@ -347,7 +344,7 @@ impl<'a> Publisher<'a> {
}
}

/// Undeclares the [`Publisher`], informing the network that it needn't optimize publications for its key expression anymore.
/// Undeclare the [`Publisher`], informing the network that it needn't optimize publications for its key expression anymore.
///
/// # Examples
/// ```
Expand Down Expand Up @@ -462,8 +459,10 @@ impl PublisherDeclarations for std::sync::Arc<Publisher<'static>> {
}
}

impl<'a> UndeclarableSealed<(), PublisherUndeclaration<'a>> for Publisher<'a> {
fn undeclare_inner(self, _: ()) -> PublisherUndeclaration<'a> {
impl<'a> UndeclarableSealed<()> for Publisher<'a> {
type Res = PublisherUndeclaration<'a>;

fn undeclare_inner(self, _: ()) -> Self::Res {
PublisherUndeclaration { publisher: self }
}
}
Expand Down Expand Up @@ -491,9 +490,7 @@ impl Resolvable for PublisherUndeclaration<'_> {
}

impl Wait for PublisherUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
// set the flag first to avoid double panic if this function panic
self.publisher.undeclare_on_drop = false;
fn wait(self) -> <Self as Resolvable>::To {
#[cfg(feature = "unstable")]
self.publisher.undeclare_matching_listeners()?;
self.publisher
Expand All @@ -513,11 +510,8 @@ impl IntoFuture for PublisherUndeclaration<'_> {

impl Drop for Publisher<'_> {
fn drop(&mut self) {
if self.undeclare_on_drop {
#[cfg(feature = "unstable")]
let _ = self.undeclare_matching_listeners();
let _ = self.session.undeclare_publisher_inner(self.id);
}
#[cfg(feature = "unstable")]
let _ = self.undeclare_matching_listeners();
}
}

Expand Down Expand Up @@ -922,7 +916,6 @@ where
listener: MatchingListenerInner {
publisher: self.publisher,
state,
undeclare_on_drop: true,
},
receiver,
})
Expand All @@ -947,7 +940,7 @@ where
#[zenoh_macros::unstable]
pub(crate) struct MatchingListenerState {
pub(crate) id: Id,
pub(crate) current: std::sync::Mutex<bool>,
pub(crate) current: Mutex<bool>,
pub(crate) key_expr: KeyExpr<'static>,
pub(crate) destination: Locality,
pub(crate) callback: Callback<'static, MatchingStatus>,
Expand All @@ -966,8 +959,7 @@ impl std::fmt::Debug for MatchingListenerState {
#[zenoh_macros::unstable]
pub(crate) struct MatchingListenerInner<'a> {
pub(crate) publisher: PublisherRef<'a>,
pub(crate) state: std::sync::Arc<MatchingListenerState>,
undeclare_on_drop: bool,
pub(crate) state: Arc<MatchingListenerState>,
}

#[zenoh_macros::unstable]
Expand All @@ -979,15 +971,20 @@ impl<'a> MatchingListenerInner<'a> {
}

#[zenoh_macros::unstable]
impl<'a> UndeclarableSealed<(), MatchingListenerUndeclaration<'a>> for MatchingListenerInner<'a> {
fn undeclare_inner(self, _: ()) -> MatchingListenerUndeclaration<'a> {
impl<'a> UndeclarableSealed<()> for MatchingListenerInner<'a> {
type Res = MatchingListenerUndeclaration<'a>;

fn undeclare_inner(self, _: ()) -> Self::Res {
MatchingListenerUndeclaration { subscriber: self }
}
}

/// A listener that sends notifications when the [`MatchingStatus`] of a
/// publisher changes.
///
/// Matching litsteners run in background until the publisher is undeclared.
/// They can be manually undeclared, but will not be undeclared on drop.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
Expand All @@ -1014,10 +1011,7 @@ pub struct MatchingListener<'a, Receiver> {

#[zenoh_macros::unstable]
impl<'a, Receiver> MatchingListener<'a, Receiver> {
/// Close a [`MatchingListener`].
///
/// MatchingListeners are automatically closed when dropped, but you may want to use this function to handle errors or
/// close the MatchingListener asynchronously.
/// Undeclare the [`MatchingListener`].
///
/// # Examples
/// ```
Expand All @@ -1035,19 +1029,13 @@ impl<'a, Receiver> MatchingListener<'a, Receiver> {
pub fn undeclare(self) -> MatchingListenerUndeclaration<'a> {
self.listener.undeclare()
}

/// Make the matching listener run in background, until the publisher is undeclared.
#[inline]
#[zenoh_macros::unstable]
pub fn background(mut self) {
// The matching listener will be undeclared as part of publisher undeclaration.
self.listener.undeclare_on_drop = false;
}
}

#[zenoh_macros::unstable]
impl<'a, T> UndeclarableSealed<(), MatchingListenerUndeclaration<'a>> for MatchingListener<'a, T> {
fn undeclare_inner(self, _: ()) -> MatchingListenerUndeclaration<'a> {
impl<'a, T> UndeclarableSealed<()> for MatchingListener<'a, T> {
type Res = MatchingListenerUndeclaration<'a>;

fn undeclare_inner(self, _: ()) -> Self::Res {
UndeclarableSealed::undeclare_inner(self.listener, ())
}
}
Expand Down Expand Up @@ -1079,9 +1067,7 @@ impl Resolvable for MatchingListenerUndeclaration<'_> {

#[zenoh_macros::unstable]
impl Wait for MatchingListenerUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
// set the flag first to avoid double panic if this function panic
self.subscriber.undeclare_on_drop = false;
fn wait(self) -> <Self as Resolvable>::To {
zlock!(self.subscriber.publisher.matching_listeners).remove(&self.subscriber.state.id);
self.subscriber
.publisher
Expand All @@ -1100,19 +1086,6 @@ impl IntoFuture for MatchingListenerUndeclaration<'_> {
}
}

#[zenoh_macros::unstable]
impl Drop for MatchingListenerInner<'_> {
fn drop(&mut self) {
if self.undeclare_on_drop {
zlock!(self.publisher.matching_listeners).remove(&self.state.id);
let _ = self
.publisher
.session
.undeclare_matches_listener_inner(self.state.id);
}
}
}

#[cfg(test)]
mod tests {
use zenoh_config::Config;
Expand Down
Loading