diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index 666b4378e0..24e0187ba5 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -307,7 +307,6 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { destination: self.destination, #[cfg(feature = "unstable")] matching_listeners: Default::default(), - undeclare_on_drop: true, }) } } @@ -362,7 +361,6 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> { destination: self.destination, #[cfg(feature = "unstable")] matching_listeners: Default::default(), - undeclare_on_drop: true, }) } } diff --git a/zenoh/src/api/liveliness.rs b/zenoh/src/api/liveliness.rs index 038a4b8eab..370d95f522 100644 --- a/zenoh/src/api/liveliness.rs +++ b/zenoh/src/api/liveliness.rs @@ -31,6 +31,7 @@ use super::{ subscriber::{Subscriber, SubscriberInner}, Id, }; +use crate::api::session::WeakSessionRef; /// A structure with functions to declare a /// [`LivelinessToken`](LivelinessToken), query @@ -254,7 +255,7 @@ impl Wait for LivelinessTokenBuilder<'_, '_> { session .declare_liveliness_inner(&key_expr) .map(|tok_state| LivelinessToken { - session, + session: session.into(), state: tok_state, undeclare_on_drop: true, }) @@ -307,7 +308,7 @@ pub(crate) struct LivelinessTokenState { #[zenoh_macros::unstable] #[derive(Debug)] pub struct LivelinessToken<'a> { - pub(crate) session: SessionRef<'a>, + pub(crate) session: WeakSessionRef<'a>, pub(crate) state: Arc, undeclare_on_drop: bool, } @@ -346,7 +347,10 @@ impl Wait for LivelinessTokenUndeclaration<'_> { fn wait(mut self) -> ::To { // set the flag first to avoid double panic if this function panic self.token.undeclare_on_drop = false; - self.token.session.undeclare_liveliness(self.token.state.id) + let Some(session) = self.token.session.upgrade() else { + return Ok(()); + }; + session.undeclare_liveliness(self.token.state.id) } } @@ -388,16 +392,6 @@ impl<'a> LivelinessToken<'a> { pub fn undeclare(self) -> impl Resolve> + 'a { Undeclarable::undeclare_inner(self, ()) } - - /// Keep this liveliness token in background, until the session is closed. - #[inline] - #[zenoh_macros::unstable] - pub fn background(mut self) { - // It's not necessary to undeclare this resource when session close, as other sessions - // will clean all resources related to the closed one. - // So we can just never undeclare it. - self.undeclare_on_drop = false; - } } #[zenoh_macros::unstable] @@ -411,7 +405,9 @@ impl<'a> Undeclarable<(), LivelinessTokenUndeclaration<'a>> for LivelinessToken< impl Drop for LivelinessToken<'_> { fn drop(&mut self) { if self.undeclare_on_drop { - let _ = self.session.undeclare_liveliness(self.state.id); + if let Some(session) = self.session.upgrade() { + let _ = session.undeclare_liveliness(self.state.id); + } } } } @@ -579,10 +575,11 @@ where .declare_liveliness_subscriber_inner(&key_expr, Locality::default(), callback) .map(|sub_state| Subscriber { subscriber: SubscriberInner { - session, + #[cfg(feature = "unstable")] + session_id: session.zid(), + session: session.into(), state: sub_state, kind: SubscriberKind::LivelinessSubscriber, - undeclare_on_drop: true, }, handler, }) diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index f4b969b18f..48392befbe 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -78,7 +78,7 @@ impl fmt::Debug for PublisherState { #[derive(Clone)] pub enum PublisherRef<'a> { Borrow(&'a Publisher<'a>), - Shared(std::sync::Arc>), + Shared(Arc>), } #[zenoh_macros::unstable] @@ -105,8 +105,6 @@ impl std::fmt::Debug for PublisherRef<'_> { /// A publisher that allows to send data through a stream. /// -/// Publishers are automatically undeclared when dropped. -/// /// # Examples /// ``` /// # #[tokio::main] @@ -146,7 +144,6 @@ pub struct Publisher<'a> { pub(crate) destination: Locality, #[cfg(feature = "unstable")] pub(crate) matching_listeners: Arc>>, - pub(crate) undeclare_on_drop: bool, } impl<'a> Publisher<'a> { @@ -491,9 +488,7 @@ impl Resolvable for PublisherUndeclaration<'_> { } impl Wait for PublisherUndeclaration<'_> { - fn wait(mut self) -> ::To { - // set the flag first to avoid double panic if this function panic - self.publisher.undeclare_on_drop = false; + fn wait(self) -> ::To { #[cfg(feature = "unstable")] self.publisher.undeclare_matching_listeners()?; self.publisher @@ -513,11 +508,8 @@ impl IntoFuture for PublisherUndeclaration<'_> { impl Drop for Publisher<'_> { fn drop(&mut self) { - if self.undeclare_on_drop { - #[cfg(feature = "unstable")] - let _ = self.undeclare_matching_listeners(); - let _ = self.session.undeclare_publisher_inner(self.id); - } + #[cfg(feature = "unstable")] + let _ = self.undeclare_matching_listeners(); } } @@ -922,7 +914,6 @@ where listener: MatchingListenerInner { publisher: self.publisher, state, - undeclare_on_drop: true, }, receiver, }) @@ -947,7 +938,7 @@ where #[zenoh_macros::unstable] pub(crate) struct MatchingListenerState { pub(crate) id: Id, - pub(crate) current: std::sync::Mutex, + pub(crate) current: Mutex, pub(crate) key_expr: KeyExpr<'static>, pub(crate) destination: Locality, pub(crate) callback: Callback<'static, MatchingStatus>, @@ -966,8 +957,7 @@ impl std::fmt::Debug for MatchingListenerState { #[zenoh_macros::unstable] pub(crate) struct MatchingListenerInner<'a> { pub(crate) publisher: PublisherRef<'a>, - pub(crate) state: std::sync::Arc, - undeclare_on_drop: bool, + pub(crate) state: Arc, } #[zenoh_macros::unstable] @@ -988,6 +978,9 @@ impl<'a> Undeclarable<(), MatchingListenerUndeclaration<'a>> for MatchingListene /// A listener that sends notifications when the [`MatchingStatus`] of a /// publisher changes. /// +/// Matching litsteners run in background until the publisher is undeclared. +/// They can be manually undeclared, but will not be undeclared on drop. +/// /// # Examples /// ```no_run /// # #[tokio::main] @@ -1035,14 +1028,6 @@ impl<'a, Receiver> MatchingListener<'a, Receiver> { pub fn undeclare(self) -> MatchingListenerUndeclaration<'a> { self.listener.undeclare() } - - /// Make the matching listener run in background, until the publisher is undeclared. - #[inline] - #[zenoh_macros::unstable] - pub fn background(mut self) { - // The matching listener will be undeclared as part of publisher undeclaration. - self.listener.undeclare_on_drop = false; - } } #[zenoh_macros::unstable] @@ -1079,9 +1064,7 @@ impl Resolvable for MatchingListenerUndeclaration<'_> { #[zenoh_macros::unstable] impl Wait for MatchingListenerUndeclaration<'_> { - fn wait(mut self) -> ::To { - // set the flag first to avoid double panic if this function panic - self.subscriber.undeclare_on_drop = false; + fn wait(self) -> ::To { zlock!(self.subscriber.publisher.matching_listeners).remove(&self.subscriber.state.id); self.subscriber .publisher @@ -1100,19 +1083,6 @@ impl IntoFuture for MatchingListenerUndeclaration<'_> { } } -#[zenoh_macros::unstable] -impl Drop for MatchingListenerInner<'_> { - fn drop(&mut self) { - if self.undeclare_on_drop { - zlock!(self.publisher.matching_listeners).remove(&self.state.id); - let _ = self - .publisher - .session - .undeclare_matches_listener_inner(self.state.id); - } - } -} - #[cfg(test)] mod tests { use zenoh_config::Config; diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index 566a903bd1..2d06f1e193 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -28,30 +28,32 @@ use zenoh_protocol::{ use zenoh_result::ZResult; #[zenoh_macros::unstable] use { - super::{query::ReplyKeyExpr, sample::SourceInfo}, - zenoh_config::wrappers::EntityGlobalId, + crate::api::{query::ReplyKeyExpr, sample::SourceInfo}, + zenoh_config::wrappers::{EntityGlobalId, ZenohId}, zenoh_protocol::core::EntityGlobalIdProto, }; #[zenoh_macros::unstable] -use super::selector::ZenohParameters; -use super::{ - builders::sample::{ - EncodingBuilderTrait, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, - TimestampBuilderTrait, +use crate::api::selector::ZenohParameters; +use crate::{ + api::{ + builders::sample::{ + EncodingBuilderTrait, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait, + TimestampBuilderTrait, + }, + bytes::{OptionZBytes, ZBytes}, + encoding::Encoding, + handlers::{locked, DefaultHandler, IntoHandler}, + key_expr::KeyExpr, + publisher::Priority, + sample::{Locality, QoSBuilder, Sample, SampleKind}, + selector::Selector, + session::{SessionRef, Undeclarable, WeakSessionRef}, + value::Value, + Id, }, - bytes::{OptionZBytes, ZBytes}, - encoding::Encoding, - handlers::{locked, DefaultHandler, IntoHandler}, - key_expr::KeyExpr, - publisher::Priority, - sample::{Locality, QoSBuilder, Sample, SampleKind}, - selector::Selector, - session::{SessionRef, Undeclarable}, - value::Value, - Id, + net::primitives::Primitives, }; -use crate::net::primitives::Primitives; pub(crate) struct QueryInner { pub(crate) key_expr: KeyExpr<'static>, @@ -562,9 +564,10 @@ impl fmt::Debug for QueryableState { /// ``` #[derive(Debug)] pub(crate) struct CallbackQueryable<'a> { - pub(crate) session: SessionRef<'a>, + #[cfg(feature = "unstable")] + pub(crate) session_id: ZenohId, + pub(crate) session: WeakSessionRef<'a>, pub(crate) state: Arc, - undeclare_on_drop: bool, } impl<'a> Undeclarable<(), QueryableUndeclaration<'a>> for CallbackQueryable<'a> { @@ -596,12 +599,11 @@ impl Resolvable for QueryableUndeclaration<'_> { } impl Wait for QueryableUndeclaration<'_> { - fn wait(mut self) -> ::To { - // set the flag first to avoid double panic if this function panic - self.queryable.undeclare_on_drop = false; - self.queryable - .session - .close_queryable(self.queryable.state.id) + fn wait(self) -> ::To { + let Some(session) = self.queryable.session.upgrade() else { + return Ok(()); + }; + session.close_queryable(self.queryable.state.id) } } @@ -614,14 +616,6 @@ impl<'a> IntoFuture for QueryableUndeclaration<'a> { } } -impl Drop for CallbackQueryable<'_> { - fn drop(&mut self) { - if self.undeclare_on_drop { - let _ = self.session.close_queryable(self.state.id); - } - } -} - /// A builder for initializing a [`Queryable`]. /// /// # Examples @@ -778,7 +772,8 @@ impl<'a, 'b, Handler> QueryableBuilder<'a, 'b, Handler> { /// and the [`with`](QueryableBuilder::with) function /// of the resulting builder. /// -/// Queryables are automatically undeclared when dropped. +/// Queryables run in background until the session is closed. +/// They can be manually undeclared, but will not be undeclared on drop. /// /// # Examples /// ```no_run @@ -826,7 +821,7 @@ impl<'a, Handler> Queryable<'a, Handler> { #[zenoh_macros::unstable] pub fn id(&self) -> EntityGlobalId { EntityGlobalIdProto { - zid: self.queryable.session.zid().into(), + zid: self.queryable.session_id.into(), eid: self.queryable.state.id, } .into() @@ -850,16 +845,6 @@ impl<'a, Handler> Queryable<'a, Handler> { pub fn undeclare(self) -> impl Resolve> + 'a { Undeclarable::undeclare_inner(self, ()) } - - /// Make the queryable run in background, until the session is closed. - #[inline] - #[zenoh_macros::unstable] - pub fn background(mut self) { - // It's not necessary to undeclare this resource when session close, as other sessions - // will clean all resources related to the closed one. - // So we can just never undeclare it. - self.queryable.undeclare_on_drop = false; - } } impl<'a, T> Undeclarable<(), QueryableUndeclaration<'a>> for Queryable<'a, T> { @@ -907,9 +892,10 @@ where ) .map(|qable_state| Queryable { queryable: CallbackQueryable { - session, + #[cfg(feature = "unstable")] + session_id: session.zid(), + session: session.into(), state: qable_state, - undeclare_on_drop: true, }, handler: receiver, }) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index ed1c75d3f2..e24a9728ff 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -19,7 +19,7 @@ use std::{ ops::Deref, sync::{ atomic::{AtomicU16, Ordering}, - Arc, RwLock, + Arc, RwLock, Weak, }, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -449,6 +449,30 @@ impl fmt::Debug for SessionRef<'_> { } } +#[derive(Debug, Clone)] +pub(crate) enum WeakSessionRef<'a> { + Borrow(&'a Session), + Shared(Weak), +} + +impl<'a> WeakSessionRef<'a> { + pub(crate) fn upgrade(&self) -> Option> { + match self { + Self::Borrow(s) => Some(SessionRef::Borrow(s)), + Self::Shared(s) => s.upgrade().map(SessionRef::Shared), + } + } +} + +impl<'a> From> for WeakSessionRef<'a> { + fn from(value: SessionRef<'a>) -> Self { + match value { + SessionRef::Borrow(s) => Self::Borrow(s), + SessionRef::Shared(s) => Self::Shared(Arc::downgrade(&s)), + } + } +} + /// A trait implemented by types that can be undeclared. pub trait Undeclarable> where @@ -699,7 +723,14 @@ impl<'a> SessionDeclarations<'a, 'a> for Session { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - SessionRef::Borrow(self).declare_subscriber(key_expr) + let self1 = &SessionRef::Borrow(self); + SubscriberBuilder { + session: self1.clone(), + key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), + reliability: Reliability::DEFAULT, + origin: Locality::default(), + handler: DefaultHandler::default(), + } } fn declare_queryable<'b, TryIntoKeyExpr>( &'a self, @@ -752,13 +783,6 @@ impl Session { >>::Error: Into, { let key_expr: ZResult = key_expr.try_into().map_err(Into::into); - self._declare_keyexpr(key_expr) - } - - fn _declare_keyexpr<'a, 'b: 'a>( - &'a self, - key_expr: ZResult>, - ) -> impl Resolve>> + 'a { let sid = self.id; ResolveClosure::new(move || { let key_expr: KeyExpr = key_expr?; diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index f3c1fa14e7..0d633d25c4 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -23,13 +23,16 @@ use zenoh_core::{Resolvable, Wait}; use zenoh_protocol::{core::Reliability, network::declare::subscriber::ext::SubscriberInfo}; use zenoh_result::ZResult; #[cfg(feature = "unstable")] -use {zenoh_config::wrappers::EntityGlobalId, zenoh_protocol::core::EntityGlobalIdProto}; +use { + zenoh_config::wrappers::{EntityGlobalId, ZenohId}, + zenoh_protocol::core::EntityGlobalIdProto, +}; -use super::{ +use crate::api::{ handlers::{locked, Callback, DefaultHandler, IntoHandler}, key_expr::KeyExpr, sample::{Locality, Sample}, - session::{SessionRef, Undeclarable}, + session::{SessionRef, Undeclarable, WeakSessionRef}, Id, }; @@ -57,7 +60,8 @@ impl fmt::Debug for SubscriberState { /// and the [`callback`](SubscriberBuilder::callback) function /// of the resulting builder. /// -/// Subscribers are automatically undeclared when dropped. +/// Subscribers run in background until the session is closed. +/// They can be manually undeclared, but will not be undeclared on drop. /// /// # Examples /// ``` @@ -75,10 +79,11 @@ impl fmt::Debug for SubscriberState { /// ``` #[derive(Debug)] pub(crate) struct SubscriberInner<'a> { - pub(crate) session: SessionRef<'a>, + #[cfg(feature = "unstable")] + pub(crate) session_id: ZenohId, + pub(crate) session: WeakSessionRef<'a>, pub(crate) state: Arc, pub(crate) kind: SubscriberKind, - pub(crate) undeclare_on_drop: bool, } impl<'a> SubscriberInner<'a> { @@ -141,12 +146,11 @@ impl Resolvable for SubscriberUndeclaration<'_> { } impl Wait for SubscriberUndeclaration<'_> { - fn wait(mut self) -> ::To { - // set the flag first to avoid double panic if this function panic - self.subscriber.undeclare_on_drop = false; - self.subscriber - .session - .undeclare_subscriber_inner(self.subscriber.state.id, self.subscriber.kind) + fn wait(self) -> ::To { + let Some(session) = self.subscriber.session.upgrade() else { + return Ok(()); + }; + session.undeclare_subscriber_inner(self.subscriber.state.id, self.subscriber.kind) } } @@ -159,16 +163,6 @@ impl IntoFuture for SubscriberUndeclaration<'_> { } } -impl Drop for SubscriberInner<'_> { - fn drop(&mut self) { - if self.undeclare_on_drop { - let _ = self - .session - .undeclare_subscriber_inner(self.state.id, self.kind); - } - } -} - /// A builder for initializing a [`FlumeSubscriber`]. /// /// # Examples @@ -387,10 +381,11 @@ where ) .map(|sub_state| Subscriber { subscriber: SubscriberInner { - session, + #[cfg(feature = "unstable")] + session_id: session.zid(), + session: session.into(), state: sub_state, kind: SubscriberKind::Subscriber, - undeclare_on_drop: true, }, handler: receiver, }) @@ -462,7 +457,7 @@ impl<'a, Handler> Subscriber<'a, Handler> { #[zenoh_macros::unstable] pub fn id(&self) -> EntityGlobalId { EntityGlobalIdProto { - zid: self.subscriber.session.zid().into(), + zid: self.subscriber.session_id.into(), eid: self.subscriber.state.id, } .into() @@ -509,16 +504,6 @@ impl<'a, Handler> Subscriber<'a, Handler> { pub fn undeclare(self) -> SubscriberUndeclaration<'a> { self.subscriber.undeclare() } - - /// Make the subscriber run in background, until the session is closed. - #[inline] - #[zenoh_macros::unstable] - pub fn background(mut self) { - // It's not necessary to undeclare this resource when session close, as other sessions - // will clean all resources related to the closed one. - // So we can just never undeclare it. - self.subscriber.undeclare_on_drop = false; - } } impl<'a, T> Undeclarable<(), SubscriberUndeclaration<'a>> for Subscriber<'a, T> { diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index 02c90ce0ec..a20cab307b 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -191,13 +191,15 @@ pub mod session { pub use zenoh_config::wrappers::{EntityGlobalId, ZenohId}; pub use zenoh_protocol::core::EntityId; + #[zenoh_macros::unstable] + pub use crate::api::session::SessionRef; #[zenoh_macros::internal] pub use crate::api::session::{init, InitBuilder}; pub use crate::api::{ builders::publisher::{SessionDeleteBuilder, SessionPutBuilder}, info::{PeersZenohIdBuilder, RoutersZenohIdBuilder, SessionInfo, ZenohIdBuilder}, query::SessionGetBuilder, - session::{open, OpenBuilder, Session, SessionDeclarations, SessionRef, Undeclarable}, + session::{open, OpenBuilder, Session, SessionDeclarations, Undeclarable}, }; }