Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: make callback opaque #1383

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions commons/zenoh-config/src/wrappers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl FromStr for ZenohId {
}

/// A zenoh Hello message.
#[derive(Clone, PartialEq, Eq)]
#[repr(transparent)]
pub struct Hello(HelloProto);

Expand Down
18 changes: 9 additions & 9 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
handler: Handler,
) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler>
where
Handler: IntoHandler<'static, Sample>,
Handler: IntoHandler<Sample>,
{
let QueryingSubscriberBuilder {
session,
Expand Down Expand Up @@ -221,7 +221,7 @@ impl<'a, 'b, KeySpace, Handler> QueryingSubscriberBuilder<'a, 'b, KeySpace, Hand

impl<'a, KeySpace, Handler> Resolvable for QueryingSubscriberBuilder<'a, '_, KeySpace, Handler>
where
Handler: IntoHandler<'static, Sample>,
Handler: IntoHandler<Sample>,
Handler::Handler: Send,
{
type To = ZResult<FetchingSubscriber<'a, Handler::Handler>>;
Expand All @@ -230,7 +230,7 @@ where
impl<KeySpace, Handler> Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
where
KeySpace: Into<crate::KeySpace> + Clone,
Handler: IntoHandler<'static, Sample> + Send,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
{
fn wait(self) -> <Self as Resolvable>::To {
Expand Down Expand Up @@ -279,7 +279,7 @@ where
impl<'a, KeySpace, Handler> IntoFuture for QueryingSubscriberBuilder<'a, '_, KeySpace, Handler>
where
KeySpace: Into<crate::KeySpace> + Clone,
Handler: IntoHandler<'static, Sample> + Send,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
{
type Output = <Self as Resolvable>::To;
Expand Down Expand Up @@ -470,7 +470,7 @@ where
handler: Handler,
) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Handler, Fetch, TryIntoSample>
where
Handler: IntoHandler<'static, Sample>,
Handler: IntoHandler<Sample>,
{
let FetchingSubscriberBuilder {
session,
Expand Down Expand Up @@ -544,7 +544,7 @@ impl<
TryIntoSample,
> Resolvable for FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
Handler: IntoHandler<'static, Sample>,
Handler: IntoHandler<Sample>,
Handler::Handler: Send,
TryIntoSample: ExtractSample,
{
Expand All @@ -559,7 +559,7 @@ impl<
> Wait for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
KeySpace: Into<crate::KeySpace>,
Handler: IntoHandler<'static, Sample> + Send,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
TryIntoSample: ExtractSample + Send + Sync,
{
Expand All @@ -577,7 +577,7 @@ impl<
> IntoFuture for FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
KeySpace: Into<crate::KeySpace>,
Handler: IntoHandler<'static, Sample> + Send,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
TryIntoSample: ExtractSample + Send + Sync,
{
Expand Down Expand Up @@ -651,7 +651,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {
) -> ZResult<Self>
where
KeySpace: Into<crate::KeySpace>,
InputHandler: IntoHandler<'static, Sample, Handler = Handler> + Send,
InputHandler: IntoHandler<Sample, Handler = Handler> + Send,
TryIntoSample: ExtractSample + Send + Sync,
{
let session_id = conf.session.zid();
Expand Down
11 changes: 6 additions & 5 deletions zenoh/src/api/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use super::{
session::Session,
subscriber::SubscriberKind,
};
use crate::handlers::Callback;

lazy_static::lazy_static!(
static ref KE_STARSTAR: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") };
Expand All @@ -54,15 +55,15 @@ pub(crate) fn init(session: &Session) {
&admin_key,
true,
Locality::SessionLocal,
Arc::new({
Callback::new(Arc::new({
let session = session.clone();
move |q| super::admin::on_admin_query(&session, q)
}),
})),
);
}
}

pub(crate) fn on_admin_query(session: &Session, query: Query) {
pub(crate) fn on_admin_query(session: &Session, query: &Query) {
fn reply_peer(own_zid: &keyexpr, query: &Query, peer: TransportPeer) {
let zid = peer.zid.to_string();
if let Ok(zid) = keyexpr::new(&zid) {
Expand Down Expand Up @@ -109,14 +110,14 @@ pub(crate) fn on_admin_query(session: &Session, query: Query) {
.block_in_place(session.runtime.manager().get_transports_unicast())
{
if let Ok(peer) = transport.get_peer() {
reply_peer(own_zid, &query, peer);
reply_peer(own_zid, query, peer);
}
}
for transport in zenoh_runtime::ZRuntime::Net
.block_in_place(session.runtime.manager().get_transports_multicast())
{
for peer in transport.get_peers().unwrap_or_default() {
reply_peer(own_zid, &query, peer);
reply_peer(own_zid, query, peer);
}
}
}
Expand Down
111 changes: 85 additions & 26 deletions zenoh/src/api/handlers/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
//

//! Callback handler trait.
use super::{Dyn, IntoHandler};

use std::sync::Arc;

use super::{IntoHandler, RingChannelSender};

/// A function that can transform a [`FnMut`]`(T)` to
/// a [`Fn`]`(T)` with the help of a [`Mutex`](std::sync::Mutex).
Expand All @@ -22,32 +25,88 @@ pub fn locked<T>(fnmut: impl FnMut(T)) -> impl Fn(T) {
move |x| zlock!(lock)(x)
}

/// An immutable callback function.
pub type Callback<'a, T> = Dyn<dyn Fn(T) + Send + Sync + 'a>;
enum CallbackInner<T> {
Dyn(Arc<dyn Fn(&T) + Send + Sync>),
Flume(flume::Sender<T>),
Ring(RingChannelSender<T>),
}

impl<'a, T, F> IntoHandler<'a, T> for F
where
F: Fn(T) + Send + Sync + 'a,
{
pub struct Callback<T>(CallbackInner<T>);

impl<T> Clone for Callback<T> {
fn clone(&self) -> Self {
Self(match &self.0 {
CallbackInner::Dyn(cb) => CallbackInner::Dyn(cb.clone()),
CallbackInner::Flume(tx) => CallbackInner::Flume(tx.clone()),
CallbackInner::Ring(tx) => CallbackInner::Ring(tx.clone()),
})
}
}

impl<T> Callback<T> {
pub fn new(cb: Arc<dyn Fn(&T) + Send + Sync>) -> Self {
Self(CallbackInner::Dyn(cb))
}

pub(crate) fn new_flume(sender: flume::Sender<T>) -> Self {
Self(CallbackInner::Flume(sender))
}

pub(crate) fn new_ring(sender: RingChannelSender<T>) -> Self {
Self(CallbackInner::Ring(sender))
}

#[inline]
pub fn call(&self, arg: &T)
where
T: Clone,
{
match &self.0 {
CallbackInner::Dyn(cb) => cb(arg),
CallbackInner::Flume(tx) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure that this kind of optimization is really justified given that flume/ring channels are just one of many that users can use in rust, and that many people are still going to just use function callbacks ?
Also to make use of such an optimization we would need to complicate zenoh-c interface since right now it only accepts a "closure" with pointer to c-function (which is transformed into Arc).

Copy link
Contributor Author

@wyfo wyfo Sep 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flume is not "just one of many that users can use in rust", it's the default one, and one of the few with a IntoHandler implementation (the others being RingChannel, which is also included in CallbackInner, std:sync::mpsc).
And the goal of this change is also to optimize callback use, see https://godbolt.org/z/G6vcT4b93.
But more than a performance optimization, the long-term vision is to be able to get flume sender directly in order to use its async method.

Also to make use of such an optimization we would need to complicate zenoh-c interface since right now it only accepts a "closure" with pointer to c-function (which is transformed into Arc).

I don't see the how this PR changes anything here

Copy link
Contributor

@DenisBiryukov91 DenisBiryukov91 Sep 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see the how this PR changes anything here

It adds an extra overhead due to enum match (not sure whether it is significant though). Which means that you probably improve performance of flume channel in rust, but maybe also make it worse for the whole zenoh-c/cpp + produce slightly more complex code.
This also means that if in several month we decide to have another "default" channel we would need to revisit this code.

What I mean is that I would really like to see the performance gain for channels / loss for callbacks if there is any.

Copy link
Contributor Author

@wyfo wyfo Sep 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It adds an extra overhead due to enum match (not sure whether it is significant though).

Branch prediction enters the room.

Which means that you probably improve performance of flume channel in rust, but maybe also make it worse for the whole zenoh-c/cpp + produce slightly more complex code.

Still better than the previous "by value" callback, see godbolt. Why is the produced code more complex?

This also means that if in several month we decide to have another "default" channel we would need to revisit this code.

And then, we will change the internal enum, we don't care, it's internal, and it's only a dozen lines to change.

What I mean is that I would really like to see the performance gain for channels / loss for callbacks if there is any.

Trust branch prediction, especially if you are using only callbacks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

till better than the previous "by value" callback, see godbolt. Why is the produced code more complex?

I do not argue that passing by ref is faster than by value, this can still be achieved through only Arc, without any enum matching.
The produced code is more complex since it adds more lines => higher maintenance cost (which is always better to be avoided if there is no any significant benefit)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But more than a performance optimization, the long-term vision is to be able to get flume sender directly in order to use its async method.

This is the goal behind making the type opaque. Now, using the enum is not a high maintenance code, all the code is contained in one module and is only a few lines.

if let Err(error) = tx.send(arg.clone()) {
tracing::error!(%error)
}
}
CallbackInner::Ring(tx) => tx.send(arg.clone()),
}
}

#[inline]
pub(crate) fn call_by_value(&self, arg: T) {
match &self.0 {
CallbackInner::Dyn(cb) => cb(&arg),
CallbackInner::Flume(tx) => {
if let Err(error) = tx.send(arg) {
tracing::error!(%error)
}
}
CallbackInner::Ring(tx) => tx.send(arg),
}
}
}

impl<T> IntoHandler<T> for Callback<T> {
type Handler = ();
fn into_handler(self) -> (Callback<'a, T>, Self::Handler) {
(Dyn::from(self), ())
fn into_handler(self) -> (Callback<T>, Self::Handler) {
(self, ())
}
}

impl<T: Send + 'static> IntoHandler<'static, T> for (flume::Sender<T>, flume::Receiver<T>) {
impl<T: Send> IntoHandler<T> for (flume::Sender<T>, flume::Receiver<T>) {
type Handler = flume::Receiver<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
fn into_handler(self) -> (Callback<T>, Self::Handler) {
let (sender, receiver) = self;
(
Dyn::new(move |t| {
if let Err(e) = sender.send(t) {
tracing::error!("{}", e)
}
}),
receiver,
)
(Callback::new_flume(sender), receiver)
}
}

impl<T: Send> IntoHandler<T> for flume::Sender<T> {
type Handler = ();

fn into_handler(self) -> (Callback<T>, Self::Handler) {
(Callback::new_flume(self), ())
}
}

Expand All @@ -60,15 +119,15 @@ impl<T: Send + 'static> IntoHandler<'static, T> for (flume::Sender<T>, flume::Re
/// - `callback` will never be called once `drop` has started.
/// - `drop` will only be called **once**, and **after every** `callback` has ended.
/// - The two previous guarantees imply that `call` and `drop` are never called concurrently.
pub struct CallbackDrop<Callback, DropFn>
pub struct CallbackDrop<F, DropFn>
where
DropFn: FnMut() + Send + Sync + 'static,
{
pub callback: Callback,
pub callback: F,
pub drop: DropFn,
}

impl<Callback, DropFn> Drop for CallbackDrop<Callback, DropFn>
impl<F, DropFn> Drop for CallbackDrop<F, DropFn>
where
DropFn: FnMut() + Send + Sync + 'static,
{
Expand All @@ -77,14 +136,14 @@ where
}
}

impl<'a, OnEvent, Event, DropFn> IntoHandler<'a, Event> for CallbackDrop<OnEvent, DropFn>
impl<F, T, DropFn> IntoHandler<T> for CallbackDrop<F, DropFn>
where
OnEvent: Fn(Event) + Send + Sync + 'a,
F: Fn(&T) + Send + Sync + 'static,
DropFn: FnMut() + Send + Sync + 'static,
{
type Handler = ();

fn into_handler(self) -> (Callback<'a, Event>, Self::Handler) {
(Dyn::from(move |evt| (self.callback)(evt)), ())
fn into_handler(self) -> (Callback<T>, Self::Handler) {
(Callback::new(Arc::new(move |evt| (self.callback)(evt))), ())
}
}
21 changes: 12 additions & 9 deletions zenoh/src/api/handlers/fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
//

//! Callback handler trait.
use super::{callback::Callback, Dyn, IntoHandler, API_DATA_RECEPTION_CHANNEL_SIZE};

use std::sync::Arc;

use super::{callback::Callback, IntoHandler, API_DATA_RECEPTION_CHANNEL_SIZE};

/// The default handler in Zenoh is a FIFO queue.

Expand All @@ -34,27 +37,27 @@ impl Default for FifoChannel {
}
}

impl<T: Send + 'static> IntoHandler<'static, T> for FifoChannel {
impl<T: Send> IntoHandler<T> for FifoChannel {
type Handler = flume::Receiver<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
fn into_handler(self) -> (Callback<T>, Self::Handler) {
flume::bounded(self.capacity).into_handler()
}
}

impl<T: Send + Sync + 'static> IntoHandler<'static, T>
impl<T: Clone + Send + Sync + 'static> IntoHandler<T>
for (std::sync::mpsc::SyncSender<T>, std::sync::mpsc::Receiver<T>)
{
type Handler = std::sync::mpsc::Receiver<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
fn into_handler(self) -> (Callback<T>, Self::Handler) {
let (sender, receiver) = self;
(
Dyn::new(move |t| {
if let Err(e) = sender.send(t) {
tracing::error!("{}", e)
Callback::new(Arc::new(move |t| {
if let Err(error) = sender.send(t.clone()) {
tracing::error!(%error)
}
}),
})),
receiver,
)
}
Expand Down
13 changes: 5 additions & 8 deletions zenoh/src/api/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,27 @@ pub use ring::*;

use crate::api::session::API_DATA_RECEPTION_CHANNEL_SIZE;

/// An alias for `Arc<T>`.
pub type Dyn<T> = std::sync::Arc<T>;

/// A type that can be converted into a [`Callback`]-Handler pair.
///
/// When Zenoh functions accept types that implement these, it intends to use the [`Callback`] as just that,
/// while granting you access to the handler through the returned value via [`std::ops::Deref`] and [`std::ops::DerefMut`].
///
/// Any closure that accepts `T` can be converted into a pair of itself and `()`.
pub trait IntoHandler<'a, T> {
pub trait IntoHandler<T> {
type Handler;

fn into_handler(self) -> (Callback<'a, T>, Self::Handler);
fn into_handler(self) -> (Callback<T>, Self::Handler);
}

/// The default handler in Zenoh is a FIFO queue.
#[repr(transparent)]
#[derive(Default)]
pub struct DefaultHandler(FifoChannel);

impl<T: Send + 'static> IntoHandler<'static, T> for DefaultHandler {
type Handler = <FifoChannel as IntoHandler<'static, T>>::Handler;
impl<T: Send> IntoHandler<T> for DefaultHandler {
type Handler = <FifoChannel as IntoHandler<T>>::Handler;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
fn into_handler(self) -> (Callback<T>, Self::Handler) {
self.0.into_handler()
}
}
Loading
Loading