diff --git a/zenoh-ext/src/lib.rs b/zenoh-ext/src/lib.rs index 659afa006d..2a73b84172 100644 --- a/zenoh-ext/src/lib.rs +++ b/zenoh-ext/src/lib.rs @@ -53,11 +53,13 @@ impl From for KeySpace { } pub trait ExtractSample { - fn extract(self) -> ZResult; + fn extract(&self) -> ZResult; } impl ExtractSample for Reply { - fn extract(self) -> ZResult { - self.into_result().map_err(|e| zerror!("{:?}", e).into()) + fn extract(&self) -> ZResult { + self.result() + .cloned() + .map_err(|e| zerror!("{:?}", e).into()) } } diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs index 95e1b4bb9e..72d92041f9 100644 --- a/zenoh-ext/src/querying_subscriber.rs +++ b/zenoh-ext/src/querying_subscriber.rs @@ -21,7 +21,7 @@ use std::{ }; use zenoh::{ - handlers::{locked, DefaultHandler, IntoHandler}, + handlers::{locked, Callback, DefaultHandler, IntoHandler}, internal::zlock, key_expr::KeyExpr, prelude::Wait, @@ -54,12 +54,12 @@ pub struct QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler> { impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandler> { /// Add callback to [`FetchingSubscriber`]. #[inline] - pub fn callback( + pub fn callback( self, - callback: Callback, - ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback> + callback: F, + ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback> where - Callback: Fn(Sample) + Send + Sync + 'static, + F: Fn(&Sample) + Send + Sync + 'static, { let QueryingSubscriberBuilder { session, @@ -85,7 +85,7 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle query_consolidation, query_accept_replies, query_timeout, - handler: callback, + handler: Callback::new(Arc::new(callback)), } } @@ -95,12 +95,12 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle /// If your callback is also accepted by the [`callback`](QueryingSubscriberBuilder::callback) /// method, we suggest you use it instead of `callback_mut` #[inline] - pub fn callback_mut( + pub fn callback_mut( self, - callback: CallbackMut, - ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, impl Fn(Sample) + Send + Sync + 'static> + callback: F, + ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback> where - CallbackMut: FnMut(Sample) + Send + Sync + 'static, + F: FnMut(&Sample) + Send + Sync + 'static, { self.callback(locked(callback)) } @@ -357,7 +357,7 @@ pub struct FetchingSubscriberBuilder< 'b, KeySpace, Handler, - Fetch: FnOnce(Box) -> ZResult<()>, + Fetch: FnOnce(Box) -> ZResult<()>, TryIntoSample, > where TryIntoSample: ExtractSample, @@ -377,7 +377,7 @@ impl< 'b, KeySpace, Handler, - Fetch: FnOnce(Box) -> ZResult<()>, + Fetch: FnOnce(Box) -> ZResult<()>, TryIntoSample, > FetchingSubscriberBuilder<'a, 'b, KeySpace, Handler, Fetch, TryIntoSample> where @@ -403,7 +403,7 @@ impl< 'a, 'b, KeySpace, - Fetch: FnOnce(Box) -> ZResult<()>, + Fetch: FnOnce(Box) -> ZResult<()>, TryIntoSample, > FetchingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandler, Fetch, TryIntoSample> where @@ -411,12 +411,12 @@ where { /// Add callback to [`FetchingSubscriber`]. #[inline] - pub fn callback( + pub fn callback( self, - callback: Callback, - ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback, Fetch, TryIntoSample> + callback: F, + ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback, Fetch, TryIntoSample> where - Callback: Fn(Sample) + Send + Sync + 'static, + F: Fn(&Sample) + Send + Sync + 'static, { let FetchingSubscriberBuilder { session, @@ -435,7 +435,7 @@ where reliability, origin, fetch, - handler: callback, + handler: Callback::new(Arc::new(callback)), phantom, } } @@ -446,19 +446,12 @@ where /// If your callback is also accepted by the [`callback`](FetchingSubscriberBuilder::callback) /// method, we suggest you use it instead of `callback_mut` #[inline] - pub fn callback_mut( + pub fn callback_mut( self, - callback: CallbackMut, - ) -> FetchingSubscriberBuilder< - 'a, - 'b, - KeySpace, - impl Fn(Sample) + Send + Sync + 'static, - Fetch, - TryIntoSample, - > + callback: F, + ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback, Fetch, TryIntoSample> where - CallbackMut: FnMut(Sample) + Send + Sync + 'static, + F: FnMut(&Sample) + Send + Sync + 'static, { self.callback(locked(callback)) } @@ -499,7 +492,7 @@ impl< 'a, 'b, Handler, - Fetch: FnOnce(Box) -> ZResult<()>, + Fetch: FnOnce(Box) -> ZResult<()>, TryIntoSample, > FetchingSubscriberBuilder<'a, 'b, crate::UserSpace, Handler, Fetch, TryIntoSample> where @@ -540,7 +533,7 @@ impl< 'a, KeySpace, Handler, - Fetch: FnOnce(Box) -> ZResult<()>, + Fetch: FnOnce(Box) -> ZResult<()>, TryIntoSample, > Resolvable for FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample> where @@ -554,7 +547,7 @@ where impl< KeySpace, Handler, - Fetch: FnOnce(Box) -> ZResult<()> + Send + Sync, + Fetch: FnOnce(Box) -> ZResult<()> + Send + Sync, TryIntoSample, > Wait for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample> where @@ -572,7 +565,7 @@ impl< 'a, KeySpace, Handler, - Fetch: FnOnce(Box) -> ZResult<()> + Send + Sync, + Fetch: FnOnce(Box) -> ZResult<()> + Send + Sync, TryIntoSample, > IntoFuture for FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample> where @@ -622,7 +615,7 @@ where /// ``` pub struct FetchingSubscriber<'a, Handler> { subscriber: Subscriber<'a, ()>, - callback: Arc, + callback: Callback, state: Arc>, handler: Handler, } @@ -644,7 +637,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> { fn new< KeySpace, InputHandler, - Fetch: FnOnce(Box) -> ZResult<()> + Send + Sync, + Fetch: FnOnce(Box) -> ZResult<()> + Send + Sync, TryIntoSample, >( conf: FetchingSubscriberBuilder<'a, 'a, KeySpace, InputHandler, Fetch, TryIntoSample>, @@ -665,10 +658,10 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> { let sub_callback = { let state = state.clone(); let callback = callback.clone(); - move |s| { + move |s: &Sample| { let state = &mut zlock!(state); if state.pending_fetches == 0 { - callback(s); + callback.call(s); } else { tracing::trace!( "Sample received while fetch in progress: push it to merge_queue" @@ -683,7 +676,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> { .unwrap_or(Timestamp::new(now, session_id.into())); state .merge_queue - .push(SampleBuilder::from(s).timestamp(timestamp).into()); + .push(SampleBuilder::from(s.clone()).timestamp(timestamp).into()); } } }; @@ -772,7 +765,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> { /// ``` #[inline] pub fn fetch< - Fetch: FnOnce(Box) -> ZResult<()> + Send + Sync, + Fetch: FnOnce(Box) -> ZResult<()> + Send + Sync, TryIntoSample, >( &self, @@ -792,7 +785,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> { struct RepliesHandler { state: Arc>, - callback: Arc, + callback: Callback, } impl Drop for RepliesHandler { @@ -809,7 +802,7 @@ impl Drop for RepliesHandler { state.merge_queue.len() ); for s in state.merge_queue.drain() { - (self.callback)(s); + self.callback.call_by_value(s); } } } @@ -849,7 +842,7 @@ impl Drop for RepliesHandler { /// ``` #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] pub struct FetchBuilder< - Fetch: FnOnce(Box) -> ZResult<()>, + Fetch: FnOnce(Box) -> ZResult<()>, TryIntoSample, > where TryIntoSample: ExtractSample, @@ -857,10 +850,10 @@ pub struct FetchBuilder< fetch: Fetch, phantom: std::marker::PhantomData, state: Arc>, - callback: Arc, + callback: Callback, } -impl) -> ZResult<()>, TryIntoSample> +impl) -> ZResult<()>, TryIntoSample> Resolvable for FetchBuilder where TryIntoSample: ExtractSample, @@ -868,7 +861,7 @@ where type To = ZResult<()>; } -impl) -> ZResult<()>, TryIntoSample> Wait +impl) -> ZResult<()>, TryIntoSample> Wait for FetchBuilder where TryIntoSample: ExtractSample, @@ -879,7 +872,7 @@ where } } -impl) -> ZResult<()>, TryIntoSample> +impl) -> ZResult<()>, TryIntoSample> IntoFuture for FetchBuilder where TryIntoSample: ExtractSample, @@ -892,17 +885,14 @@ where } } -fn register_handler( - state: Arc>, - callback: Arc, -) -> RepliesHandler { +fn register_handler(state: Arc>, callback: Callback) -> RepliesHandler { zlock!(state).pending_fetches += 1; // pending fetches will be decremented in RepliesHandler drop() RepliesHandler { state, callback } } fn run_fetch< - Fetch: FnOnce(Box) -> ZResult<()>, + Fetch: FnOnce(Box) -> ZResult<()>, TryIntoSample, >( fetch: Fetch, @@ -912,7 +902,7 @@ where TryIntoSample: ExtractSample, { tracing::debug!("Fetch data for FetchingSubscriber"); - (fetch)(Box::new(move |s: TryIntoSample| match s.extract() { + (fetch)(Box::new(move |s: &TryIntoSample| match s.extract() { Ok(s) => { let mut state = zlock!(handler.state); tracing::trace!("Fetched sample received: push it to merge_queue"); diff --git a/zenoh-ext/src/subscriber_ext.rs b/zenoh-ext/src/subscriber_ext.rs index a7356f86dc..bc9435d343 100644 --- a/zenoh-ext/src/subscriber_ext.rs +++ b/zenoh-ext/src/subscriber_ext.rs @@ -80,7 +80,7 @@ pub trait SubscriberBuilderExt<'a, 'b, Handler> { /// # } /// ``` fn fetching< - Fetch: FnOnce(Box) -> ZResult<()>, + Fetch: FnOnce(Box) -> ZResult<()>, TryIntoSample, >( self, @@ -158,7 +158,7 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilde /// # } /// ``` fn fetching< - Fetch: FnOnce(Box) -> ZResult<()>, + Fetch: FnOnce(Box) -> ZResult<()>, TryIntoSample, >( self, @@ -270,7 +270,7 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> /// # } /// ``` fn fetching< - Fetch: FnOnce(Box) -> ZResult<()>, + Fetch: FnOnce(Box) -> ZResult<()>, TryIntoSample, >( self, diff --git a/zenoh/src/api/handlers/callback.rs b/zenoh/src/api/handlers/callback.rs index 3e2d21f60e..5f99575091 100644 --- a/zenoh/src/api/handlers/callback.rs +++ b/zenoh/src/api/handlers/callback.rs @@ -20,7 +20,7 @@ use super::{IntoHandler, RingChannelSender}; /// A function that can transform a [`FnMut`]`(T)` to /// a [`Fn`]`(T)` with the help of a [`Mutex`](std::sync::Mutex). -pub fn locked(fnmut: impl FnMut(T)) -> impl Fn(T) { +pub fn locked(fnmut: impl FnMut(&T)) -> impl Fn(&T) { let lock = std::sync::Mutex::new(fnmut); move |x| zlock!(lock)(x) } @@ -72,8 +72,9 @@ impl Callback { } } + #[cfg(feature = "internal")] #[inline] - pub(crate) fn call_by_value(&self, arg: T) { + pub fn call_by_value(&self, arg: T) { match &self.0 { CallbackInner::Dyn(cb) => cb(&arg), CallbackInner::Flume(tx) => { diff --git a/zenoh/src/api/liveliness.rs b/zenoh/src/api/liveliness.rs index fb62cdcab6..1e401636ac 100644 --- a/zenoh/src/api/liveliness.rs +++ b/zenoh/src/api/liveliness.rs @@ -31,6 +31,7 @@ use super::{ subscriber::{Subscriber, SubscriberInner}, Id, }; +use crate::handlers::Callback; /// A structure with functions to declare a /// [`LivelinessToken`](LivelinessToken), query @@ -464,12 +465,9 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> { /// ``` #[inline] #[zenoh_macros::unstable] - pub fn callback( - self, - callback: Callback, - ) -> LivelinessSubscriberBuilder<'a, 'b, Callback> + pub fn callback(self, callback: F) -> LivelinessSubscriberBuilder<'a, 'b, Callback> where - Callback: Fn(Sample) + Send + Sync + 'static, + F: Fn(&Sample) + Send + Sync + 'static, { let LivelinessSubscriberBuilder { session, @@ -480,7 +478,7 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> { LivelinessSubscriberBuilder { session, key_expr, - handler: callback, + handler: Callback::new(Arc::new(callback)), history, } } @@ -508,12 +506,12 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> { /// ``` #[inline] #[zenoh_macros::unstable] - pub fn callback_mut( + pub fn callback_mut( self, - callback: CallbackMut, - ) -> LivelinessSubscriberBuilder<'a, 'b, impl Fn(Sample) + Send + Sync + 'static> + callback: F, + ) -> LivelinessSubscriberBuilder<'a, 'b, Callback> where - CallbackMut: FnMut(Sample) + Send + Sync + 'static, + F: FnMut(&Sample) + Send + Sync + 'static, { self.callback(locked(callback)) } @@ -686,9 +684,9 @@ impl<'a, 'b> LivelinessGetBuilder<'a, 'b, DefaultHandler> { /// # } /// ``` #[inline] - pub fn callback(self, callback: Callback) -> LivelinessGetBuilder<'a, 'b, Callback> + pub fn callback(self, callback: F) -> LivelinessGetBuilder<'a, 'b, Callback> where - Callback: Fn(Reply) + Send + Sync + 'static, + F: Fn(&Reply) + Send + Sync + 'static, { let LivelinessGetBuilder { session, @@ -700,7 +698,7 @@ impl<'a, 'b> LivelinessGetBuilder<'a, 'b, DefaultHandler> { session, key_expr, timeout, - handler: callback, + handler: Callback::new(Arc::new(callback)), } } @@ -726,12 +724,9 @@ impl<'a, 'b> LivelinessGetBuilder<'a, 'b, DefaultHandler> { /// # } /// ``` #[inline] - pub fn callback_mut( - self, - callback: CallbackMut, - ) -> LivelinessGetBuilder<'a, 'b, impl Fn(Reply) + Send + Sync + 'static> + pub fn callback_mut(self, callback: F) -> LivelinessGetBuilder<'a, 'b, Callback> where - CallbackMut: FnMut(Reply) + Send + Sync + 'static, + F: FnMut(&Reply) + Send + Sync + 'static, { self.callback(locked(callback)) } diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index 05855a6ce4..d95cbc3376 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -824,9 +824,9 @@ impl<'a> MatchingListenerBuilder<'a, DefaultHandler> { /// ``` #[inline] #[zenoh_macros::unstable] - pub fn callback(self, callback: Callback) -> MatchingListenerBuilder<'a, Callback> + pub fn callback(self, callback: F) -> MatchingListenerBuilder<'a, Callback> where - Callback: Fn(MatchingStatus) + Send + Sync + 'static, + F: Fn(&MatchingStatus) + Send + Sync + 'static, { let MatchingListenerBuilder { publisher, @@ -834,7 +834,7 @@ impl<'a> MatchingListenerBuilder<'a, DefaultHandler> { } = self; MatchingListenerBuilder { publisher, - handler: callback, + handler: Callback::new(Arc::new(callback)), } } @@ -858,12 +858,12 @@ impl<'a> MatchingListenerBuilder<'a, DefaultHandler> { /// ``` #[inline] #[zenoh_macros::unstable] - pub fn callback_mut( + pub fn callback_mut( self, - callback: CallbackMut, - ) -> MatchingListenerBuilder<'a, impl Fn(MatchingStatus) + Send + Sync + 'static> + callback: F, + ) -> MatchingListenerBuilder<'a, Callback> where - CallbackMut: FnMut(MatchingStatus) + Send + Sync + 'static, + F: FnMut(&MatchingStatus) + Send + Sync + 'static, { self.callback(crate::api::handlers::locked(callback)) } diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index 0b9a0467c4..9100775b00 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -15,6 +15,7 @@ use std::{ collections::HashMap, future::{IntoFuture, Ready}, + sync::Arc, time::Duration, }; @@ -275,9 +276,9 @@ impl<'a, 'b> SessionGetBuilder<'a, 'b, DefaultHandler> { /// # } /// ``` #[inline] - pub fn callback(self, callback: Callback) -> SessionGetBuilder<'a, 'b, Callback> + pub fn callback(self, callback: F) -> SessionGetBuilder<'a, 'b, Callback> where - Callback: Fn(Reply) + Send + Sync + 'static, + F: Fn(&Reply) + Send + Sync + 'static, { let SessionGetBuilder { session, @@ -305,7 +306,7 @@ impl<'a, 'b> SessionGetBuilder<'a, 'b, DefaultHandler> { attachment, #[cfg(feature = "unstable")] source_info, - handler: callback, + handler: Callback::new(Arc::new(callback)), } } @@ -330,12 +331,9 @@ impl<'a, 'b> SessionGetBuilder<'a, 'b, DefaultHandler> { /// # } /// ``` #[inline] - pub fn callback_mut( - self, - callback: CallbackMut, - ) -> SessionGetBuilder<'a, 'b, impl Fn(Reply) + Send + Sync + 'static> + pub fn callback_mut(self, callback: F) -> SessionGetBuilder<'a, 'b, Callback> where - CallbackMut: FnMut(Reply) + Send + Sync + 'static, + F: FnMut(&Reply) + Send + Sync + 'static, { self.callback(locked(callback)) } diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index c669dacb9e..0fbc8ec530 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -666,9 +666,9 @@ impl<'a, 'b> QueryableBuilder<'a, 'b, DefaultHandler> { /// # } /// ``` #[inline] - pub fn callback(self, callback: Callback) -> QueryableBuilder<'a, 'b, Callback> + pub fn callback(self, callback: F) -> QueryableBuilder<'a, 'b, Callback> where - Callback: Fn(Query) + Send + Sync + 'static, + F: Fn(&Query) + Send + Sync + 'static, { let QueryableBuilder { session, @@ -682,7 +682,7 @@ impl<'a, 'b> QueryableBuilder<'a, 'b, DefaultHandler> { key_expr, complete, origin, - handler: callback, + handler: Callback::new(Arc::new(callback)), } } @@ -707,12 +707,9 @@ impl<'a, 'b> QueryableBuilder<'a, 'b, DefaultHandler> { /// # } /// ``` #[inline] - pub fn callback_mut( - self, - callback: CallbackMut, - ) -> QueryableBuilder<'a, 'b, impl Fn(Query) + Send + Sync + 'static> + pub fn callback_mut(self, callback: F) -> QueryableBuilder<'a, 'b, Callback> where - CallbackMut: FnMut(Query) + Send + Sync + 'static, + F: FnMut(&Query) + Send + Sync + 'static, { self.callback(locked(callback)) } diff --git a/zenoh/src/api/scouting.rs b/zenoh/src/api/scouting.rs index 8ca90daab5..a56daa6dbb 100644 --- a/zenoh/src/api/scouting.rs +++ b/zenoh/src/api/scouting.rs @@ -16,6 +16,7 @@ use std::{ future::{IntoFuture, Ready}, net::SocketAddr, ops::Deref, + sync::Arc, time::Duration, }; @@ -71,9 +72,9 @@ impl ScoutBuilder { /// # } /// ``` #[inline] - pub fn callback(self, callback: Callback) -> ScoutBuilder + pub fn callback(self, callback: F) -> ScoutBuilder> where - Callback: Fn(Hello) + Send + Sync + 'static, + F: Fn(&Hello) + Send + Sync + 'static, { let ScoutBuilder { what, @@ -83,7 +84,7 @@ impl ScoutBuilder { ScoutBuilder { what, config, - handler: callback, + handler: Callback::new(Arc::new(callback)), } } @@ -106,12 +107,9 @@ impl ScoutBuilder { /// # } /// ``` #[inline] - pub fn callback_mut( - self, - callback: CallbackMut, - ) -> ScoutBuilder + pub fn callback_mut(self, callback: F) -> ScoutBuilder> where - CallbackMut: FnMut(Hello) + Send + Sync + 'static, + F: FnMut(&Hello) + Send + Sync + 'static, { self.callback(locked(callback)) } diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index 84305c1dce..0a086baad5 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -232,9 +232,9 @@ impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { /// # } /// ``` #[inline] - pub fn callback(self, callback: Callback) -> SubscriberBuilder<'a, 'b, Callback> + pub fn callback(self, callback: F) -> SubscriberBuilder<'a, 'b, Callback> where - Callback: Fn(Sample) + Send + Sync + 'static, + F: Fn(&Sample) + Send + Sync + 'static, { let SubscriberBuilder { session, @@ -250,7 +250,7 @@ impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { #[cfg(feature = "unstable")] reliability, origin, - handler: callback, + handler: Callback::new(Arc::new(callback)), } } @@ -275,12 +275,9 @@ impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { /// # } /// ``` #[inline] - pub fn callback_mut( - self, - callback: CallbackMut, - ) -> SubscriberBuilder<'a, 'b, impl Fn(Sample) + Send + Sync + 'static> + pub fn callback_mut(self, callback: F) -> SubscriberBuilder<'a, 'b, Callback> where - CallbackMut: FnMut(Sample) + Send + Sync + 'static, + F: FnMut(&Sample) + Send + Sync + 'static, { self.callback(locked(callback)) }