Skip to content

Commit

Permalink
feat!: make callback an opaque type (#1405)
Browse files Browse the repository at this point in the history
* feat!: make callback an opaque type

It may allow special case for channel handlers later, in order to use async
methods, or change the return type of the callback in order to support
async callbacks.

For the record I've tested quickly with an enum inside `Callback`,
and there was a sensitive cost in performance of 1% when using callbacks.

* Retrigger CI

* Retrigger CI

* Retrigger CI
  • Loading branch information
wyfo authored and fuzzypixelz committed Sep 23, 2024
1 parent cd43221 commit ed357e4
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 144 deletions.
35 changes: 16 additions & 19 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
#[cfg(feature = "unstable")]
use zenoh::pubsub::Reliability;
use zenoh::{
handlers::{locked, DefaultHandler, IntoHandler},
handlers::{locked, Callback, DefaultHandler, IntoHandler},
internal::zlock,
key_expr::KeyExpr,
prelude::Wait,
Expand Down Expand Up @@ -97,7 +97,7 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
handler: Handler,
) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler>
where
Handler: IntoHandler<'static, Sample>,
Handler: IntoHandler<Sample>,
{
let QueryingSubscriberBuilder {
session,
Expand Down Expand Up @@ -230,7 +230,7 @@ impl<KeySpace, Handler> QueryingSubscriberBuilder<'_, '_, KeySpace, Handler> {

impl<KeySpace, Handler> Resolvable for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
where
Handler: IntoHandler<'static, Sample>,
Handler: IntoHandler<Sample>,
Handler::Handler: Send,
{
type To = ZResult<FetchingSubscriber<Handler::Handler>>;
Expand All @@ -239,7 +239,7 @@ where
impl<KeySpace, Handler> Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
where
KeySpace: Into<crate::KeySpace> + Clone,
Handler: IntoHandler<'static, Sample> + Send,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
{
fn wait(self) -> <Self as Resolvable>::To {
Expand Down Expand Up @@ -288,7 +288,7 @@ where
impl<KeySpace, Handler> IntoFuture for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
where
KeySpace: Into<crate::KeySpace> + Clone,
Handler: IntoHandler<'static, Sample> + Send,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
{
type Output = <Self as Resolvable>::To;
Expand Down Expand Up @@ -469,7 +469,7 @@ where
handler: Handler,
) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Handler, Fetch, TryIntoSample>
where
Handler: IntoHandler<'static, Sample>,
Handler: IntoHandler<Sample>,
{
let FetchingSubscriberBuilder {
session,
Expand Down Expand Up @@ -574,7 +574,7 @@ impl<
TryIntoSample,
> Resolvable for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
Handler: IntoHandler<'static, Sample>,
Handler: IntoHandler<Sample>,
Handler::Handler: Send,
TryIntoSample: ExtractSample,
{
Expand All @@ -589,7 +589,7 @@ impl<
> Wait for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
KeySpace: Into<crate::KeySpace>,
Handler: IntoHandler<'static, Sample> + Send,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
TryIntoSample: ExtractSample + Send + Sync,
{
Expand All @@ -606,7 +606,7 @@ impl<
> IntoFuture for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
KeySpace: Into<crate::KeySpace>,
Handler: IntoHandler<'static, Sample> + Send,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
TryIntoSample: ExtractSample + Send + Sync,
{
Expand Down Expand Up @@ -651,7 +651,7 @@ where
/// ```
pub struct FetchingSubscriber<Handler> {
subscriber: Subscriber<()>,
callback: Arc<dyn Fn(Sample) + Send + Sync + 'static>,
callback: Callback<Sample>,
state: Arc<Mutex<InnerState>>,
handler: Handler,
}
Expand Down Expand Up @@ -681,7 +681,7 @@ impl<Handler> FetchingSubscriber<Handler> {
) -> ZResult<Self>
where
KeySpace: Into<crate::KeySpace>,
InputHandler: IntoHandler<'static, Sample, Handler = Handler> + Send,
InputHandler: IntoHandler<Sample, Handler = Handler> + Send,
TryIntoSample: ExtractSample + Send + Sync,
{
let session_id = conf.session.zid();
Expand All @@ -698,7 +698,7 @@ impl<Handler> FetchingSubscriber<Handler> {
move |s| {
let state = &mut zlock!(state);
if state.pending_fetches == 0 {
callback(s);
callback.call(s);
} else {
tracing::trace!(
"Sample received while fetch in progress: push it to merge_queue"
Expand Down Expand Up @@ -823,7 +823,7 @@ impl<Handler> FetchingSubscriber<Handler> {

struct RepliesHandler {
state: Arc<Mutex<InnerState>>,
callback: Arc<dyn Fn(Sample) + Send + Sync>,
callback: Callback<Sample>,
}

impl Drop for RepliesHandler {
Expand All @@ -840,7 +840,7 @@ impl Drop for RepliesHandler {
state.merge_queue.len()
);
for s in state.merge_queue.drain() {
(self.callback)(s);
self.callback.call(s);
}
}
}
Expand Down Expand Up @@ -888,7 +888,7 @@ pub struct FetchBuilder<
fetch: Fetch,
phantom: std::marker::PhantomData<TryIntoSample>,
state: Arc<Mutex<InnerState>>,
callback: Arc<dyn Fn(Sample) + Send + Sync>,
callback: Callback<Sample>,
}

impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample>
Expand Down Expand Up @@ -923,10 +923,7 @@ where
}
}

fn register_handler(
state: Arc<Mutex<InnerState>>,
callback: Arc<dyn Fn(Sample) + Send + Sync>,
) -> RepliesHandler {
fn register_handler(state: Arc<Mutex<InnerState>>, callback: Callback<Sample>) -> RepliesHandler {
zlock!(state).pending_fetches += 1;
// pending fetches will be decremented in RepliesHandler drop()
RepliesHandler { state, callback }
Expand Down
6 changes: 3 additions & 3 deletions zenoh/src/api/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use super::{
sample::{DataInfo, Locality, SampleKind},
subscriber::SubscriberKind,
};
use crate::api::session::WeakSession;
use crate::{api::session::WeakSession, handlers::Callback};

lazy_static::lazy_static!(
static ref KE_STARSTAR: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") };
Expand All @@ -54,10 +54,10 @@ pub(crate) fn init(session: WeakSession) {
&admin_key,
true,
Locality::SessionLocal,
Arc::new({
Callback::new(Arc::new({
let session = session.clone();
move |q| on_admin_query(&session, q)
}),
})),
);
}
}
Expand Down
71 changes: 50 additions & 21 deletions zenoh/src/api/handlers/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
//

//! Callback handler trait.
use super::{Dyn, IntoHandler};
use std::sync::Arc;

use crate::api::handlers::IntoHandler;

/// A function that can transform a [`FnMut`]`(T)` to
/// a [`Fn`]`(T)` with the help of a [`Mutex`](std::sync::Mutex).
Expand All @@ -22,50 +25,76 @@ pub fn locked<T>(fnmut: impl FnMut(T)) -> impl Fn(T) {
move |x| zlock!(lock)(x)
}

/// An immutable callback function.
pub type Callback<'a, T> = Dyn<dyn Fn(T) + Send + Sync + 'a>;
/// Callback type used by zenoh entities.
pub struct Callback<T>(Arc<dyn Fn(T) + Send + Sync>);

impl<T> Clone for Callback<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

impl<T> Callback<T> {
/// Instantiate a `Callback` from a callback function.
pub fn new(cb: Arc<dyn Fn(T) + Send + Sync>) -> Self {
Self(cb)
}

/// Call the inner callback.
#[inline]
pub fn call(&self, arg: T) {
self.0(arg)
}
}

impl<T> IntoHandler<T> for Callback<T> {
type Handler = ();
fn into_handler(self) -> (Callback<T>, Self::Handler) {
(self, ())
}
}

impl<'a, T, F> IntoHandler<'a, T> for F
impl<T, F> IntoHandler<T> for F
where
F: Fn(T) + Send + Sync + 'a,
F: Fn(T) + Send + Sync + 'static,
{
type Handler = ();

fn into_handler(self) -> (Callback<'a, T>, Self::Handler) {
(Dyn::from(self), ())
fn into_handler(self) -> (Callback<T>, Self::Handler) {
(Callback::new(Arc::new(self)), ())
}
}

impl<'a, T, F, H> IntoHandler<'a, T> for (F, H)
impl<T, F, H> IntoHandler<T> for (F, H)
where
F: Fn(T) + Send + Sync + 'a,
F: Fn(T) + Send + Sync + 'static,
{
type Handler = H;

fn into_handler(self) -> (Callback<'a, T>, Self::Handler) {
(Dyn::from(self.0), self.1)
fn into_handler(self) -> (Callback<T>, Self::Handler) {
(self.0.into_handler().0, self.1)
}
}

impl<'a, T, H> IntoHandler<'a, T> for (Callback<'static, T>, H) {
impl<T, H> IntoHandler<T> for (Callback<T>, H) {
type Handler = H;

fn into_handler(self) -> (Callback<'a, T>, Self::Handler) {
fn into_handler(self) -> (Callback<T>, Self::Handler) {
self
}
}

impl<T: Send + 'static> IntoHandler<'static, T> for (flume::Sender<T>, flume::Receiver<T>) {
impl<T: Send + 'static> IntoHandler<T> for (flume::Sender<T>, flume::Receiver<T>) {
type Handler = flume::Receiver<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
fn into_handler(self) -> (Callback<T>, Self::Handler) {
let (sender, receiver) = self;
(
Dyn::new(move |t| {
Callback::new(Arc::new(move |t| {
if let Err(e) = sender.send(t) {
tracing::error!("{}", e)
}
}),
})),
receiver,
)
}
Expand Down Expand Up @@ -97,14 +126,14 @@ where
}
}

impl<'a, OnEvent, Event, DropFn> IntoHandler<'a, Event> for CallbackDrop<OnEvent, DropFn>
impl<OnEvent, Event, DropFn> IntoHandler<Event> for CallbackDrop<OnEvent, DropFn>
where
OnEvent: Fn(Event) + Send + Sync + 'a,
OnEvent: Fn(Event) + Send + Sync + 'static,
DropFn: FnMut() + Send + Sync + 'static,
{
type Handler = ();

fn into_handler(self) -> (Callback<'a, Event>, Self::Handler) {
(Dyn::from(move |evt| (self.callback)(evt)), ())
fn into_handler(self) -> (Callback<Event>, Self::Handler) {
(move |evt| (self.callback)(evt)).into_handler()
}
}
21 changes: 12 additions & 9 deletions zenoh/src/api/handlers/fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
//

//! Callback handler trait.
use super::{callback::Callback, Dyn, IntoHandler, API_DATA_RECEPTION_CHANNEL_SIZE};
use std::sync::Arc;

use crate::api::handlers::{callback::Callback, IntoHandler, API_DATA_RECEPTION_CHANNEL_SIZE};

/// The default handler in Zenoh is a FIFO queue.
Expand All @@ -34,27 +37,27 @@ impl Default for FifoChannel {
}
}

impl<T: Send + 'static> IntoHandler<'static, T> for FifoChannel {
impl<T: Send + 'static> IntoHandler<T> for FifoChannel {
type Handler = flume::Receiver<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
fn into_handler(self) -> (Callback<T>, Self::Handler) {
flume::bounded(self.capacity).into_handler()
}
}

impl<T: Send + Sync + 'static> IntoHandler<'static, T>
impl<T: Clone + Send + Sync + 'static> IntoHandler<T>
for (std::sync::mpsc::SyncSender<T>, std::sync::mpsc::Receiver<T>)
{
type Handler = std::sync::mpsc::Receiver<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
fn into_handler(self) -> (Callback<T>, Self::Handler) {
let (sender, receiver) = self;
(
Dyn::new(move |t| {
if let Err(e) = sender.send(t) {
tracing::error!("{}", e)
Callback::new(Arc::new(move |t| {
if let Err(error) = sender.send(t.clone()) {
tracing::error!(%error)
}
}),
})),
receiver,
)
}
Expand Down
13 changes: 5 additions & 8 deletions zenoh/src/api/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,27 @@ pub use ring::*;

use crate::api::session::API_DATA_RECEPTION_CHANNEL_SIZE;

/// An alias for `Arc<T>`.
pub type Dyn<T> = std::sync::Arc<T>;

/// A type that can be converted into a [`Callback`]-Handler pair.
///
/// When Zenoh functions accept types that implement these, it intends to use the [`Callback`] as just that,
/// while granting you access to the handler through the returned value via [`std::ops::Deref`] and [`std::ops::DerefMut`].
///
/// Any closure that accepts `T` can be converted into a pair of itself and `()`.
pub trait IntoHandler<'a, T> {
pub trait IntoHandler<T> {
type Handler;

fn into_handler(self) -> (Callback<'a, T>, Self::Handler);
fn into_handler(self) -> (Callback<T>, Self::Handler);
}

/// The default handler in Zenoh is a FIFO queue.
#[repr(transparent)]
#[derive(Default)]
pub struct DefaultHandler(FifoChannel);

impl<T: Send + 'static> IntoHandler<'static, T> for DefaultHandler {
type Handler = <FifoChannel as IntoHandler<'static, T>>::Handler;
impl<T: Send + 'static> IntoHandler<T> for DefaultHandler {
type Handler = <FifoChannel as IntoHandler<T>>::Handler;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
fn into_handler(self) -> (Callback<T>, Self::Handler) {
self.0.into_handler()
}
}
Loading

0 comments on commit ed357e4

Please sign in to comment.