Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow subscriber to be run in background #1012

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions zenoh/src/api/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ impl<'a> Liveliness<'a> {
LivelinessSubscriberBuilder {
session: self.session.clone(),
key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into),
background: false,
handler: DefaultHandler::default(),
}
}
Expand Down Expand Up @@ -409,9 +410,20 @@ impl Drop for LivelinessToken<'_> {
pub struct LivelinessSubscriberBuilder<'a, 'b, Handler> {
pub session: SessionRef<'a>,
pub key_expr: ZResult<KeyExpr<'b>>,
pub background: bool,
pub handler: Handler,
}

impl<'a, 'b, Handler> LivelinessSubscriberBuilder<'a, 'b, Handler> {
/// Run the subscriber in background, binding its lifetime to the session one.
///
/// Background subscribers are undeclared when the session is closed, not when they are dropped.
pub fn background(mut self, background: bool) -> Self {
self.background = background;
self
}
}

#[zenoh_macros::unstable]
impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> {
/// Receive the samples for this subscription with a callback.
Expand Down Expand Up @@ -442,11 +454,13 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> {
let LivelinessSubscriberBuilder {
session,
key_expr,
background,
handler: _,
} = self;
LivelinessSubscriberBuilder {
session,
key_expr,
background,
handler: callback,
}
}
Expand Down Expand Up @@ -511,11 +525,13 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> {
let LivelinessSubscriberBuilder {
session,
key_expr,
background,
handler: _,
} = self;
LivelinessSubscriberBuilder {
session,
key_expr,
background,
handler,
}
}
Expand Down Expand Up @@ -546,6 +562,7 @@ where
&key_expr,
&Some(KeyExpr::from(*KE_PREFIX_LIVELINESS)),
Locality::default(),
self.background,
callback,
&SubscriberInfo::DEFAULT,
)
Expand Down
14 changes: 13 additions & 1 deletion zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{
future::{IntoFuture, Ready},
ops::Deref,
sync::{
atomic::{AtomicU16, Ordering},
atomic::{AtomicBool, AtomicU16, Ordering},
Arc, RwLock,
},
time::Duration,
Expand Down Expand Up @@ -308,6 +308,7 @@ impl<'s, 'a> SessionDeclarations<'s, 'a> for SessionRef<'a> {
key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into),
reliability: Reliability::DEFAULT,
origin: Locality::default(),
background: false,
handler: DefaultHandler::default(),
}
}
Expand Down Expand Up @@ -530,6 +531,14 @@ impl Session {
pub fn close(mut self) -> impl Resolve<ZResult<()>> {
ResolveFuture::new(async move {
trace!("close()");
{
let state = zread!(self.state);
for (&id, subscriber) in &state.subscribers {
if subscriber.background.load(Ordering::Relaxed) {
self.unsubscribe(id)?;
}
}
}
self.task_controller.terminate_all(Duration::from_secs(10));
if self.owns_runtime {
self.runtime.close().await?;
Expand Down Expand Up @@ -1001,6 +1010,7 @@ impl Session {
key_expr: &KeyExpr,
scope: &Option<KeyExpr>,
origin: Locality,
background: bool,
callback: Callback<'static, Sample>,
info: &SubscriberInfo,
) -> ZResult<Arc<SubscriberState>> {
Expand All @@ -1018,6 +1028,7 @@ impl Session {
key_expr: key_expr.clone().into_owned(),
scope: scope.clone().map(|e| e.into_owned()),
origin,
background: AtomicBool::new(background),
callback,
};

Expand Down Expand Up @@ -1866,6 +1877,7 @@ impl<'s> SessionDeclarations<'s, 'static> for Arc<Session> {
key_expr: key_expr.try_into().map_err(Into::into),
reliability: Reliability::DEFAULT,
origin: Locality::default(),
background: false,
handler: DefaultHandler::default(),
}
}
Expand Down
48 changes: 44 additions & 4 deletions zenoh/src/api/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use std::{
fmt,
future::{IntoFuture, Ready},
ops::{Deref, DerefMut},
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};

use zenoh_core::{Resolvable, Wait};
Expand All @@ -39,6 +42,7 @@ pub(crate) struct SubscriberState {
pub(crate) key_expr: KeyExpr<'static>,
pub(crate) scope: Option<KeyExpr<'static>>,
pub(crate) origin: Locality,
pub(crate) background: AtomicBool,
pub(crate) callback: Callback<'static, Sample>,
}

Expand Down Expand Up @@ -160,7 +164,7 @@ impl IntoFuture for SubscriberUndeclaration<'_> {

impl Drop for SubscriberInner<'_> {
fn drop(&mut self) {
if self.alive {
if self.alive && !self.state.background.load(Ordering::Relaxed) {
let _ = self.session.unsubscribe(self.state.id);
}
}
Expand Down Expand Up @@ -205,6 +209,11 @@ pub struct SubscriberBuilder<'a, 'b, Handler> {
#[cfg(not(feature = "unstable"))]
pub(crate) origin: Locality,

#[cfg(feature = "unstable")]
pub background: bool,
#[cfg(not(feature = "unstable"))]
pub(crate) background: bool,

#[cfg(feature = "unstable")]
pub handler: Handler,
#[cfg(not(feature = "unstable"))]
Expand Down Expand Up @@ -237,16 +246,16 @@ impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> {
session,
key_expr,
reliability,

origin,
background,
handler: _,
} = self;
SubscriberBuilder {
session,
key_expr,
reliability,

origin,
background,
handler: callback,
}
}
Expand Down Expand Up @@ -311,13 +320,15 @@ impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> {
key_expr,
reliability,
origin,
background,
handler: _,
} = self;
SubscriberBuilder {
session,
key_expr,
reliability,
origin,
background,
handler,
}
}
Expand Down Expand Up @@ -353,6 +364,15 @@ impl<'a, 'b, Handler> SubscriberBuilder<'a, 'b, Handler> {
self.origin = origin;
self
}

/// Run the subscriber in background, binding its lifetime to the session one.
///
/// Background subscribers are undeclared when the session is closed, not when they are dropped.
#[inline]
pub fn background(mut self, background: bool) -> Self {
self.background = background;
self
}
}

// Push mode
Expand All @@ -378,6 +398,7 @@ where
&key_expr,
&None,
self.origin,
self.background,
callback,
&SubscriberInfo {
reliability: self.reliability,
Expand Down Expand Up @@ -505,6 +526,25 @@ impl<'a, Handler> Subscriber<'a, Handler> {
pub fn undeclare(self) -> SubscriberUndeclaration<'a> {
self.subscriber.undeclare()
}

/// Returns whether the subscriber run in background.
///
/// Background subscribers have their lifetime bound to the session one. They are undeclared when the session is closed, not when they are dropped.
#[inline]
pub fn background(self) -> bool {
self.subscriber.state.background.load(Ordering::Relaxed)
}

/// Set whether the subscriber run in background.
///
/// Background subscribers have their lifetime bound to the session one. They are undeclared when the session is closed, not when they are dropped.
#[inline]
pub fn set_background(self, background: bool) {
self.subscriber
.state
.background
.store(background, Ordering::Relaxed);
}
}

impl<'a, T> Undeclarable<(), SubscriberUndeclaration<'a>> for Subscriber<'a, T> {
Expand Down