Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add unstable background method to subscriber/queryable/matching listeners #1106

Merged
merged 9 commits into from
Jun 10, 2024
Merged
4 changes: 4 additions & 0 deletions zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
priority: self.priority,
is_express: self.is_express,
destination: self.destination,
matching_listeners: Default::default(),
undeclare_on_drop: true,
})
}
}
Expand Down Expand Up @@ -371,6 +373,8 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> {
priority: self.priority,
is_express: self.is_express,
destination: self.destination,
matching_listeners: Default::default(),
undeclare_on_drop: true,
})
}
}
Expand Down
21 changes: 16 additions & 5 deletions zenoh/src/api/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ impl Wait for LivelinessTokenBuilder<'_, '_> {
.map(|tok_state| LivelinessToken {
session,
state: tok_state,
alive: true,
undeclare_on_drop: true,
})
}
}
Expand Down Expand Up @@ -291,7 +291,7 @@ pub(crate) struct LivelinessTokenState {
pub struct LivelinessToken<'a> {
pub(crate) session: SessionRef<'a>,
pub(crate) state: Arc<LivelinessTokenState>,
pub(crate) alive: bool,
undeclare_on_drop: bool,
}

/// A [`Resolvable`] returned when undeclaring a [`LivelinessToken`](LivelinessToken).
Expand Down Expand Up @@ -326,7 +326,8 @@ impl Resolvable for LivelinessTokenUndeclaration<'_> {
#[zenoh_macros::unstable]
impl Wait for LivelinessTokenUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
self.token.alive = false;
// set the flag first to avoid double panic if this function panic
self.token.undeclare_on_drop = false;
self.token.session.undeclare_liveliness(self.token.state.id)
}
}
Expand Down Expand Up @@ -369,6 +370,16 @@ impl<'a> LivelinessToken<'a> {
pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
Undeclarable::undeclare_inner(self, ())
}

/// Keep this liveliness token in background, until the session is closed.
#[inline]
#[zenoh_macros::unstable]
pub fn background(mut self) {
// It's not necessary to undeclare this resource when session close, as other sessions
// will clean all resources related to the closed one.
// So we can just never undeclare it.
self.undeclare_on_drop = false;
}
}

#[zenoh_macros::unstable]
Expand All @@ -381,7 +392,7 @@ impl<'a> Undeclarable<(), LivelinessTokenUndeclaration<'a>> for LivelinessToken<
#[zenoh_macros::unstable]
impl Drop for LivelinessToken<'_> {
fn drop(&mut self) {
if self.alive {
if self.undeclare_on_drop {
let _ = self.session.undeclare_liveliness(self.state.id);
}
}
Expand Down Expand Up @@ -553,7 +564,7 @@ where
subscriber: SubscriberInner {
session,
state: sub_state,
alive: true,
undeclare_on_drop: true,
},
handler,
})
Expand Down
87 changes: 58 additions & 29 deletions zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@
//

use std::{
collections::HashSet,
convert::TryFrom,
fmt,
future::{IntoFuture, Ready},
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
};

use futures::Sink;
use zenoh_core::{zread, Resolvable, Resolve, Wait};
use zenoh_keyexpr::keyexpr;
use zenoh_protocol::{
core::CongestionControl,
network::{push::ext, Push},
Expand Down Expand Up @@ -134,6 +135,8 @@ pub struct Publisher<'a> {
pub(crate) priority: Priority,
pub(crate) is_express: bool,
pub(crate) destination: Locality,
pub(crate) matching_listeners: Arc<Mutex<HashSet<Id>>>,
pub(crate) undeclare_on_drop: bool,
}

impl<'a> Publisher<'a> {
Expand All @@ -160,28 +163,33 @@ impl<'a> Publisher<'a> {
}
}

#[inline]
pub fn key_expr(&self) -> &KeyExpr<'a> {
&self.key_expr
}

#[inline]
/// Get the `congestion_control` applied when routing the data.
pub fn congestion_control(&self) -> CongestionControl {
self.congestion_control
}

/// Change the `congestion_control` to apply when routing the data.
#[inline]
pub fn set_congestion_control(&mut self, congestion_control: CongestionControl) {
self.congestion_control = congestion_control;
}

/// Change the priority of the written data.
/// Get the priority of the written data.
#[inline]
pub fn set_priority(&mut self, priority: Priority) {
self.priority = priority;
pub fn priority(&self) -> Priority {
self.priority
}

/// Restrict the matching subscribers that will receive the published data
/// to the ones that have the given [`Locality`](crate::prelude::Locality).
#[zenoh_macros::unstable]
/// Change the priority of the written data.
#[inline]
pub fn set_allowed_destination(&mut self, destination: Locality) {
Copy link
Contributor

@milyin milyin Jun 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happened with set_allowed_destination? Is it's removal planned?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I discussed about it with @OlivierHecart weeks ago, this thing is currently broken (the modification is not transmitted to the PublisherState stored in session, only the creation destination matter).
There was maybe another reason, but I forgot. @OlivierHecart do you remember?

self.destination = destination;
pub fn set_priority(&mut self, priority: Priority) {
self.priority = priority;
}

/// Consumes the given `Publisher`, returning a thread-safe reference-counting
Expand Down Expand Up @@ -350,6 +358,13 @@ impl<'a> Publisher<'a> {
pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
Undeclarable::undeclare_inner(self, ())
}

fn undeclare_matching_listeners(&self) -> ZResult<()> {
for id in zlock!(self.matching_listeners).drain() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code would be safer w.r.t. mutex deadlock if:

  1. it takes the lock
  2. it drains the iterator in a supporting vec
  3. it releases the lock
  4. it undeclares the matching listeners

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.session.undeclare_matches_listener_inner(id)?
}
Ok(())
}
}

/// Functions to create zenoh entities with `'static` lifetime.
Expand Down Expand Up @@ -470,12 +485,12 @@ impl Resolvable for PublisherUndeclaration<'_> {

impl Wait for PublisherUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
let Publisher {
session, id: eid, ..
} = &self.publisher;
session.undeclare_publisher_inner(*eid)?;
self.publisher.key_expr = unsafe { keyexpr::from_str_unchecked("") }.into();
Ok(())
// set the flag first to avoid double panic if this function panic
self.publisher.undeclare_on_drop = false;
self.publisher.undeclare_matching_listeners()?;
self.publisher
.session
.undeclare_publisher_inner(self.publisher.id)
}
}

Expand All @@ -490,7 +505,8 @@ impl IntoFuture for PublisherUndeclaration<'_> {

impl Drop for Publisher<'_> {
fn drop(&mut self) {
if !self.key_expr.is_empty() {
if self.undeclare_on_drop {
let _ = self.undeclare_matching_listeners();
let _ = self.session.undeclare_publisher_inner(self.id);
}
}
Expand Down Expand Up @@ -887,17 +903,19 @@ where
#[zenoh_macros::unstable]
fn wait(self) -> <Self as Resolvable>::To {
let (callback, receiver) = self.handler.into_handler();
self.publisher
let state = self
.publisher
.session
.declare_matches_listener_inner(&self.publisher, callback)
.map(|listener_state| MatchingListener {
listener: MatchingListenerInner {
publisher: self.publisher,
state: listener_state,
alive: true,
},
receiver,
})
.declare_matches_listener_inner(&self.publisher, callback)?;
zlock!(self.publisher.matching_listeners).insert(state.id);
Ok(MatchingListener {
listener: MatchingListenerInner {
publisher: self.publisher,
state,
undeclare_on_drop: true,
},
receiver,
})
}
}

Expand Down Expand Up @@ -939,7 +957,7 @@ impl std::fmt::Debug for MatchingListenerState {
pub(crate) struct MatchingListenerInner<'a> {
pub(crate) publisher: PublisherRef<'a>,
pub(crate) state: std::sync::Arc<MatchingListenerState>,
pub(crate) alive: bool,
undeclare_on_drop: bool,
}

#[zenoh_macros::unstable]
Expand Down Expand Up @@ -1007,6 +1025,14 @@ impl<'a, Receiver> MatchingListener<'a, Receiver> {
pub fn undeclare(self) -> MatchingListenerUndeclaration<'a> {
self.listener.undeclare()
}

/// Make the matching listener run in background, until the publisher is undeclared.
#[inline]
#[zenoh_macros::unstable]
pub fn background(mut self) {
// The matching listener will be undeclared as part of publisher undeclaration.
self.listener.undeclare_on_drop = false;
}
}

#[zenoh_macros::unstable]
Expand Down Expand Up @@ -1044,7 +1070,9 @@ impl Resolvable for MatchingListenerUndeclaration<'_> {
#[zenoh_macros::unstable]
impl Wait for MatchingListenerUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
self.subscriber.alive = false;
// set the flag first to avoid double panic if this function panic
self.subscriber.undeclare_on_drop = false;
zlock!(self.subscriber.publisher.matching_listeners).remove(&self.subscriber.state.id);
self.subscriber
.publisher
.session
Expand All @@ -1065,7 +1093,8 @@ impl IntoFuture for MatchingListenerUndeclaration<'_> {
#[zenoh_macros::unstable]
impl Drop for MatchingListenerInner<'_> {
fn drop(&mut self) {
if self.alive {
if self.undeclare_on_drop {
zlock!(self.publisher.matching_listeners).remove(&self.state.id);
let _ = self
.publisher
.session
Expand Down
19 changes: 15 additions & 4 deletions zenoh/src/api/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ impl fmt::Debug for QueryableState {
pub(crate) struct CallbackQueryable<'a> {
pub(crate) session: SessionRef<'a>,
pub(crate) state: Arc<QueryableState>,
pub(crate) alive: bool,
undeclare_on_drop: bool,
}

impl<'a> Undeclarable<(), QueryableUndeclaration<'a>> for CallbackQueryable<'a> {
Expand Down Expand Up @@ -644,7 +644,8 @@ impl Resolvable for QueryableUndeclaration<'_> {

impl Wait for QueryableUndeclaration<'_> {
fn wait(mut self) -> <Self as Resolvable>::To {
self.queryable.alive = false;
// set the flag first to avoid double panic if this function panic
self.queryable.undeclare_on_drop = false;
self.queryable
.session
.close_queryable(self.queryable.state.id)
Expand All @@ -662,7 +663,7 @@ impl<'a> IntoFuture for QueryableUndeclaration<'a> {

impl Drop for CallbackQueryable<'_> {
fn drop(&mut self) {
if self.alive {
if self.undeclare_on_drop {
let _ = self.session.close_queryable(self.state.id);
}
}
Expand Down Expand Up @@ -895,6 +896,16 @@ impl<'a, Handler> Queryable<'a, Handler> {
pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
Undeclarable::undeclare_inner(self, ())
}

/// Make the queryable run in background, until the session is closed.
#[inline]
#[zenoh_macros::unstable]
pub fn background(mut self) {
// It's not necessary to undeclare this resource when session close, as other sessions
// will clean all resources related to the closed one.
// So we can just never undeclare it.
self.queryable.undeclare_on_drop = false;
}
}

impl<'a, T> Undeclarable<(), QueryableUndeclaration<'a>> for Queryable<'a, T> {
Expand Down Expand Up @@ -944,7 +955,7 @@ where
queryable: CallbackQueryable {
session,
state: qable_state,
alive: true,
undeclare_on_drop: true,
},
handler: receiver,
})
Expand Down
13 changes: 7 additions & 6 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ pub struct Session {
pub(crate) runtime: Runtime,
pub(crate) state: Arc<RwLock<SessionState>>,
pub(crate) id: u16,
pub(crate) alive: bool,
close_on_drop: bool,
owns_runtime: bool,
task_controller: TaskController,
}
Expand All @@ -426,7 +426,7 @@ impl Session {
runtime: runtime.clone(),
state: state.clone(),
id: SESSION_ID_COUNTER.fetch_add(1, Ordering::SeqCst),
alive: true,
close_on_drop: true,
owns_runtime: false,
task_controller: TaskController::default(),
};
Expand Down Expand Up @@ -533,6 +533,8 @@ impl Session {
pub fn close(mut self) -> impl Resolve<ZResult<()>> {
ResolveFuture::new(async move {
trace!("close()");
// set the flag first to avoid double panic if this function panic
self.close_on_drop = false;
self.task_controller.terminate_all(Duration::from_secs(10));
if self.owns_runtime {
self.runtime.close().await?;
Expand All @@ -543,7 +545,6 @@ impl Session {
state.queryables.clear();
drop(state);
primitives.as_ref().unwrap().send_close();
self.alive = false;
Ok(())
})
}
Expand Down Expand Up @@ -826,11 +827,11 @@ impl Session {

impl Session {
pub(crate) fn clone(&self) -> Self {
Session {
Self {
runtime: self.runtime.clone(),
state: self.state.clone(),
id: self.id,
alive: false,
close_on_drop: false,
owns_runtime: self.owns_runtime,
task_controller: self.task_controller.clone(),
}
Expand Down Expand Up @@ -2472,7 +2473,7 @@ impl Primitives for Session {

impl Drop for Session {
fn drop(&mut self) {
if self.alive {
if self.close_on_drop {
let _ = self.clone().close().wait();
}
}
Expand Down
Loading
Loading