Skip to content

Commit

Permalink
Merge pull request #1403 from ZettaScaleLabs/arc_session11
Browse files Browse the repository at this point in the history
refactor: use builder method for undeclaration on drop
  • Loading branch information
Mallets authored Sep 12, 2024
2 parents 88b05ee + 9ff1756 commit e8fcd62
Show file tree
Hide file tree
Showing 10 changed files with 233 additions and 71 deletions.
92 changes: 75 additions & 17 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,17 @@ pub struct QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler> {
pub(crate) query_accept_replies: ReplyKeyExpr,
pub(crate) query_timeout: Duration,
pub(crate) handler: Handler,
pub(crate) undeclare_on_drop: bool,
}

impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandler> {
/// Add callback to [`FetchingSubscriber`].
///
/// Subscriber will not be undeclared when dropped, with the callback running
/// in background until the session is closed.
///
/// It is in fact just a convenient shortcut for
/// `.with(my_callback).undeclare_on_drop(false)`.
#[inline]
pub fn callback<Callback>(
self,
Expand All @@ -61,14 +68,17 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
where
Callback: Fn(Sample) + Send + Sync + 'static,
{
self.with(callback)
self.with(callback).undeclare_on_drop(false)
}

/// Add callback to [`FetchingSubscriber`].
///
/// Using this guarantees that your callback will never be called concurrently.
/// If your callback is also accepted by the [`callback`](QueryingSubscriberBuilder::callback)
/// method, we suggest you use it instead of `callback_mut`
/// method, we suggest you use it instead of `callback_mut`.
///
/// Subscriber will not be undeclared when dropped, with the callback running
/// in background until the session is closed.
#[inline]
pub fn callback_mut<CallbackMut>(
self,
Expand Down Expand Up @@ -100,6 +110,7 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
query_accept_replies,
query_timeout,
handler: _,
undeclare_on_drop,
} = self;
QueryingSubscriberBuilder {
session,
Expand All @@ -112,11 +123,12 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
query_accept_replies,
query_timeout,
handler,
undeclare_on_drop,
}
}
}

impl<'a, 'b, Handler> QueryingSubscriberBuilder<'a, 'b, crate::UserSpace, Handler> {
impl<'b, Handler> QueryingSubscriberBuilder<'_, 'b, crate::UserSpace, Handler> {
/// Change the subscription reliability.
#[cfg(feature = "unstable")]
#[deprecated(
Expand Down Expand Up @@ -195,16 +207,28 @@ impl<'a, 'b, Handler> QueryingSubscriberBuilder<'a, 'b, crate::UserSpace, Handle
}
}

impl<'a, 'b, KeySpace, Handler> QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler> {
impl<KeySpace, Handler> QueryingSubscriberBuilder<'_, '_, KeySpace, Handler> {
/// Change the timeout to be used for queries.
#[inline]
pub fn query_timeout(mut self, query_timeout: Duration) -> Self {
self.query_timeout = query_timeout;
self
}

/// Set whether the subscriber will be undeclared when dropped.
///
/// The method is usually used in combination with a callback like in
/// [`callback`](Self::callback) method, or a channel sender.
/// Be careful when using it, as subscribers not undeclared will consume
/// resources until the session is closed.
#[inline]
pub fn undeclare_on_drop(mut self, undeclare_on_drop: bool) -> Self {
self.undeclare_on_drop = undeclare_on_drop;
self
}
}

impl<'a, KeySpace, Handler> Resolvable for QueryingSubscriberBuilder<'a, '_, KeySpace, Handler>
impl<KeySpace, Handler> Resolvable for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
where
Handler: IntoHandler<'static, Sample>,
Handler::Handler: Send,
Expand Down Expand Up @@ -254,13 +278,14 @@ where
.wait(),
},
handler: self.handler,
undeclare_on_drop: self.undeclare_on_drop,
phantom: std::marker::PhantomData,
}
.wait()
}
}

impl<'a, KeySpace, Handler> IntoFuture for QueryingSubscriberBuilder<'a, '_, KeySpace, Handler>
impl<KeySpace, Handler> IntoFuture for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
where
KeySpace: Into<crate::KeySpace> + Clone,
Handler: IntoHandler<'static, Sample> + Send,
Expand Down Expand Up @@ -352,6 +377,7 @@ pub struct FetchingSubscriberBuilder<
pub(crate) origin: Locality,
pub(crate) fetch: Fetch,
pub(crate) handler: Handler,
pub(crate) undeclare_on_drop: bool,
pub(crate) phantom: std::marker::PhantomData<TryIntoSample>,
}

Expand All @@ -376,6 +402,7 @@ where
origin: self.origin,
fetch: self.fetch,
handler: self.handler,
undeclare_on_drop: self.undeclare_on_drop,
phantom: std::marker::PhantomData,
}
}
Expand All @@ -392,6 +419,12 @@ where
TryIntoSample: ExtractSample,
{
/// Add callback to [`FetchingSubscriber`].
///
/// Subscriber will not be undeclared when dropped, with the callback running
/// in background until the session is closed.
///
/// It is in fact just a convenient shortcut for
/// `.with(my_callback).undeclare_on_drop(false)`.
#[inline]
pub fn callback<Callback>(
self,
Expand All @@ -400,14 +433,17 @@ where
where
Callback: Fn(Sample) + Send + Sync + 'static,
{
self.with(callback)
self.with(callback).undeclare_on_drop(false)
}

/// Add callback to [`FetchingSubscriber`].
///
/// Using this guarantees that your callback will never be called concurrently.
/// If your callback is also accepted by the [`callback`](FetchingSubscriberBuilder::callback)
/// method, we suggest you use it instead of `callback_mut`
/// method, we suggest you use it instead of `callback_mut`.
///
/// Subscriber will not be undeclared when dropped, with the callback running
/// in background until the session is closed.
#[inline]
pub fn callback_mut<CallbackMut>(
self,
Expand Down Expand Up @@ -442,6 +478,7 @@ where
origin,
fetch,
handler: _,
undeclare_on_drop,
phantom,
} = self;
FetchingSubscriberBuilder {
Expand All @@ -451,18 +488,17 @@ where
origin,
fetch,
handler,
undeclare_on_drop,
phantom,
}
}
}

impl<
'a,
'b,
Handler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
> FetchingSubscriberBuilder<'a, 'b, crate::UserSpace, Handler, Fetch, TryIntoSample>
> FetchingSubscriberBuilder<'_, '_, crate::UserSpace, Handler, Fetch, TryIntoSample>
where
TryIntoSample: ExtractSample,
{
Expand Down Expand Up @@ -510,12 +546,33 @@ where
}

impl<
'a,
KeySpace,
Handler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
> Resolvable for FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample>
> FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
TryIntoSample: ExtractSample,
{
/// Set whether the subscriber will be undeclared when dropped.
///
/// The method is usually used in combination with a callback like in
/// [`callback`](Self::callback) method, or a channel sender.
/// Be careful when using it, as subscribers not undeclared will consume
/// resources until the session is closed.
#[inline]
pub fn undeclare_on_drop(mut self, undeclare_on_drop: bool) -> Self {
self.undeclare_on_drop = undeclare_on_drop;
self
}
}

impl<
KeySpace,
Handler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
> Resolvable for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
Handler: IntoHandler<'static, Sample>,
Handler::Handler: Send,
Expand All @@ -542,12 +599,11 @@ where
}

impl<
'a,
KeySpace,
Handler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
TryIntoSample,
> IntoFuture for FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample>
> IntoFuture for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
KeySpace: Into<crate::KeySpace>,
Handler: IntoHandler<'static, Sample> + Send,
Expand Down Expand Up @@ -671,14 +727,16 @@ impl<Handler> FetchingSubscriber<Handler> {
crate::KeySpace::User => conf
.session
.declare_subscriber(&key_expr)
.callback(sub_callback)
.with(sub_callback)
.undeclare_on_drop(conf.undeclare_on_drop)
.allowed_origin(conf.origin)
.wait()?,
crate::KeySpace::Liveliness => conf
.session
.liveliness()
.declare_subscriber(&key_expr)
.callback(sub_callback)
.with(sub_callback)
.undeclare_on_drop(conf.undeclare_on_drop)
.wait()?,
};

Expand Down
4 changes: 4 additions & 0 deletions zenoh-ext/src/subscriber_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilde
origin: self.origin,
fetch,
handler: self.handler,
undeclare_on_drop: true,
phantom: std::marker::PhantomData,
}
}
Expand Down Expand Up @@ -219,6 +220,7 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilde
query_consolidation: QueryConsolidation::from(zenoh::query::ConsolidationMode::None),
query_accept_replies: ReplyKeyExpr::default(),
query_timeout: Duration::from_secs(10),
undeclare_on_drop: true,
handler: self.handler,
}
}
Expand Down Expand Up @@ -282,6 +284,7 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler>
origin: Locality::default(),
fetch,
handler: self.handler,
undeclare_on_drop: true,
phantom: std::marker::PhantomData,
}
}
Expand Down Expand Up @@ -327,6 +330,7 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler>
query_consolidation: QueryConsolidation::DEFAULT,
query_accept_replies: ReplyKeyExpr::MatchingQuery,
query_timeout: Duration::from_secs(10),
undeclare_on_drop: true,
handler: self.handler,
}
}
Expand Down
23 changes: 19 additions & 4 deletions zenoh/src/api/handlers/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,30 @@ where
{
type Handler = ();

const BACKGROUND: bool = true;

fn into_handler(self) -> (Callback<'a, T>, Self::Handler) {
(Dyn::from(self), ())
}
}

impl<'a, T, F, H> IntoHandler<'a, T> for (F, H)
where
F: Fn(T) + Send + Sync + 'a,
{
type Handler = H;

fn into_handler(self) -> (Callback<'a, T>, Self::Handler) {
(Dyn::from(self.0), self.1)
}
}

impl<'a, T, H> IntoHandler<'a, T> for (Callback<'static, T>, H) {
type Handler = H;

fn into_handler(self) -> (Callback<'a, T>, Self::Handler) {
self
}
}

impl<T: Send + 'static> IntoHandler<'static, T> for (flume::Sender<T>, flume::Receiver<T>) {
type Handler = flume::Receiver<T>;

Expand Down Expand Up @@ -87,8 +104,6 @@ where
{
type Handler = ();

const BACKGROUND: bool = true;

fn into_handler(self) -> (Callback<'a, Event>, Self::Handler) {
(Dyn::from(move |evt| (self.callback)(evt)), ())
}
Expand Down
6 changes: 0 additions & 6 deletions zenoh/src/api/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ pub type Dyn<T> = std::sync::Arc<T>;
pub trait IntoHandler<'a, T> {
type Handler;

/// If it makes sense to still run the callback in background
/// when the handler having been dropped.
/// This boolean may be used to determine if an entity declared
/// with the callback should be undeclared on drop.
const BACKGROUND: bool = false;

fn into_handler(self) -> (Callback<'a, T>, Self::Handler);
}

Expand Down
Loading

0 comments on commit e8fcd62

Please sign in to comment.