Skip to content

Commit

Permalink
feat: don't undeclare objects on drop
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Aug 5, 2024
1 parent 4d337a9 commit b0171c5
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 149 deletions.
2 changes: 0 additions & 2 deletions zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,6 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
destination: self.destination,
#[cfg(feature = "unstable")]
matching_listeners: Default::default(),
undeclare_on_drop: true,
})
}
}
Expand Down Expand Up @@ -362,7 +361,6 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> {
destination: self.destination,
#[cfg(feature = "unstable")]
matching_listeners: Default::default(),
undeclare_on_drop: true,
})
}
}
Expand Down
28 changes: 12 additions & 16 deletions zenoh/src/api/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use super::{
subscriber::{Subscriber, SubscriberInner},
Id,
};
use crate::api::session::WeakSessionRef;

/// A structure with functions to declare a
/// [`LivelinessToken`](LivelinessToken), query
Expand Down Expand Up @@ -254,7 +255,7 @@ impl Wait for LivelinessTokenBuilder<'_, '_> {
session
.declare_liveliness_inner(&key_expr)
.map(|tok_state| LivelinessToken {
session,
session: session.into(),
state: tok_state,
undeclare_on_drop: true,
})
Expand Down Expand Up @@ -307,7 +308,7 @@ pub(crate) struct LivelinessTokenState {
#[zenoh_macros::unstable]
#[derive(Debug)]
pub struct LivelinessToken<'a> {
pub(crate) session: SessionRef<'a>,
pub(crate) session: WeakSessionRef<'a>,
pub(crate) state: Arc<LivelinessTokenState>,
undeclare_on_drop: bool,
}
Expand Down Expand Up @@ -346,7 +347,10 @@ impl Wait for LivelinessTokenUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
// set the flag first to avoid double panic if this function panic
self.token.undeclare_on_drop = false;
self.token.session.undeclare_liveliness(self.token.state.id)
let Some(session) = self.token.session.upgrade() else {
return Ok(());
};
session.undeclare_liveliness(self.token.state.id)
}
}

Expand Down Expand Up @@ -388,16 +392,6 @@ impl<'a> LivelinessToken<'a> {
pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
Undeclarable::undeclare_inner(self, ())
}

/// Keep this liveliness token in background, until the session is closed.
#[inline]
#[zenoh_macros::unstable]
pub fn background(mut self) {
// It's not necessary to undeclare this resource when session close, as other sessions
// will clean all resources related to the closed one.
// So we can just never undeclare it.
self.undeclare_on_drop = false;
}
}

#[zenoh_macros::unstable]
Expand All @@ -411,7 +405,9 @@ impl<'a> Undeclarable<(), LivelinessTokenUndeclaration<'a>> for LivelinessToken<
impl Drop for LivelinessToken<'_> {
fn drop(&mut self) {
if self.undeclare_on_drop {
let _ = self.session.undeclare_liveliness(self.state.id);
if let Some(session) = self.session.upgrade() {
let _ = session.undeclare_liveliness(self.state.id);
}
}
}
}
Expand Down Expand Up @@ -579,10 +575,10 @@ where
.declare_liveliness_subscriber_inner(&key_expr, Locality::default(), callback)
.map(|sub_state| Subscriber {
subscriber: SubscriberInner {
session,
session_id: session.zid(),
session: session.into(),
state: sub_state,
kind: SubscriberKind::LivelinessSubscriber,
undeclare_on_drop: true,
},
handler,
})
Expand Down
50 changes: 10 additions & 40 deletions zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl fmt::Debug for PublisherState {
#[derive(Clone)]
pub enum PublisherRef<'a> {
Borrow(&'a Publisher<'a>),
Shared(std::sync::Arc<Publisher<'static>>),
Shared(Arc<Publisher<'static>>),
}

#[zenoh_macros::unstable]
Expand All @@ -105,8 +105,6 @@ impl std::fmt::Debug for PublisherRef<'_> {

/// A publisher that allows to send data through a stream.
///
/// Publishers are automatically undeclared when dropped.
///
/// # Examples
/// ```
/// # #[tokio::main]
Expand Down Expand Up @@ -146,7 +144,6 @@ pub struct Publisher<'a> {
pub(crate) destination: Locality,
#[cfg(feature = "unstable")]
pub(crate) matching_listeners: Arc<Mutex<HashSet<Id>>>,
pub(crate) undeclare_on_drop: bool,
}

impl<'a> Publisher<'a> {
Expand Down Expand Up @@ -491,9 +488,7 @@ impl Resolvable for PublisherUndeclaration<'_> {
}

impl Wait for PublisherUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
// set the flag first to avoid double panic if this function panic
self.publisher.undeclare_on_drop = false;
fn wait(self) -> <Self as Resolvable>::To {
#[cfg(feature = "unstable")]
self.publisher.undeclare_matching_listeners()?;
self.publisher
Expand All @@ -513,11 +508,8 @@ impl IntoFuture for PublisherUndeclaration<'_> {

impl Drop for Publisher<'_> {
fn drop(&mut self) {
if self.undeclare_on_drop {
#[cfg(feature = "unstable")]
let _ = self.undeclare_matching_listeners();
let _ = self.session.undeclare_publisher_inner(self.id);
}
#[cfg(feature = "unstable")]
let _ = self.undeclare_matching_listeners();
}
}

Expand Down Expand Up @@ -922,7 +914,6 @@ where
listener: MatchingListenerInner {
publisher: self.publisher,
state,
undeclare_on_drop: true,
},
receiver,
})
Expand All @@ -947,7 +938,7 @@ where
#[zenoh_macros::unstable]
pub(crate) struct MatchingListenerState {
pub(crate) id: Id,
pub(crate) current: std::sync::Mutex<bool>,
pub(crate) current: Mutex<bool>,
pub(crate) key_expr: KeyExpr<'static>,
pub(crate) destination: Locality,
pub(crate) callback: Callback<'static, MatchingStatus>,
Expand All @@ -966,8 +957,7 @@ impl std::fmt::Debug for MatchingListenerState {
#[zenoh_macros::unstable]
pub(crate) struct MatchingListenerInner<'a> {
pub(crate) publisher: PublisherRef<'a>,
pub(crate) state: std::sync::Arc<MatchingListenerState>,
undeclare_on_drop: bool,
pub(crate) state: Arc<MatchingListenerState>,
}

#[zenoh_macros::unstable]
Expand All @@ -988,6 +978,9 @@ impl<'a> Undeclarable<(), MatchingListenerUndeclaration<'a>> for MatchingListene
/// A listener that sends notifications when the [`MatchingStatus`] of a
/// publisher changes.
///
/// Matching litsteners run in background until the publisher is undeclared.
/// They can be manually undeclared, but will not be undeclared on drop.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
Expand Down Expand Up @@ -1035,14 +1028,6 @@ impl<'a, Receiver> MatchingListener<'a, Receiver> {
pub fn undeclare(self) -> MatchingListenerUndeclaration<'a> {
self.listener.undeclare()
}

/// Make the matching listener run in background, until the publisher is undeclared.
#[inline]
#[zenoh_macros::unstable]
pub fn background(mut self) {
// The matching listener will be undeclared as part of publisher undeclaration.
self.listener.undeclare_on_drop = false;
}
}

#[zenoh_macros::unstable]
Expand Down Expand Up @@ -1079,9 +1064,7 @@ impl Resolvable for MatchingListenerUndeclaration<'_> {

#[zenoh_macros::unstable]
impl Wait for MatchingListenerUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
// set the flag first to avoid double panic if this function panic
self.subscriber.undeclare_on_drop = false;
fn wait(self) -> <Self as Resolvable>::To {
zlock!(self.subscriber.publisher.matching_listeners).remove(&self.subscriber.state.id);
self.subscriber
.publisher
Expand All @@ -1100,19 +1083,6 @@ impl IntoFuture for MatchingListenerUndeclaration<'_> {
}
}

#[zenoh_macros::unstable]
impl Drop for MatchingListenerInner<'_> {
fn drop(&mut self) {
if self.undeclare_on_drop {
zlock!(self.publisher.matching_listeners).remove(&self.state.id);
let _ = self
.publisher
.session
.undeclare_matches_listener_inner(self.state.id);
}
}
}

#[cfg(test)]
mod tests {
use zenoh_config::Config;
Expand Down
79 changes: 32 additions & 47 deletions zenoh/src/api/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::{
};

use uhlc::Timestamp;
use zenoh_config::ZenohId;
use zenoh_core::{Resolvable, Resolve, Wait};
use zenoh_protocol::{
core::{CongestionControl, EntityId, Parameters, WireExpr, ZenohIdProto},
Expand All @@ -28,30 +29,32 @@ use zenoh_protocol::{
use zenoh_result::ZResult;
#[zenoh_macros::unstable]
use {
super::{query::ReplyKeyExpr, sample::SourceInfo},
crate::api::{query::ReplyKeyExpr, sample::SourceInfo},
zenoh_config::wrappers::EntityGlobalId,
zenoh_protocol::core::EntityGlobalIdProto,
};

#[zenoh_macros::unstable]
use super::selector::ZenohParameters;
use super::{
builders::sample::{
EncodingBuilderTrait, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait,
TimestampBuilderTrait,
use crate::api::selector::ZenohParameters;
use crate::{
api::{
builders::sample::{
EncodingBuilderTrait, QoSBuilderTrait, SampleBuilder, SampleBuilderTrait,
TimestampBuilderTrait,
},
bytes::{OptionZBytes, ZBytes},
encoding::Encoding,
handlers::{locked, DefaultHandler, IntoHandler},
key_expr::KeyExpr,
publisher::Priority,
sample::{Locality, QoSBuilder, Sample, SampleKind},
selector::Selector,
session::{SessionRef, Undeclarable, WeakSessionRef},
value::Value,
Id,
},
bytes::{OptionZBytes, ZBytes},
encoding::Encoding,
handlers::{locked, DefaultHandler, IntoHandler},
key_expr::KeyExpr,
publisher::Priority,
sample::{Locality, QoSBuilder, Sample, SampleKind},
selector::Selector,
session::{SessionRef, Undeclarable},
value::Value,
Id,
net::primitives::Primitives,
};
use crate::net::primitives::Primitives;

pub(crate) struct QueryInner {
pub(crate) key_expr: KeyExpr<'static>,
Expand Down Expand Up @@ -562,9 +565,9 @@ impl fmt::Debug for QueryableState {
/// ```
#[derive(Debug)]
pub(crate) struct CallbackQueryable<'a> {
pub(crate) session: SessionRef<'a>,
pub(crate) session_id: ZenohId,
pub(crate) session: WeakSessionRef<'a>,
pub(crate) state: Arc<QueryableState>,
undeclare_on_drop: bool,
}

impl<'a> Undeclarable<(), QueryableUndeclaration<'a>> for CallbackQueryable<'a> {
Expand Down Expand Up @@ -596,12 +599,11 @@ impl Resolvable for QueryableUndeclaration<'_> {
}

impl Wait for QueryableUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
// set the flag first to avoid double panic if this function panic
self.queryable.undeclare_on_drop = false;
self.queryable
.session
.close_queryable(self.queryable.state.id)
fn wait(self) -> <Self as Resolvable>::To {
let Some(session) = self.queryable.session.upgrade() else {
return Ok(());
};
session.close_queryable(self.queryable.state.id)
}
}

Expand All @@ -614,14 +616,6 @@ impl<'a> IntoFuture for QueryableUndeclaration<'a> {
}
}

impl Drop for CallbackQueryable<'_> {
fn drop(&mut self) {
if self.undeclare_on_drop {
let _ = self.session.close_queryable(self.state.id);
}
}
}

/// A builder for initializing a [`Queryable`].
///
/// # Examples
Expand Down Expand Up @@ -778,7 +772,8 @@ impl<'a, 'b, Handler> QueryableBuilder<'a, 'b, Handler> {
/// and the [`with`](QueryableBuilder::with) function
/// of the resulting builder.
///
/// Queryables are automatically undeclared when dropped.
/// Queryables run in background until the session is closed.
/// They can be manually undeclared, but will not be undeclared on drop.
///
/// # Examples
/// ```no_run
Expand Down Expand Up @@ -826,7 +821,7 @@ impl<'a, Handler> Queryable<'a, Handler> {
#[zenoh_macros::unstable]
pub fn id(&self) -> EntityGlobalId {
EntityGlobalIdProto {
zid: self.queryable.session.zid().into(),
zid: self.queryable.session_id.into(),
eid: self.queryable.state.id,
}
.into()
Expand All @@ -850,16 +845,6 @@ impl<'a, Handler> Queryable<'a, Handler> {
pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
Undeclarable::undeclare_inner(self, ())
}

/// Make the queryable run in background, until the session is closed.
#[inline]
#[zenoh_macros::unstable]
pub fn background(mut self) {
// It's not necessary to undeclare this resource when session close, as other sessions
// will clean all resources related to the closed one.
// So we can just never undeclare it.
self.queryable.undeclare_on_drop = false;
}
}

impl<'a, T> Undeclarable<(), QueryableUndeclaration<'a>> for Queryable<'a, T> {
Expand Down Expand Up @@ -907,9 +892,9 @@ where
)
.map(|qable_state| Queryable {
queryable: CallbackQueryable {
session,
session_id: session.zid(),
session: session.into(),
state: qable_state,
undeclare_on_drop: true,
},
handler: receiver,
})
Expand Down
Loading

0 comments on commit b0171c5

Please sign in to comment.