From 931ff84ee234dbfcf9f41afceae4dd3ad30bb22e Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Fri, 6 Sep 2024 10:55:56 +0200 Subject: [PATCH] feat!: make callback opaque Use an enum hidden to use channel senders directly. It should improve performances, and may allow later to use async sender methods. --- commons/zenoh-config/src/wrappers.rs | 1 + zenoh-ext/src/querying_subscriber.rs | 18 ++--- zenoh/src/api/admin.rs | 7 +- zenoh/src/api/handlers/callback.rs | 111 ++++++++++++++++++++------- zenoh/src/api/handlers/fifo.rs | 21 ++--- zenoh/src/api/handlers/mod.rs | 13 ++-- zenoh/src/api/handlers/ring.rs | 57 ++++++++------ zenoh/src/api/liveliness.rs | 16 ++-- zenoh/src/api/publisher.rs | 10 +-- zenoh/src/api/query.rs | 12 +-- zenoh/src/api/queryable.rs | 12 +-- zenoh/src/api/scouting.rs | 12 +-- zenoh/src/api/session.rs | 87 ++++++++++----------- zenoh/src/api/subscriber.rs | 10 +-- 14 files changed, 229 insertions(+), 158 deletions(-) diff --git a/commons/zenoh-config/src/wrappers.rs b/commons/zenoh-config/src/wrappers.rs index d04950f21e..22a7551b67 100644 --- a/commons/zenoh-config/src/wrappers.rs +++ b/commons/zenoh-config/src/wrappers.rs @@ -100,6 +100,7 @@ impl FromStr for ZenohId { } /// A zenoh Hello message. +#[derive(Clone, PartialEq, Eq)] #[repr(transparent)] pub struct Hello(HelloProto); diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs index e3ab70c1c6..95e1b4bb9e 100644 --- a/zenoh-ext/src/querying_subscriber.rs +++ b/zenoh-ext/src/querying_subscriber.rs @@ -112,7 +112,7 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle handler: Handler, ) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler> where - Handler: IntoHandler<'static, Sample>, + Handler: IntoHandler, { let QueryingSubscriberBuilder { session, @@ -221,7 +221,7 @@ impl<'a, 'b, KeySpace, Handler> QueryingSubscriberBuilder<'a, 'b, KeySpace, Hand impl<'a, KeySpace, Handler> Resolvable for QueryingSubscriberBuilder<'a, '_, KeySpace, Handler> where - Handler: IntoHandler<'static, Sample>, + Handler: IntoHandler, Handler::Handler: Send, { type To = ZResult>; @@ -230,7 +230,7 @@ where impl Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler> where KeySpace: Into + Clone, - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { fn wait(self) -> ::To { @@ -279,7 +279,7 @@ where impl<'a, KeySpace, Handler> IntoFuture for QueryingSubscriberBuilder<'a, '_, KeySpace, Handler> where KeySpace: Into + Clone, - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; @@ -470,7 +470,7 @@ where handler: Handler, ) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Handler, Fetch, TryIntoSample> where - Handler: IntoHandler<'static, Sample>, + Handler: IntoHandler, { let FetchingSubscriberBuilder { session, @@ -544,7 +544,7 @@ impl< TryIntoSample, > Resolvable for FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample> where - Handler: IntoHandler<'static, Sample>, + Handler: IntoHandler, Handler::Handler: Send, TryIntoSample: ExtractSample, { @@ -559,7 +559,7 @@ impl< > Wait for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample> where KeySpace: Into, - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, TryIntoSample: ExtractSample + Send + Sync, { @@ -577,7 +577,7 @@ impl< > IntoFuture for FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample> where KeySpace: Into, - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, TryIntoSample: ExtractSample + Send + Sync, { @@ -651,7 +651,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> { ) -> ZResult where KeySpace: Into, - InputHandler: IntoHandler<'static, Sample, Handler = Handler> + Send, + InputHandler: IntoHandler + Send, TryIntoSample: ExtractSample + Send + Sync, { let session_id = conf.session.zid(); diff --git a/zenoh/src/api/admin.rs b/zenoh/src/api/admin.rs index 8c20a275c1..f4bd2401c1 100644 --- a/zenoh/src/api/admin.rs +++ b/zenoh/src/api/admin.rs @@ -35,6 +35,7 @@ use super::{ session::Session, subscriber::SubscriberKind, }; +use crate::handlers::Callback; lazy_static::lazy_static!( static ref KE_STARSTAR: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") }; @@ -54,15 +55,15 @@ pub(crate) fn init(session: &Session) { &admin_key, true, Locality::SessionLocal, - Arc::new({ + Callback::new(Arc::new({ let session = session.clone(); move |q| super::admin::on_admin_query(&session, q) - }), + })), ); } } -pub(crate) fn on_admin_query(session: &Session, query: Query) { +pub(crate) fn on_admin_query(session: &Session, query: &Query) { fn reply_peer(own_zid: &keyexpr, query: &Query, peer: TransportPeer) { let zid = peer.zid.to_string(); if let Ok(zid) = keyexpr::new(&zid) { diff --git a/zenoh/src/api/handlers/callback.rs b/zenoh/src/api/handlers/callback.rs index 4f49e7c41f..3e2d21f60e 100644 --- a/zenoh/src/api/handlers/callback.rs +++ b/zenoh/src/api/handlers/callback.rs @@ -13,7 +13,10 @@ // //! Callback handler trait. -use super::{Dyn, IntoHandler}; + +use std::sync::Arc; + +use super::{IntoHandler, RingChannelSender}; /// A function that can transform a [`FnMut`]`(T)` to /// a [`Fn`]`(T)` with the help of a [`Mutex`](std::sync::Mutex). @@ -22,32 +25,88 @@ pub fn locked(fnmut: impl FnMut(T)) -> impl Fn(T) { move |x| zlock!(lock)(x) } -/// An immutable callback function. -pub type Callback<'a, T> = Dyn; +enum CallbackInner { + Dyn(Arc), + Flume(flume::Sender), + Ring(RingChannelSender), +} -impl<'a, T, F> IntoHandler<'a, T> for F -where - F: Fn(T) + Send + Sync + 'a, -{ +pub struct Callback(CallbackInner); + +impl Clone for Callback { + fn clone(&self) -> Self { + Self(match &self.0 { + CallbackInner::Dyn(cb) => CallbackInner::Dyn(cb.clone()), + CallbackInner::Flume(tx) => CallbackInner::Flume(tx.clone()), + CallbackInner::Ring(tx) => CallbackInner::Ring(tx.clone()), + }) + } +} + +impl Callback { + pub fn new(cb: Arc) -> Self { + Self(CallbackInner::Dyn(cb)) + } + + pub(crate) fn new_flume(sender: flume::Sender) -> Self { + Self(CallbackInner::Flume(sender)) + } + + pub(crate) fn new_ring(sender: RingChannelSender) -> Self { + Self(CallbackInner::Ring(sender)) + } + + #[inline] + pub fn call(&self, arg: &T) + where + T: Clone, + { + match &self.0 { + CallbackInner::Dyn(cb) => cb(arg), + CallbackInner::Flume(tx) => { + if let Err(error) = tx.send(arg.clone()) { + tracing::error!(%error) + } + } + CallbackInner::Ring(tx) => tx.send(arg.clone()), + } + } + + #[inline] + pub(crate) fn call_by_value(&self, arg: T) { + match &self.0 { + CallbackInner::Dyn(cb) => cb(&arg), + CallbackInner::Flume(tx) => { + if let Err(error) = tx.send(arg) { + tracing::error!(%error) + } + } + CallbackInner::Ring(tx) => tx.send(arg), + } + } +} + +impl IntoHandler for Callback { type Handler = (); - fn into_handler(self) -> (Callback<'a, T>, Self::Handler) { - (Dyn::from(self), ()) + fn into_handler(self) -> (Callback, Self::Handler) { + (self, ()) } } -impl IntoHandler<'static, T> for (flume::Sender, flume::Receiver) { +impl IntoHandler for (flume::Sender, flume::Receiver) { type Handler = flume::Receiver; - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + fn into_handler(self) -> (Callback, Self::Handler) { let (sender, receiver) = self; - ( - Dyn::new(move |t| { - if let Err(e) = sender.send(t) { - tracing::error!("{}", e) - } - }), - receiver, - ) + (Callback::new_flume(sender), receiver) + } +} + +impl IntoHandler for flume::Sender { + type Handler = (); + + fn into_handler(self) -> (Callback, Self::Handler) { + (Callback::new_flume(self), ()) } } @@ -60,15 +119,15 @@ impl IntoHandler<'static, T> for (flume::Sender, flume::Re /// - `callback` will never be called once `drop` has started. /// - `drop` will only be called **once**, and **after every** `callback` has ended. /// - The two previous guarantees imply that `call` and `drop` are never called concurrently. -pub struct CallbackDrop +pub struct CallbackDrop where DropFn: FnMut() + Send + Sync + 'static, { - pub callback: Callback, + pub callback: F, pub drop: DropFn, } -impl Drop for CallbackDrop +impl Drop for CallbackDrop where DropFn: FnMut() + Send + Sync + 'static, { @@ -77,14 +136,14 @@ where } } -impl<'a, OnEvent, Event, DropFn> IntoHandler<'a, Event> for CallbackDrop +impl IntoHandler for CallbackDrop where - OnEvent: Fn(Event) + Send + Sync + 'a, + F: Fn(&T) + Send + Sync + 'static, DropFn: FnMut() + Send + Sync + 'static, { type Handler = (); - fn into_handler(self) -> (Callback<'a, Event>, Self::Handler) { - (Dyn::from(move |evt| (self.callback)(evt)), ()) + fn into_handler(self) -> (Callback, Self::Handler) { + (Callback::new(Arc::new(move |evt| (self.callback)(evt))), ()) } } diff --git a/zenoh/src/api/handlers/fifo.rs b/zenoh/src/api/handlers/fifo.rs index f0ae1a5257..9436f34320 100644 --- a/zenoh/src/api/handlers/fifo.rs +++ b/zenoh/src/api/handlers/fifo.rs @@ -13,7 +13,10 @@ // //! Callback handler trait. -use super::{callback::Callback, Dyn, IntoHandler, API_DATA_RECEPTION_CHANNEL_SIZE}; + +use std::sync::Arc; + +use super::{callback::Callback, IntoHandler, API_DATA_RECEPTION_CHANNEL_SIZE}; /// The default handler in Zenoh is a FIFO queue. @@ -34,27 +37,27 @@ impl Default for FifoChannel { } } -impl IntoHandler<'static, T> for FifoChannel { +impl IntoHandler for FifoChannel { type Handler = flume::Receiver; - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + fn into_handler(self) -> (Callback, Self::Handler) { flume::bounded(self.capacity).into_handler() } } -impl IntoHandler<'static, T> +impl IntoHandler for (std::sync::mpsc::SyncSender, std::sync::mpsc::Receiver) { type Handler = std::sync::mpsc::Receiver; - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + fn into_handler(self) -> (Callback, Self::Handler) { let (sender, receiver) = self; ( - Dyn::new(move |t| { - if let Err(e) = sender.send(t) { - tracing::error!("{}", e) + Callback::new(Arc::new(move |t| { + if let Err(error) = sender.send(t.clone()) { + tracing::error!(%error) } - }), + })), receiver, ) } diff --git a/zenoh/src/api/handlers/mod.rs b/zenoh/src/api/handlers/mod.rs index 60ec658a4d..1d243b6fc2 100644 --- a/zenoh/src/api/handlers/mod.rs +++ b/zenoh/src/api/handlers/mod.rs @@ -23,19 +23,16 @@ pub use ring::*; use crate::api::session::API_DATA_RECEPTION_CHANNEL_SIZE; -/// An alias for `Arc`. -pub type Dyn = std::sync::Arc; - /// A type that can be converted into a [`Callback`]-Handler pair. /// /// When Zenoh functions accept types that implement these, it intends to use the [`Callback`] as just that, /// while granting you access to the handler through the returned value via [`std::ops::Deref`] and [`std::ops::DerefMut`]. /// /// Any closure that accepts `T` can be converted into a pair of itself and `()`. -pub trait IntoHandler<'a, T> { +pub trait IntoHandler { type Handler; - fn into_handler(self) -> (Callback<'a, T>, Self::Handler); + fn into_handler(self) -> (Callback, Self::Handler); } /// The default handler in Zenoh is a FIFO queue. @@ -43,10 +40,10 @@ pub trait IntoHandler<'a, T> { #[derive(Default)] pub struct DefaultHandler(FifoChannel); -impl IntoHandler<'static, T> for DefaultHandler { - type Handler = >::Handler; +impl IntoHandler for DefaultHandler { + type Handler = >::Handler; - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + fn into_handler(self) -> (Callback, Self::Handler) { self.0.into_handler() } } diff --git a/zenoh/src/api/handlers/ring.rs b/zenoh/src/api/handlers/ring.rs index 7b058d1905..dbcdccf6db 100644 --- a/zenoh/src/api/handlers/ring.rs +++ b/zenoh/src/api/handlers/ring.rs @@ -21,7 +21,7 @@ use std::{ use zenoh_collections::RingBuffer; use zenoh_result::ZResult; -use super::{callback::Callback, Dyn, IntoHandler}; +use super::{callback::Callback, IntoHandler}; use crate::api::session::API_DATA_RECEPTION_CHANNEL_SIZE; /// A synchronous ring channel with a limited size that allows users to keep the last N data. @@ -44,7 +44,30 @@ impl Default for RingChannel { struct RingChannelInner { ring: std::sync::Mutex>, - not_empty: flume::Receiver<()>, + not_empty_tx: flume::Sender<()>, + not_empty_rx: flume::Receiver<()>, +} + +pub(crate) struct RingChannelSender(Arc>); + +impl Clone for RingChannelSender { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl RingChannelSender { + pub(crate) fn send(&self, t: T) { + match self.0.ring.lock() { + Ok(mut g) => { + // Eventually drop the oldest element. + g.push_force(t); + drop(g); + let _ = self.0.not_empty_tx.try_send(()); + } + Err(error) => tracing::error!(%error), + } + } } pub struct RingChannelHandler { @@ -63,7 +86,7 @@ impl RingChannelHandler { if let Some(t) = channel.ring.lock().map_err(|e| zerror!("{}", e))?.pull() { return Ok(t); } - channel.not_empty.recv().map_err(|e| zerror!("{}", e))?; + channel.not_empty_rx.recv().map_err(|e| zerror!("{}", e))?; } } @@ -80,7 +103,7 @@ impl RingChannelHandler { if let Some(t) = channel.ring.lock().map_err(|e| zerror!("{}", e))?.pull() { return Ok(Some(t)); } - match channel.not_empty.recv_deadline(deadline) { + match channel.not_empty_rx.recv_deadline(deadline) { Ok(()) => {} Err(flume::RecvTimeoutError::Timeout) => return Ok(None), Err(err) => bail!("{}", err), @@ -101,7 +124,7 @@ impl RingChannelHandler { if let Some(t) = channel.ring.lock().map_err(|e| zerror!("{}", e))?.pull() { return Ok(Some(t)); } - match channel.not_empty.recv_timeout(timeout) { + match channel.not_empty_rx.recv_timeout(timeout) { Ok(()) => {} Err(flume::RecvTimeoutError::Timeout) => return Ok(None), Err(err) => bail!("{}", err), @@ -121,7 +144,7 @@ impl RingChannelHandler { return Ok(t); } channel - .not_empty + .not_empty_rx .recv_async() .await .map_err(|e| zerror!("{}", e))?; @@ -140,29 +163,19 @@ impl RingChannelHandler { } } -impl IntoHandler<'static, T> for RingChannel { +impl IntoHandler for RingChannel { type Handler = RingChannelHandler; - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { - let (sender, receiver) = flume::bounded(1); + fn into_handler(self) -> (Callback, Self::Handler) { + let (not_empty_tx, not_empty_rx) = flume::bounded(1); let inner = Arc::new(RingChannelInner { ring: std::sync::Mutex::new(RingBuffer::new(self.capacity)), - not_empty: receiver, + not_empty_tx, + not_empty_rx, }); let receiver = RingChannelHandler { ring: Arc::downgrade(&inner), }; - ( - Dyn::new(move |t| match inner.ring.lock() { - Ok(mut g) => { - // Eventually drop the oldest element. - g.push_force(t); - drop(g); - let _ = sender.try_send(()); - } - Err(e) => tracing::error!("{}", e), - }), - receiver, - ) + (Callback::new_ring(RingChannelSender(inner)), receiver) } } diff --git a/zenoh/src/api/liveliness.rs b/zenoh/src/api/liveliness.rs index e9a12400fd..fb62cdcab6 100644 --- a/zenoh/src/api/liveliness.rs +++ b/zenoh/src/api/liveliness.rs @@ -542,7 +542,7 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> { #[zenoh_macros::unstable] pub fn with(self, handler: Handler) -> LivelinessSubscriberBuilder<'a, 'b, Handler> where - Handler: crate::handlers::IntoHandler<'static, Sample>, + Handler: crate::handlers::IntoHandler, { let LivelinessSubscriberBuilder { session, @@ -582,7 +582,7 @@ impl LivelinessSubscriberBuilder<'_, '_, Handler> { #[zenoh_macros::unstable] impl<'a, Handler> Resolvable for LivelinessSubscriberBuilder<'a, '_, Handler> where - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult>; @@ -591,7 +591,7 @@ where #[zenoh_macros::unstable] impl<'a, Handler> Wait for LivelinessSubscriberBuilder<'a, '_, Handler> where - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { #[zenoh_macros::unstable] @@ -623,7 +623,7 @@ where #[zenoh_macros::unstable] impl<'a, Handler> IntoFuture for LivelinessSubscriberBuilder<'a, '_, Handler> where - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; @@ -759,7 +759,7 @@ impl<'a, 'b> LivelinessGetBuilder<'a, 'b, DefaultHandler> { #[inline] pub fn with(self, handler: Handler) -> LivelinessGetBuilder<'a, 'b, Handler> where - Handler: IntoHandler<'static, Reply>, + Handler: IntoHandler, { let LivelinessGetBuilder { session, @@ -787,7 +787,7 @@ impl<'a, 'b, Handler> LivelinessGetBuilder<'a, 'b, Handler> { impl Resolvable for LivelinessGetBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Reply> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult; @@ -795,7 +795,7 @@ where impl Wait for LivelinessGetBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Reply> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { fn wait(self) -> ::To { @@ -808,7 +808,7 @@ where impl IntoFuture for LivelinessGetBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Reply> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index 2b13ee32af..05855a6ce4 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -896,7 +896,7 @@ impl<'a> MatchingListenerBuilder<'a, DefaultHandler> { #[zenoh_macros::unstable] pub fn with(self, handler: Handler) -> MatchingListenerBuilder<'a, Handler> where - Handler: IntoHandler<'static, MatchingStatus>, + Handler: IntoHandler, { let MatchingListenerBuilder { publisher, @@ -909,7 +909,7 @@ impl<'a> MatchingListenerBuilder<'a, DefaultHandler> { #[zenoh_macros::unstable] impl<'a, Handler> Resolvable for MatchingListenerBuilder<'a, Handler> where - Handler: IntoHandler<'static, MatchingStatus> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult>; @@ -918,7 +918,7 @@ where #[zenoh_macros::unstable] impl<'a, Handler> Wait for MatchingListenerBuilder<'a, Handler> where - Handler: IntoHandler<'static, MatchingStatus> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { #[zenoh_macros::unstable] @@ -943,7 +943,7 @@ where #[zenoh_macros::unstable] impl<'a, Handler> IntoFuture for MatchingListenerBuilder<'a, Handler> where - Handler: IntoHandler<'static, MatchingStatus> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; @@ -961,7 +961,7 @@ pub(crate) struct MatchingListenerState { pub(crate) current: std::sync::Mutex, pub(crate) key_expr: KeyExpr<'static>, pub(crate) destination: Locality, - pub(crate) callback: Callback<'static, MatchingStatus>, + pub(crate) callback: Callback, } #[zenoh_macros::unstable] diff --git a/zenoh/src/api/query.rs b/zenoh/src/api/query.rs index 1e5efee48f..0b9a0467c4 100644 --- a/zenoh/src/api/query.rs +++ b/zenoh/src/api/query.rs @@ -154,7 +154,7 @@ impl From for Result { #[cfg(feature = "unstable")] pub(crate) struct LivelinessQueryState { - pub(crate) callback: Callback<'static, Reply>, + pub(crate) callback: Callback, } pub(crate) struct QueryState { @@ -163,7 +163,7 @@ pub(crate) struct QueryState { pub(crate) parameters: Parameters<'static>, pub(crate) reception_mode: ConsolidationMode, pub(crate) replies: Option>, - pub(crate) callback: Callback<'static, Reply>, + pub(crate) callback: Callback, } impl QueryState { @@ -362,7 +362,7 @@ impl<'a, 'b> SessionGetBuilder<'a, 'b, DefaultHandler> { #[inline] pub fn with(self, handler: Handler) -> SessionGetBuilder<'a, 'b, Handler> where - Handler: IntoHandler<'static, Reply>, + Handler: IntoHandler, { let SessionGetBuilder { session, @@ -473,7 +473,7 @@ pub enum ReplyKeyExpr { impl Resolvable for SessionGetBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Reply> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult; @@ -481,7 +481,7 @@ where impl Wait for SessionGetBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Reply> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { fn wait(self) -> ::To { @@ -511,7 +511,7 @@ where impl IntoFuture for SessionGetBuilder<'_, '_, Handler> where - Handler: IntoHandler<'static, Reply> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index 88c375c11f..c669dacb9e 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -51,7 +51,7 @@ use super::{ value::Value, Id, }; -use crate::net::primitives::Primitives; +use crate::{handlers::Callback, net::primitives::Primitives}; pub(crate) struct QueryInner { pub(crate) key_expr: KeyExpr<'static>, @@ -525,7 +525,7 @@ pub(crate) struct QueryableState { pub(crate) key_expr: WireExpr<'static>, pub(crate) complete: bool, pub(crate) origin: Locality, - pub(crate) callback: Arc, + pub(crate) callback: Callback, } impl fmt::Debug for QueryableState { @@ -739,7 +739,7 @@ impl<'a, 'b> QueryableBuilder<'a, 'b, DefaultHandler> { #[inline] pub fn with(self, handler: Handler) -> QueryableBuilder<'a, 'b, Handler> where - Handler: IntoHandler<'static, Query>, + Handler: IntoHandler, { let QueryableBuilder { session, @@ -888,7 +888,7 @@ impl DerefMut for Queryable<'_, Handler> { impl<'a, Handler> Resolvable for QueryableBuilder<'a, '_, Handler> where - Handler: IntoHandler<'static, Query> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult>; @@ -896,7 +896,7 @@ where impl<'a, Handler> Wait for QueryableBuilder<'a, '_, Handler> where - Handler: IntoHandler<'static, Query> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { fn wait(self) -> ::To { @@ -922,7 +922,7 @@ where impl<'a, Handler> IntoFuture for QueryableBuilder<'a, '_, Handler> where - Handler: IntoHandler<'static, Query> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; diff --git a/zenoh/src/api/scouting.rs b/zenoh/src/api/scouting.rs index 4f08530533..8ca90daab5 100644 --- a/zenoh/src/api/scouting.rs +++ b/zenoh/src/api/scouting.rs @@ -136,7 +136,7 @@ impl ScoutBuilder { #[inline] pub fn with(self, handler: Handler) -> ScoutBuilder where - Handler: IntoHandler<'static, Hello>, + Handler: IntoHandler, { let ScoutBuilder { what, @@ -153,7 +153,7 @@ impl ScoutBuilder { impl Resolvable for ScoutBuilder where - Handler: IntoHandler<'static, Hello> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult>; @@ -161,7 +161,7 @@ where impl Wait for ScoutBuilder where - Handler: IntoHandler<'static, Hello> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { fn wait(self) -> ::To { @@ -172,7 +172,7 @@ where impl IntoFuture for ScoutBuilder where - Handler: IntoHandler<'static, Hello> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To; @@ -295,7 +295,7 @@ impl Scout { fn _scout( what: WhatAmIMatcher, config: zenoh_config::Config, - callback: Callback<'static, Hello>, + callback: Callback, ) -> ZResult { tracing::trace!("scout({}, {})", what, &config); let default_addr = SocketAddr::from(zenoh_config::defaults::scouting::multicast::address); @@ -325,7 +325,7 @@ fn _scout( let scout = Runtime::scout(&sockets, what, &addr, move |hello| { let callback = callback.clone(); async move { - callback(hello.into()); + callback.call_by_value(hello.into()); Loop::Continue } }); diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 72ad150a11..6c4dabf188 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1127,7 +1127,7 @@ impl Session { &self, key_expr: &KeyExpr, origin: Locality, - callback: Callback<'static, Sample>, + callback: Callback, info: &SubscriberInfo, ) -> ZResult> { let mut state = zwrite!(self.state); @@ -1331,7 +1331,7 @@ impl Session { key_expr: &WireExpr, complete: bool, origin: Locality, - callback: Callback<'static, Query>, + callback: Callback, ) -> ZResult> { let mut state = zwrite!(self.state); tracing::trace!("declare_queryable({:?})", key_expr); @@ -1429,7 +1429,7 @@ impl Session { key_expr: &KeyExpr, origin: Locality, history: bool, - callback: Callback<'static, Sample>, + callback: Callback, ) -> ZResult> { let mut state = zwrite!(self.state); trace!("declare_liveliness_subscriber({:?})", key_expr); @@ -1523,7 +1523,7 @@ impl Session { pub(crate) fn declare_matches_listener_inner( &self, publisher: &Publisher, - callback: Callback<'static, MatchingStatus>, + callback: Callback, ) -> ZResult> { let mut state = zwrite!(self.state); let id = self.runtime.next_id(); @@ -1545,7 +1545,9 @@ impl Session { .unwrap_or(true) { *current = true; - (listener_state.callback)(MatchingStatus { matching: true }); + listener_state + .callback + .call_by_value(MatchingStatus { matching: true }); } } Err(e) => tracing::error!("Error trying to acquire MathginListener lock: {}", e), @@ -1609,8 +1611,7 @@ impl Session { { if status.matching_subscribers() { *current = true; - let callback = msub.callback.clone(); - (callback)(status) + msub.callback.call_by_value(status); } } } @@ -1647,8 +1648,7 @@ impl Session { { if !status.matching_subscribers() { *current = false; - let callback = msub.callback.clone(); - (callback)(status) + msub.callback.call_by_value(status); } } } @@ -1732,26 +1732,21 @@ impl Session { } }; drop(state); + let mut sample = info.clone().into_sample( + unsafe { KeyExpr::from_str_unchecked("dummy") }, + payload.clone(), + #[cfg(feature = "unstable")] + reliability, + attachment.clone(), + ); let zenoh_collections::single_or_vec::IntoIter { drain, last } = callbacks.into_iter(); for (cb, key_expr) in drain { - let sample = info.clone().into_sample( - key_expr, - payload.clone(), - #[cfg(feature = "unstable")] - reliability, - attachment.clone(), - ); - cb(sample); + sample.key_expr = key_expr; + cb.call(&sample); } if let Some((cb, key_expr)) = last { - let sample = info.into_sample( - key_expr, - payload, - #[cfg(feature = "unstable")] - reliability, - attachment.clone(), - ); - cb(sample); + sample.key_expr = key_expr; + cb.call_by_value(sample); } } @@ -1768,7 +1763,7 @@ impl Session { value: Option, attachment: Option, #[cfg(feature = "unstable")] source: SourceInfo, - callback: Callback<'static, Reply>, + callback: Callback, ) -> ZResult<()> { tracing::trace!( "get({}, {:?}, {:?})", @@ -1804,10 +1799,10 @@ impl Session { tracing::debug!("Timeout on query {}! Send error and close.", qid); if query.reception_mode == ConsolidationMode::Latest { for (_, reply) in query.replies.unwrap().into_iter() { - (query.callback)(reply); + query.callback.call_by_value(reply); } } - (query.callback)(Reply { + query.callback.call_by_value(Reply { result: Err(Value::new("Timeout", Encoding::ZENOH_STRING).into()), #[cfg(feature = "unstable")] replier_id: Some(zid.into()), @@ -1890,7 +1885,7 @@ impl Session { &self, key_expr: &KeyExpr<'_>, timeout: Duration, - callback: Callback<'static, Reply>, + callback: Callback, ) -> ZResult<()> { tracing::trace!("liveliness.get({}, {:?})", key_expr, timeout); let mut state = zwrite!(self.state); @@ -1907,7 +1902,7 @@ impl Session { if let Some(query) = state.liveliness_queries.remove(&id) { std::mem::drop(state); tracing::debug!("Timeout on liveliness query {}! Send error and close.", id); - (query.callback)(Reply { + query.callback.call_by_value(Reply { result: Err(Value::new("Timeout", Encoding::ZENOH_STRING).into()), #[cfg(feature = "unstable")] replier_id: Some(zid.into()), @@ -1979,7 +1974,7 @@ impl Session { } ) .map(|(id, qable)| (*id, qable.callback.clone())) - .collect::)>>(); + .collect::)>>(); ( state.primitives.as_ref().unwrap().clone(), key_expr.into_owned(), @@ -2006,16 +2001,18 @@ impl Session { primitives }, }); - for (eid, callback) in queryables { - callback(Query { - inner: query_inner.clone(), - eid, - value: body.as_ref().map(|b| Value { - payload: b.payload.clone().into(), - encoding: b.encoding.clone().into(), - }), - attachment: attachment.clone(), - }); + let mut query = Query { + inner: query_inner, + eid: 0, + value: body.map(|b| Value { + payload: b.payload.into(), + encoding: b.encoding.into(), + }), + attachment, + }; + for (eid, cb) in queryables { + query.eid = eid; + cb.call(&query); } } } @@ -2283,7 +2280,7 @@ impl Primitives for Session { replier_id: None, }; - (query.callback)(reply); + query.callback.call_by_value(reply); } } else { state.remote_tokens.insert(m.id, key_expr.clone()); @@ -2460,7 +2457,7 @@ impl Primitives for Session { #[cfg(feature = "unstable")] replier_id: e.ext_sinfo.map(|info| info.id.zid), }; - callback(new_reply); + callback.call_by_value(new_reply); } None => { tracing::warn!("Received ReplyData for unknown Query: {}", msg.rid); @@ -2630,7 +2627,7 @@ impl Primitives for Session { }; std::mem::drop(state); if let Some((callback, new_reply)) = callback { - callback(new_reply); + callback.call_by_value(new_reply); } } None => { @@ -2652,7 +2649,7 @@ impl Primitives for Session { std::mem::drop(state); if query.reception_mode == ConsolidationMode::Latest { for (_, reply) in query.replies.unwrap().into_iter() { - (query.callback)(reply); + query.callback.call_by_value(reply); } } trace!("Close query {}", msg.rid); diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index 0e82a20331..84305c1dce 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -40,7 +40,7 @@ pub(crate) struct SubscriberState { pub(crate) remote_id: Id, pub(crate) key_expr: KeyExpr<'static>, pub(crate) origin: Locality, - pub(crate) callback: Callback<'static, Sample>, + pub(crate) callback: Callback, } impl fmt::Debug for SubscriberState { @@ -307,7 +307,7 @@ impl<'a, 'b> SubscriberBuilder<'a, 'b, DefaultHandler> { #[inline] pub fn with(self, handler: Handler) -> SubscriberBuilder<'a, 'b, Handler> where - Handler: IntoHandler<'static, Sample>, + Handler: IntoHandler, { let SubscriberBuilder { session, @@ -366,7 +366,7 @@ impl<'a, 'b, Handler> SubscriberBuilder<'a, 'b, Handler> { // Push mode impl<'a, Handler> Resolvable for SubscriberBuilder<'a, '_, Handler> where - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type To = ZResult>; @@ -374,7 +374,7 @@ where impl<'a, Handler> Wait for SubscriberBuilder<'a, '_, Handler> where - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { fn wait(self) -> ::To { @@ -407,7 +407,7 @@ where impl<'a, Handler> IntoFuture for SubscriberBuilder<'a, '_, Handler> where - Handler: IntoHandler<'static, Sample> + Send, + Handler: IntoHandler + Send, Handler::Handler: Send, { type Output = ::To;