From 31ae67663ee2df407b017f6ab0916719280c4cd6 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 5 Aug 2024 08:10:30 +0200 Subject: [PATCH] feat: don't undeclare objects on drop --- examples/examples/z_liveliness.rs | 2 - examples/examples/z_sub_thr.rs | 4 +- zenoh/src/api/builders/publisher.rs | 2 - zenoh/src/api/key_expr.rs | 5 +- zenoh/src/api/liveliness.rs | 54 ++++------- zenoh/src/api/publisher.rs | 75 +++++---------- zenoh/src/api/queryable.rs | 143 ++++++++++++---------------- zenoh/src/api/session.rs | 98 +++++++++++++------ zenoh/src/api/subscriber.rs | 95 ++++++------------ zenoh/src/lib.rs | 4 +- zenoh/src/prelude.rs | 2 +- 11 files changed, 208 insertions(+), 276 deletions(-) diff --git a/examples/examples/z_liveliness.rs b/examples/examples/z_liveliness.rs index bf8890a267..6216050885 100644 --- a/examples/examples/z_liveliness.rs +++ b/examples/examples/z_liveliness.rs @@ -31,8 +31,6 @@ async fn main() { println!("Press CTRL-C to undeclare LivelinessToken and quit..."); std::thread::park(); - // LivelinessTokens are automatically closed when dropped - // Use the code below to manually undeclare it if needed token.undeclare().await.unwrap(); } diff --git a/examples/examples/z_sub_thr.rs b/examples/examples/z_sub_thr.rs index 78626d1d1d..460a8dfe73 100644 --- a/examples/examples/z_sub_thr.rs +++ b/examples/examples/z_sub_thr.rs @@ -87,9 +87,7 @@ fn main() { } }) .wait() - .unwrap() - // Make the subscriber run in background, until the session is closed. - .background(); + .unwrap(); println!("Press CTRL-C to quit..."); std::thread::park(); diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 666b4378e0..24e0187ba5 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -307,7 +307,6 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { destination: self.destination, #[cfg(feature = "unstable")] matching_listeners: Default::default(), - undeclare_on_drop: true, }) } } @@ -362,7 +361,6 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> { destination: self.destination, #[cfg(feature = "unstable")] matching_listeners: Default::default(), - undeclare_on_drop: true, }) } } diff --git a/zenoh/src/api/key_expr.rs b/zenoh/src/api/key_expr.rs index fc472e0db3..9dd7e6605c 100644 --- a/zenoh/src/api/key_expr.rs +++ b/zenoh/src/api/key_expr.rs @@ -549,8 +549,9 @@ impl<'a> KeyExpr<'a> { } } -impl<'a> UndeclarableSealed<&'a Session, KeyExprUndeclaration<'a>> for KeyExpr<'a> { - fn undeclare_inner(self, session: &'a Session) -> KeyExprUndeclaration<'a> { +impl<'a> UndeclarableSealed<&'a Session> for KeyExpr<'a> { + type Res = KeyExprUndeclaration<'a>; + fn undeclare_inner(self, session: &'a Session) -> Self::Res { KeyExprUndeclaration { session, expr: self, diff --git a/zenoh/src/api/liveliness.rs b/zenoh/src/api/liveliness.rs index 64f87c6de5..1f631ae51e 100644 --- a/zenoh/src/api/liveliness.rs +++ b/zenoh/src/api/liveliness.rs @@ -31,6 +31,7 @@ use super::{ subscriber::{Subscriber, SubscriberInner}, Id, }; +use crate::api::session::WeakSessionRef; /// A structure with functions to declare a /// [`LivelinessToken`](LivelinessToken), query @@ -254,9 +255,8 @@ impl Wait for LivelinessTokenBuilder<'_, '_> { session .declare_liveliness_inner(&key_expr) .map(|tok_state| LivelinessToken { - session, + session: session.into(), state: tok_state, - undeclare_on_drop: true, }) } } @@ -283,13 +283,11 @@ pub(crate) struct LivelinessTokenState { /// /// A declared liveliness token will be seen as alive by any other Zenoh /// application in the system that monitors it while the liveliness token -/// is not undeclared or dropped, while the Zenoh application that declared +/// is not undeclared, while the Zenoh application that declared /// it is alive (didn't stop or crashed) and while the Zenoh application /// that declared the token has Zenoh connectivity with the Zenoh application /// that monitors it. /// -/// `LivelinessTokens` are automatically undeclared when dropped. -/// /// # Examples /// ```no_run /// # #[tokio::main] @@ -307,9 +305,8 @@ pub(crate) struct LivelinessTokenState { #[zenoh_macros::unstable] #[derive(Debug)] pub struct LivelinessToken<'a> { - pub(crate) session: SessionRef<'a>, + pub(crate) session: WeakSessionRef<'a>, pub(crate) state: Arc, - undeclare_on_drop: bool, } /// A [`Resolvable`] returned when undeclaring a [`LivelinessToken`](LivelinessToken). @@ -344,9 +341,10 @@ impl Resolvable for LivelinessTokenUndeclaration<'_> { #[zenoh_macros::unstable] impl Wait for LivelinessTokenUndeclaration<'_> { fn wait(mut self) -> ::To { - // set the flag first to avoid double panic if this function panic - self.token.undeclare_on_drop = false; - self.token.session.undeclare_liveliness(self.token.state.id) + let Some(session) = self.token.session.upgrade() else { + return Ok(()); + }; + session.undeclare_liveliness(self.token.state.id) } } @@ -362,11 +360,7 @@ impl<'a> IntoFuture for LivelinessTokenUndeclaration<'a> { #[zenoh_macros::unstable] impl<'a> LivelinessToken<'a> { - /// Undeclare a [`LivelinessToken`]. - /// - /// LivelinessTokens are automatically closed when dropped, - /// but you may want to use this function to handle errors or - /// undeclare the LivelinessToken asynchronously. + /// Undeclare the [`LivelinessToken`]. /// /// # Examples /// ``` @@ -388,31 +382,14 @@ impl<'a> LivelinessToken<'a> { pub fn undeclare(self) -> impl Resolve> + 'a { UndeclarableSealed::undeclare_inner(self, ()) } - - /// Keep this liveliness token in background, until the session is closed. - #[inline] - #[zenoh_macros::unstable] - pub fn background(mut self) { - // It's not necessary to undeclare this resource when session close, as other sessions - // will clean all resources related to the closed one. - // So we can just never undeclare it. - self.undeclare_on_drop = false; - } } #[zenoh_macros::unstable] -impl<'a> UndeclarableSealed<(), LivelinessTokenUndeclaration<'a>> for LivelinessToken<'a> { - fn undeclare_inner(self, _: ()) -> LivelinessTokenUndeclaration<'a> { - LivelinessTokenUndeclaration { token: self } - } -} +impl<'a> UndeclarableSealed<()> for LivelinessToken<'a> { + type Res = LivelinessTokenUndeclaration<'a>; -#[zenoh_macros::unstable] -impl Drop for LivelinessToken<'_> { - fn drop(&mut self) { - if self.undeclare_on_drop { - let _ = self.session.undeclare_liveliness(self.state.id); - } + fn undeclare_inner(self, _: ()) -> Self::Res { + LivelinessTokenUndeclaration { token: self } } } @@ -579,10 +556,11 @@ where .declare_liveliness_subscriber_inner(&key_expr, Locality::default(), callback) .map(|sub_state| Subscriber { subscriber: SubscriberInner { - session, + #[cfg(feature = "unstable")] + session_id: session.zid(), + session: session.into(), state: sub_state, kind: SubscriberKind::LivelinessSubscriber, - undeclare_on_drop: true, }, handler, }) diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index c8e0ace03e..5393c56593 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -78,7 +78,7 @@ impl fmt::Debug for PublisherState { #[derive(Clone)] pub enum PublisherRef<'a> { Borrow(&'a Publisher<'a>), - Shared(std::sync::Arc>), + Shared(Arc>), } #[zenoh_macros::unstable] @@ -105,8 +105,6 @@ impl std::fmt::Debug for PublisherRef<'_> { /// A publisher that allows to send data through a stream. /// -/// Publishers are automatically undeclared when dropped. -/// /// # Examples /// ``` /// # #[tokio::main] @@ -146,7 +144,6 @@ pub struct Publisher<'a> { pub(crate) destination: Locality, #[cfg(feature = "unstable")] pub(crate) matching_listeners: Arc>>, - pub(crate) undeclare_on_drop: bool, } impl<'a> Publisher<'a> { @@ -347,7 +344,7 @@ impl<'a> Publisher<'a> { } } - /// Undeclares the [`Publisher`], informing the network that it needn't optimize publications for its key expression anymore. + /// Undeclare the [`Publisher`], informing the network that it needn't optimize publications for its key expression anymore. /// /// # Examples /// ``` @@ -462,8 +459,10 @@ impl PublisherDeclarations for std::sync::Arc> { } } -impl<'a> UndeclarableSealed<(), PublisherUndeclaration<'a>> for Publisher<'a> { - fn undeclare_inner(self, _: ()) -> PublisherUndeclaration<'a> { +impl<'a> UndeclarableSealed<()> for Publisher<'a> { + type Res = PublisherUndeclaration<'a>; + + fn undeclare_inner(self, _: ()) -> Self::Res { PublisherUndeclaration { publisher: self } } } @@ -491,9 +490,7 @@ impl Resolvable for PublisherUndeclaration<'_> { } impl Wait for PublisherUndeclaration<'_> { - fn wait(mut self) -> ::To { - // set the flag first to avoid double panic if this function panic - self.publisher.undeclare_on_drop = false; + fn wait(self) -> ::To { #[cfg(feature = "unstable")] self.publisher.undeclare_matching_listeners()?; self.publisher @@ -513,11 +510,8 @@ impl IntoFuture for PublisherUndeclaration<'_> { impl Drop for Publisher<'_> { fn drop(&mut self) { - if self.undeclare_on_drop { - #[cfg(feature = "unstable")] - let _ = self.undeclare_matching_listeners(); - let _ = self.session.undeclare_publisher_inner(self.id); - } + #[cfg(feature = "unstable")] + let _ = self.undeclare_matching_listeners(); } } @@ -922,7 +916,6 @@ where listener: MatchingListenerInner { publisher: self.publisher, state, - undeclare_on_drop: true, }, receiver, }) @@ -947,7 +940,7 @@ where #[zenoh_macros::unstable] pub(crate) struct MatchingListenerState { pub(crate) id: Id, - pub(crate) current: std::sync::Mutex, + pub(crate) current: Mutex, pub(crate) key_expr: KeyExpr<'static>, pub(crate) destination: Locality, pub(crate) callback: Callback<'static, MatchingStatus>, @@ -966,8 +959,7 @@ impl std::fmt::Debug for MatchingListenerState { #[zenoh_macros::unstable] pub(crate) struct MatchingListenerInner<'a> { pub(crate) publisher: PublisherRef<'a>, - pub(crate) state: std::sync::Arc, - undeclare_on_drop: bool, + pub(crate) state: Arc, } #[zenoh_macros::unstable] @@ -979,8 +971,10 @@ impl<'a> MatchingListenerInner<'a> { } #[zenoh_macros::unstable] -impl<'a> UndeclarableSealed<(), MatchingListenerUndeclaration<'a>> for MatchingListenerInner<'a> { - fn undeclare_inner(self, _: ()) -> MatchingListenerUndeclaration<'a> { +impl<'a> UndeclarableSealed<()> for MatchingListenerInner<'a> { + type Res = MatchingListenerUndeclaration<'a>; + + fn undeclare_inner(self, _: ()) -> Self::Res { MatchingListenerUndeclaration { subscriber: self } } } @@ -988,6 +982,9 @@ impl<'a> UndeclarableSealed<(), MatchingListenerUndeclaration<'a>> for MatchingL /// A listener that sends notifications when the [`MatchingStatus`] of a /// publisher changes. /// +/// Matching litsteners run in background until the publisher is undeclared. +/// They can be manually undeclared, but will not be undeclared on drop. +/// /// # Examples /// ```no_run /// # #[tokio::main] @@ -1014,10 +1011,7 @@ pub struct MatchingListener<'a, Receiver> { #[zenoh_macros::unstable] impl<'a, Receiver> MatchingListener<'a, Receiver> { - /// Close a [`MatchingListener`]. - /// - /// MatchingListeners are automatically closed when dropped, but you may want to use this function to handle errors or - /// close the MatchingListener asynchronously. + /// Undeclare the [`MatchingListener`]. /// /// # Examples /// ``` @@ -1035,19 +1029,13 @@ impl<'a, Receiver> MatchingListener<'a, Receiver> { pub fn undeclare(self) -> MatchingListenerUndeclaration<'a> { self.listener.undeclare() } - - /// Make the matching listener run in background, until the publisher is undeclared. - #[inline] - #[zenoh_macros::unstable] - pub fn background(mut self) { - // The matching listener will be undeclared as part of publisher undeclaration. - self.listener.undeclare_on_drop = false; - } } #[zenoh_macros::unstable] -impl<'a, T> UndeclarableSealed<(), MatchingListenerUndeclaration<'a>> for MatchingListener<'a, T> { - fn undeclare_inner(self, _: ()) -> MatchingListenerUndeclaration<'a> { +impl<'a, T> UndeclarableSealed<()> for MatchingListener<'a, T> { + type Res = MatchingListenerUndeclaration<'a>; + + fn undeclare_inner(self, _: ()) -> Self::Res { UndeclarableSealed::undeclare_inner(self.listener, ()) } } @@ -1079,9 +1067,7 @@ impl Resolvable for MatchingListenerUndeclaration<'_> { #[zenoh_macros::unstable] impl Wait for MatchingListenerUndeclaration<'_> { - fn wait(mut self) -> ::To { - // set the flag first to avoid double panic if this function panic - self.subscriber.undeclare_on_drop = false; + fn wait(self) -> ::To { zlock!(self.subscriber.publisher.matching_listeners).remove(&self.subscriber.state.id); self.subscriber .publisher @@ -1100,19 +1086,6 @@ impl IntoFuture for MatchingListenerUndeclaration<'_> { } } -#[zenoh_macros::unstable] -impl Drop for MatchingListenerInner<'_> { - fn drop(&mut self) { - if self.undeclare_on_drop { - zlock!(self.publisher.matching_listeners).remove(&self.state.id); - let _ = self - .publisher - .session - .undeclare_matches_listener_inner(self.state.id); - } - } -} - #[cfg(test)] mod tests { use zenoh_config::Config; diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index 61ae0093ea..db2275e63c 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -28,30 +28,32 @@ use zenoh_protocol::{ use zenoh_result::ZResult; #[zenoh_macros::unstable] use { - super::{query::ReplyKeyExpr, sample::SourceInfo}, - zenoh_config::wrappers::EntityGlobalId, + crate::api::{query::ReplyKeyExpr, sample::SourceInfo}, + zenoh_config::wrappers::{EntityGlobalId, ZenohId}, zenoh_protocol::core::EntityGlobalIdProto, }; #[zenoh_macros::unstable] -use super::selector::ZenohParameters; -use super::{ - builders::sample::{ - EncodingBuilderTrait, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, - TimestampBuilderTrait, +use crate::api::selector::ZenohParameters; +use crate::{ + api::{ + builders::sample::{ + EncodingBuilderTrait, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, + TimestampBuilderTrait, + }, + bytes::{OptionZBytes, ZBytes}, + encoding::Encoding, + handlers::{locked, DefaultHandler, IntoHandler}, + key_expr::KeyExpr, + publisher::Priority, + sample::{Locality, QoSBuilder, Sample, SampleKind}, + selector::Selector, + session::{SessionRef, UndeclarableSealed, WeakSessionRef}, + value::Value, + Id, }, - bytes::{OptionZBytes, ZBytes}, - encoding::Encoding, - handlers::{locked, DefaultHandler, IntoHandler}, - key_expr::KeyExpr, - publisher::Priority, - sample::{Locality, QoSBuilder, Sample, SampleKind}, - selector::Selector, - session::{SessionRef, UndeclarableSealed}, - value::Value, - Id, + net::primitives::Primitives, }; -use crate::net::primitives::Primitives; pub(crate) struct QueryInner { pub(crate) key_expr: KeyExpr<'static>, @@ -534,41 +536,18 @@ impl fmt::Debug for QueryableState { } } -/// An entity able to reply to queries through a callback. -/// -/// CallbackQueryables can be created from a zenoh [`Session`](crate::Session) -/// with the [`declare_queryable`](crate::Session::declare_queryable) function -/// and the [`callback`](QueryableBuilder::callback) function -/// of the resulting builder. -/// -/// Queryables are automatically undeclared when dropped. -/// -/// # Examples -/// ```no_run -/// # #[tokio::main] -/// # async fn main() { -/// use futures::prelude::*; -/// use zenoh::prelude::*; -/// -/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); -/// let queryable = session.declare_queryable("key/expression").await.unwrap(); -/// while let Ok(query) = queryable.recv_async().await { -/// println!(">> Handling query '{}'", query.selector()); -/// query.reply("key/expression", "value") -/// .await -/// .unwrap(); -/// } -/// # } -/// ``` #[derive(Debug)] -pub(crate) struct CallbackQueryable<'a> { - pub(crate) session: SessionRef<'a>, +pub(crate) struct QueryableInner<'a> { + #[cfg(feature = "unstable")] + pub(crate) session_id: ZenohId, + pub(crate) session: WeakSessionRef<'a>, pub(crate) state: Arc, - undeclare_on_drop: bool, } -impl<'a> UndeclarableSealed<(), QueryableUndeclaration<'a>> for CallbackQueryable<'a> { - fn undeclare_inner(self, _: ()) -> QueryableUndeclaration<'a> { +impl<'a> UndeclarableSealed<()> for QueryableInner<'a> { + type Res = QueryableUndeclaration<'a>; + + fn undeclare_inner(self, _: ()) -> Self::Res { QueryableUndeclaration { queryable: self } } } @@ -588,7 +567,7 @@ impl<'a> UndeclarableSealed<(), QueryableUndeclaration<'a>> for CallbackQueryabl /// ``` #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] pub struct QueryableUndeclaration<'a> { - queryable: CallbackQueryable<'a>, + queryable: QueryableInner<'a>, } impl Resolvable for QueryableUndeclaration<'_> { @@ -596,12 +575,11 @@ impl Resolvable for QueryableUndeclaration<'_> { } impl Wait for QueryableUndeclaration<'_> { - fn wait(mut self) -> ::To { - // set the flag first to avoid double panic if this function panic - self.queryable.undeclare_on_drop = false; - self.queryable - .session - .close_queryable(self.queryable.state.id) + fn wait(self) -> ::To { + let Some(session) = self.queryable.session.upgrade() else { + return Ok(()); + }; + session.close_queryable(self.queryable.state.id) } } @@ -614,14 +592,6 @@ impl<'a> IntoFuture for QueryableUndeclaration<'a> { } } -impl Drop for CallbackQueryable<'_> { - fn drop(&mut self) { - if self.undeclare_on_drop { - let _ = self.session.close_queryable(self.state.id); - } - } -} - /// A builder for initializing a [`Queryable`]. /// /// # Examples @@ -778,7 +748,8 @@ impl<'a, 'b, Handler> QueryableBuilder<'a, 'b, Handler> { /// and the [`with`](QueryableBuilder::with) function /// of the resulting builder. /// -/// Queryables are automatically undeclared when dropped. +/// Queryables run in background until the session is closed. +/// They can be manually undeclared, but will not be undeclared on drop. /// /// # Examples /// ```no_run @@ -803,7 +774,7 @@ impl<'a, 'b, Handler> QueryableBuilder<'a, 'b, Handler> { #[non_exhaustive] #[derive(Debug)] pub struct Queryable<'a, Handler> { - pub(crate) queryable: CallbackQueryable<'a>, + pub(crate) queryable: QueryableInner<'a>, pub(crate) handler: Handler, } @@ -826,7 +797,7 @@ impl<'a, Handler> Queryable<'a, Handler> { #[zenoh_macros::unstable] pub fn id(&self) -> EntityGlobalId { EntityGlobalIdProto { - zid: self.queryable.session.zid().into(), + zid: self.queryable.session_id.into(), eid: self.queryable.state.id, } .into() @@ -846,24 +817,31 @@ impl<'a, Handler> Queryable<'a, Handler> { &mut self.handler } + /// Undeclare the [`Queryable`]. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::prelude::*; + /// + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let queryable = session.declare_queryable("key/expression") + /// .await + /// .unwrap(); + /// queryable.undeclare().await.unwrap(); + /// # } + /// ``` #[inline] pub fn undeclare(self) -> impl Resolve> + 'a { UndeclarableSealed::undeclare_inner(self, ()) } - - /// Make the queryable run in background, until the session is closed. - #[inline] - #[zenoh_macros::unstable] - pub fn background(mut self) { - // It's not necessary to undeclare this resource when session close, as other sessions - // will clean all resources related to the closed one. - // So we can just never undeclare it. - self.queryable.undeclare_on_drop = false; - } } -impl<'a, T> UndeclarableSealed<(), QueryableUndeclaration<'a>> for Queryable<'a, T> { - fn undeclare_inner(self, _: ()) -> QueryableUndeclaration<'a> { +impl<'a, T> UndeclarableSealed<()> for Queryable<'a, T> { + type Res = QueryableUndeclaration<'a>; + + fn undeclare_inner(self, _: ()) -> Self::Res { UndeclarableSealed::undeclare_inner(self.queryable, ()) } } @@ -906,10 +884,11 @@ where callback, ) .map(|qable_state| Queryable { - queryable: CallbackQueryable { - session, + queryable: QueryableInner { + #[cfg(feature = "unstable")] + session_id: session.zid(), + session: session.into(), state: qable_state, - undeclare_on_drop: true, }, handler: receiver, }) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 451c1340ad..99b3036ff6 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -16,10 +16,11 @@ use std::{ convert::TryInto, fmt, future::{IntoFuture, Ready}, - ops::Deref, + mem::ManuallyDrop, + ops::{Deref, DerefMut}, sync::{ atomic::{AtomicU16, Ordering}, - Arc, RwLock, + Arc, RwLock, Weak, }, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -451,40 +452,88 @@ impl fmt::Debug for SessionRef<'_> { } } -pub(crate) trait UndeclarableSealed> -where - O: Resolve + Send, -{ - fn undeclare_inner(self, session: S) -> O; +#[derive(Debug, Clone)] +pub(crate) enum WeakSessionRef<'a> { + Borrow(&'a Session), + Shared(Weak), +} + +impl<'a> WeakSessionRef<'a> { + pub(crate) fn upgrade(&self) -> Option> { + match self { + Self::Borrow(s) => Some(SessionRef::Borrow(s)), + Self::Shared(s) => s.upgrade().map(SessionRef::Shared), + } + } +} + +impl<'a> From> for WeakSessionRef<'a> { + fn from(value: SessionRef<'a>) -> Self { + match value { + SessionRef::Borrow(s) => Self::Borrow(s), + SessionRef::Shared(s) => Self::Shared(Arc::downgrade(&s)), + } + } +} + +/// A trait implemented by types that can be undeclared. +pub trait UndeclarableSealed { + type Res: Resolve<()> + Send; + fn undeclare_inner(self, session: S) -> Self::Res; } -impl<'a, O, T, G> UndeclarableSealed<&'a Session, O, T> for G +impl<'a, T> UndeclarableSealed<&'a Session> for T where - O: Resolve + Send, - G: UndeclarableSealed<(), O, T>, + T: UndeclarableSealed<()>, { - fn undeclare_inner(self, _: &'a Session) -> O { + type Res = >::Res; + + fn undeclare_inner(self, _: &'a Session) -> Self::Res { self.undeclare_inner(()) } } +#[derive(Debug, Clone)] +struct UndeclareOnDrop(ManuallyDrop); + +impl Deref for UndeclareOnDrop { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for UndeclareOnDrop { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl Drop for UndeclareOnDrop { + fn drop(&mut self) { + // SAFETY: field is never reused, as this code is executed in drop + unsafe { ManuallyDrop::take(&mut self.0) }.undeclare_inner(()) + } +} + // NOTE: `UndeclarableInner` is only pub(crate) to hide the `undeclare_inner` method. So we don't // care about the `private_bounds` lint in this particular case. #[allow(private_bounds)] /// A trait implemented by types that can be undeclared. -pub trait Undeclarable: UndeclarableSealed -where - O: Resolve + Send, -{ -} +pub trait Undeclarable: UndeclarableSealed {} -impl Undeclarable for U -where - O: Resolve + Send, - U: UndeclarableSealed, -{ +impl Undeclarable for T where T: UndeclarableSealed {} + +/// Extension trait providing [`undeclare_on_drop`](Self::undeclare_on_drop) method +pub trait UndeclarableExt: Undeclarable + Sized { + fn undeclare_on_drop(self) -> UndeclareOnDrop { + UndeclareOnDrop(ManuallyDrop::new(self)) + } } +impl UndeclarableExt for T {} + /// A zenoh session. /// pub struct Session { @@ -770,13 +819,6 @@ impl Session { >>::Error: Into, { let key_expr: ZResult = key_expr.try_into().map_err(Into::into); - self._declare_keyexpr(key_expr) - } - - fn _declare_keyexpr<'a, 'b: 'a>( - &'a self, - key_expr: ZResult>, - ) -> impl Resolve>> + 'a { let sid = self.id; ResolveClosure::new(move || { let key_expr: KeyExpr = key_expr?; diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index 0e82a20331..854424882c 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -23,13 +23,16 @@ use zenoh_core::{Resolvable, Wait}; use zenoh_protocol::network::declare::subscriber::ext::SubscriberInfo; use zenoh_result::ZResult; #[cfg(feature = "unstable")] -use {zenoh_config::wrappers::EntityGlobalId, zenoh_protocol::core::EntityGlobalIdProto}; +use { + zenoh_config::wrappers::{EntityGlobalId, ZenohId}, + zenoh_protocol::core::EntityGlobalIdProto, +}; -use super::{ +use crate::api::{ handlers::{locked, Callback, DefaultHandler, IntoHandler}, key_expr::KeyExpr, sample::{Locality, Sample}, - session::{SessionRef, UndeclarableSealed}, + session::{SessionRef, UndeclarableSealed, WeakSessionRef}, Id, }; #[cfg(feature = "unstable")] @@ -59,7 +62,8 @@ impl fmt::Debug for SubscriberState { /// and the [`callback`](SubscriberBuilder::callback) function /// of the resulting builder. /// -/// Subscribers are automatically undeclared when dropped. +/// Subscribers run in background until the session is closed. +/// They can be manually undeclared, but will not be undeclared on drop. /// /// # Examples /// ``` @@ -77,42 +81,24 @@ impl fmt::Debug for SubscriberState { /// ``` #[derive(Debug)] pub(crate) struct SubscriberInner<'a> { - pub(crate) session: SessionRef<'a>, + #[cfg(feature = "unstable")] + pub(crate) session_id: ZenohId, + pub(crate) session: WeakSessionRef<'a>, pub(crate) state: Arc, pub(crate) kind: SubscriberKind, - pub(crate) undeclare_on_drop: bool, } impl<'a> SubscriberInner<'a> { - /// Close a [`CallbackSubscriber`](CallbackSubscriber). - /// - /// `CallbackSubscribers` are automatically closed when dropped, but you may want to use this function to handle errors or - /// close the `CallbackSubscriber` asynchronously. - /// - /// # Examples - /// ``` - /// # #[tokio::main] - /// # async fn main() { - /// use zenoh::{prelude::*, sample::Sample}; - /// - /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); - /// # fn data_handler(_sample: Sample) { }; - /// let subscriber = session - /// .declare_subscriber("key/expression") - /// .callback(data_handler) - /// .await - /// .unwrap(); - /// subscriber.undeclare().await.unwrap(); - /// # } - /// ``` #[inline] pub fn undeclare(self) -> SubscriberUndeclaration<'a> { UndeclarableSealed::undeclare_inner(self, ()) } } -impl<'a> UndeclarableSealed<(), SubscriberUndeclaration<'a>> for SubscriberInner<'a> { - fn undeclare_inner(self, _: ()) -> SubscriberUndeclaration<'a> { +impl<'a> UndeclarableSealed<()> for SubscriberInner<'a> { + type Res = SubscriberUndeclaration<'a>; + + fn undeclare_inner(self, _: ()) -> Self::Res { SubscriberUndeclaration { subscriber: self } } } @@ -143,12 +129,11 @@ impl Resolvable for SubscriberUndeclaration<'_> { } impl Wait for SubscriberUndeclaration<'_> { - fn wait(mut self) -> ::To { - // set the flag first to avoid double panic if this function panic - self.subscriber.undeclare_on_drop = false; - self.subscriber - .session - .undeclare_subscriber_inner(self.subscriber.state.id, self.subscriber.kind) + fn wait(self) -> ::To { + let Some(session) = self.subscriber.session.upgrade() else { + return Ok(()); + }; + session.undeclare_subscriber_inner(self.subscriber.state.id, self.subscriber.kind) } } @@ -161,16 +146,6 @@ impl IntoFuture for SubscriberUndeclaration<'_> { } } -impl Drop for SubscriberInner<'_> { - fn drop(&mut self) { - if self.undeclare_on_drop { - let _ = self - .session - .undeclare_subscriber_inner(self.state.id, self.kind); - } - } -} - /// A builder for initializing a [`FlumeSubscriber`]. /// /// # Examples @@ -395,10 +370,11 @@ where ) .map(|sub_state| Subscriber { subscriber: SubscriberInner { - session, + #[cfg(feature = "unstable")] + session_id: session.zid(), + session: session.into(), state: sub_state, kind: SubscriberKind::Subscriber, - undeclare_on_drop: true, }, handler: receiver, }) @@ -425,8 +401,6 @@ where /// and the [`with`](SubscriberBuilder::with) function /// of the resulting builder. /// -/// Subscribers are automatically undeclared when dropped. -/// /// # Examples /// ```no_run /// # #[tokio::main] @@ -470,7 +444,7 @@ impl<'a, Handler> Subscriber<'a, Handler> { #[zenoh_macros::unstable] pub fn id(&self) -> EntityGlobalId { EntityGlobalIdProto { - zid: self.subscriber.session.zid().into(), + zid: self.subscriber.session_id.into(), eid: self.subscriber.state.id, } .into() @@ -495,10 +469,7 @@ impl<'a, Handler> Subscriber<'a, Handler> { &mut self.handler } - /// Close a [`Subscriber`]. - /// - /// Subscribers are automatically closed when dropped, but you may want to use this function to handle errors or - /// close the Subscriber asynchronously. + /// Undeclare the [`Subscriber`]. /// /// # Examples /// ``` @@ -517,20 +488,12 @@ impl<'a, Handler> Subscriber<'a, Handler> { pub fn undeclare(self) -> SubscriberUndeclaration<'a> { self.subscriber.undeclare() } - - /// Make the subscriber run in background, until the session is closed. - #[inline] - #[zenoh_macros::unstable] - pub fn background(mut self) { - // It's not necessary to undeclare this resource when session close, as other sessions - // will clean all resources related to the closed one. - // So we can just never undeclare it. - self.subscriber.undeclare_on_drop = false; - } } -impl<'a, T> UndeclarableSealed<(), SubscriberUndeclaration<'a>> for Subscriber<'a, T> { - fn undeclare_inner(self, _: ()) -> SubscriberUndeclaration<'a> { +impl<'a, T> UndeclarableSealed<()> for Subscriber<'a, T> { + type Res = SubscriberUndeclaration<'a>; + + fn undeclare_inner(self, _: ()) -> Self::Res { UndeclarableSealed::undeclare_inner(self.subscriber, ()) } } diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 0190acc319..1b563bb4e4 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -191,13 +191,15 @@ pub mod session { pub use zenoh_config::wrappers::{EntityGlobalId, ZenohId}; pub use zenoh_protocol::core::EntityId; + #[zenoh_macros::unstable] + pub use crate::api::session::SessionRef; #[zenoh_macros::internal] pub use crate::api::session::{init, InitBuilder}; pub use crate::api::{ builders::publisher::{SessionDeleteBuilder, SessionPutBuilder}, info::{PeersZenohIdBuilder, RoutersZenohIdBuilder, SessionInfo, ZenohIdBuilder}, query::SessionGetBuilder, - session::{open, OpenBuilder, Session, SessionDeclarations, SessionRef, Undeclarable}, + session::{open, OpenBuilder, Session, SessionDeclarations, Undeclarable}, }; } diff --git a/zenoh/src/prelude.rs b/zenoh/src/prelude.rs index 373d56c65a..4bac197073 100644 --- a/zenoh/src/prelude.rs +++ b/zenoh/src/prelude.rs @@ -34,7 +34,7 @@ mod _prelude { builders::sample::{ EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, }, - session::{SessionDeclarations, Undeclarable}, + session::{SessionDeclarations, Undeclarable, UndeclarableExt}, }, config::ValidatedMap, Error as ZError, Resolvable, Resolve, Result as ZResult,