Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: make callback an opaque type #1405

Merged
merged 4 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 16 additions & 19 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
#[cfg(feature = "unstable")]
use zenoh::pubsub::Reliability;
use zenoh::{
handlers::{locked, DefaultHandler, IntoHandler},
handlers::{locked, Callback, DefaultHandler, IntoHandler},
internal::zlock,
key_expr::KeyExpr,
prelude::Wait,
Expand Down Expand Up @@ -97,7 +97,7 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
handler: Handler,
) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler>
where
Handler: IntoHandler<'static, Sample>,
Handler: IntoHandler<Sample>,
{
let QueryingSubscriberBuilder {
session,
Expand Down Expand Up @@ -230,7 +230,7 @@ impl<KeySpace, Handler> QueryingSubscriberBuilder<'_, '_, KeySpace, Handler> {

impl<KeySpace, Handler> Resolvable for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
where
Handler: IntoHandler<'static, Sample>,
Handler: IntoHandler<Sample>,
Handler::Handler: Send,
{
type To = ZResult<FetchingSubscriber<Handler::Handler>>;
Expand All @@ -239,7 +239,7 @@ where
impl<KeySpace, Handler> Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
where
KeySpace: Into<crate::KeySpace> + Clone,
Handler: IntoHandler<'static, Sample> + Send,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
{
fn wait(self) -> <Self as Resolvable>::To {
Expand Down Expand Up @@ -288,7 +288,7 @@ where
impl<KeySpace, Handler> IntoFuture for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
where
KeySpace: Into<crate::KeySpace> + Clone,
Handler: IntoHandler<'static, Sample> + Send,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
{
type Output = <Self as Resolvable>::To;
Expand Down Expand Up @@ -469,7 +469,7 @@ where
handler: Handler,
) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Handler, Fetch, TryIntoSample>
where
Handler: IntoHandler<'static, Sample>,
Handler: IntoHandler<Sample>,
{
let FetchingSubscriberBuilder {
session,
Expand Down Expand Up @@ -574,7 +574,7 @@ impl<
TryIntoSample,
> Resolvable for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
Handler: IntoHandler<'static, Sample>,
Handler: IntoHandler<Sample>,
Handler::Handler: Send,
TryIntoSample: ExtractSample,
{
Expand All @@ -589,7 +589,7 @@ impl<
> Wait for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
KeySpace: Into<crate::KeySpace>,
Handler: IntoHandler<'static, Sample> + Send,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
TryIntoSample: ExtractSample + Send + Sync,
{
Expand All @@ -606,7 +606,7 @@ impl<
> IntoFuture for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
KeySpace: Into<crate::KeySpace>,
Handler: IntoHandler<'static, Sample> + Send,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
TryIntoSample: ExtractSample + Send + Sync,
{
Expand Down Expand Up @@ -651,7 +651,7 @@ where
/// ```
pub struct FetchingSubscriber<Handler> {
subscriber: Subscriber<()>,
callback: Arc<dyn Fn(Sample) + Send + Sync + 'static>,
callback: Callback<Sample>,
state: Arc<Mutex<InnerState>>,
handler: Handler,
}
Expand Down Expand Up @@ -681,7 +681,7 @@ impl<Handler> FetchingSubscriber<Handler> {
) -> ZResult<Self>
where
KeySpace: Into<crate::KeySpace>,
InputHandler: IntoHandler<'static, Sample, Handler = Handler> + Send,
InputHandler: IntoHandler<Sample, Handler = Handler> + Send,
TryIntoSample: ExtractSample + Send + Sync,
{
let session_id = conf.session.zid();
Expand All @@ -698,7 +698,7 @@ impl<Handler> FetchingSubscriber<Handler> {
move |s| {
let state = &mut zlock!(state);
if state.pending_fetches == 0 {
callback(s);
callback.call(s);
} else {
tracing::trace!(
"Sample received while fetch in progress: push it to merge_queue"
Expand Down Expand Up @@ -823,7 +823,7 @@ impl<Handler> FetchingSubscriber<Handler> {

struct RepliesHandler {
state: Arc<Mutex<InnerState>>,
callback: Arc<dyn Fn(Sample) + Send + Sync>,
callback: Callback<Sample>,
}

impl Drop for RepliesHandler {
Expand All @@ -840,7 +840,7 @@ impl Drop for RepliesHandler {
state.merge_queue.len()
);
for s in state.merge_queue.drain() {
(self.callback)(s);
self.callback.call(s);
}
}
}
Expand Down Expand Up @@ -888,7 +888,7 @@ pub struct FetchBuilder<
fetch: Fetch,
phantom: std::marker::PhantomData<TryIntoSample>,
state: Arc<Mutex<InnerState>>,
callback: Arc<dyn Fn(Sample) + Send + Sync>,
callback: Callback<Sample>,
}

impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample>
Expand Down Expand Up @@ -923,10 +923,7 @@ where
}
}

fn register_handler(
state: Arc<Mutex<InnerState>>,
callback: Arc<dyn Fn(Sample) + Send + Sync>,
) -> RepliesHandler {
fn register_handler(state: Arc<Mutex<InnerState>>, callback: Callback<Sample>) -> RepliesHandler {
zlock!(state).pending_fetches += 1;
// pending fetches will be decremented in RepliesHandler drop()
RepliesHandler { state, callback }
Expand Down
6 changes: 3 additions & 3 deletions zenoh/src/api/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use super::{
sample::{DataInfo, Locality, SampleKind},
subscriber::SubscriberKind,
};
use crate::api::session::WeakSession;
use crate::{api::session::WeakSession, handlers::Callback};

lazy_static::lazy_static!(
static ref KE_STARSTAR: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") };
Expand All @@ -54,10 +54,10 @@ pub(crate) fn init(session: WeakSession) {
&admin_key,
true,
Locality::SessionLocal,
Arc::new({
Callback::new(Arc::new({
let session = session.clone();
move |q| on_admin_query(&session, q)
}),
})),
);
}
}
Expand Down
71 changes: 50 additions & 21 deletions zenoh/src/api/handlers/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
//

//! Callback handler trait.
use super::{Dyn, IntoHandler};

use std::sync::Arc;

use crate::api::handlers::IntoHandler;

/// A function that can transform a [`FnMut`]`(T)` to
/// a [`Fn`]`(T)` with the help of a [`Mutex`](std::sync::Mutex).
Expand All @@ -22,50 +25,76 @@ pub fn locked<T>(fnmut: impl FnMut(T)) -> impl Fn(T) {
move |x| zlock!(lock)(x)
}

/// An immutable callback function.
pub type Callback<'a, T> = Dyn<dyn Fn(T) + Send + Sync + 'a>;
/// Callback type used by zenoh entities.
pub struct Callback<T>(Arc<dyn Fn(T) + Send + Sync>);

impl<T> Clone for Callback<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

impl<T> Callback<T> {
/// Instantiate a `Callback` from a callback function.
pub fn new(cb: Arc<dyn Fn(T) + Send + Sync>) -> Self {
Self(cb)
}

/// Call the inner callback.
#[inline]
pub fn call(&self, arg: T) {
self.0(arg)
}
}

impl<T> IntoHandler<T> for Callback<T> {
type Handler = ();
fn into_handler(self) -> (Callback<T>, Self::Handler) {
(self, ())
}
}

impl<'a, T, F> IntoHandler<'a, T> for F
impl<T, F> IntoHandler<T> for F
where
F: Fn(T) + Send + Sync + 'a,
F: Fn(T) + Send + Sync + 'static,
{
type Handler = ();

fn into_handler(self) -> (Callback<'a, T>, Self::Handler) {
(Dyn::from(self), ())
fn into_handler(self) -> (Callback<T>, Self::Handler) {
(Callback::new(Arc::new(self)), ())
}
}

impl<'a, T, F, H> IntoHandler<'a, T> for (F, H)
impl<T, F, H> IntoHandler<T> for (F, H)
where
F: Fn(T) + Send + Sync + 'a,
F: Fn(T) + Send + Sync + 'static,
{
type Handler = H;

fn into_handler(self) -> (Callback<'a, T>, Self::Handler) {
(Dyn::from(self.0), self.1)
fn into_handler(self) -> (Callback<T>, Self::Handler) {
(self.0.into_handler().0, self.1)
}
}

impl<'a, T, H> IntoHandler<'a, T> for (Callback<'static, T>, H) {
impl<T, H> IntoHandler<T> for (Callback<T>, H) {
type Handler = H;

fn into_handler(self) -> (Callback<'a, T>, Self::Handler) {
fn into_handler(self) -> (Callback<T>, Self::Handler) {
self
}
}

impl<T: Send + 'static> IntoHandler<'static, T> for (flume::Sender<T>, flume::Receiver<T>) {
impl<T: Send + 'static> IntoHandler<T> for (flume::Sender<T>, flume::Receiver<T>) {
type Handler = flume::Receiver<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
fn into_handler(self) -> (Callback<T>, Self::Handler) {
let (sender, receiver) = self;
(
Dyn::new(move |t| {
Callback::new(Arc::new(move |t| {
if let Err(e) = sender.send(t) {
tracing::error!("{}", e)
}
}),
})),
receiver,
)
}
Expand Down Expand Up @@ -97,14 +126,14 @@ where
}
}

impl<'a, OnEvent, Event, DropFn> IntoHandler<'a, Event> for CallbackDrop<OnEvent, DropFn>
impl<OnEvent, Event, DropFn> IntoHandler<Event> for CallbackDrop<OnEvent, DropFn>
where
OnEvent: Fn(Event) + Send + Sync + 'a,
OnEvent: Fn(Event) + Send + Sync + 'static,
DropFn: FnMut() + Send + Sync + 'static,
{
type Handler = ();

fn into_handler(self) -> (Callback<'a, Event>, Self::Handler) {
(Dyn::from(move |evt| (self.callback)(evt)), ())
fn into_handler(self) -> (Callback<Event>, Self::Handler) {
(move |evt| (self.callback)(evt)).into_handler()
}
}
21 changes: 12 additions & 9 deletions zenoh/src/api/handlers/fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
//

//! Callback handler trait.
use super::{callback::Callback, Dyn, IntoHandler, API_DATA_RECEPTION_CHANNEL_SIZE};

use std::sync::Arc;

use crate::api::handlers::{callback::Callback, IntoHandler, API_DATA_RECEPTION_CHANNEL_SIZE};

/// The default handler in Zenoh is a FIFO queue.

Expand All @@ -34,27 +37,27 @@ impl Default for FifoChannel {
}
}

impl<T: Send + 'static> IntoHandler<'static, T> for FifoChannel {
impl<T: Send + 'static> IntoHandler<T> for FifoChannel {
type Handler = flume::Receiver<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
fn into_handler(self) -> (Callback<T>, Self::Handler) {
flume::bounded(self.capacity).into_handler()
}
}

impl<T: Send + Sync + 'static> IntoHandler<'static, T>
impl<T: Clone + Send + Sync + 'static> IntoHandler<T>
for (std::sync::mpsc::SyncSender<T>, std::sync::mpsc::Receiver<T>)
{
type Handler = std::sync::mpsc::Receiver<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
fn into_handler(self) -> (Callback<T>, Self::Handler) {
let (sender, receiver) = self;
(
Dyn::new(move |t| {
if let Err(e) = sender.send(t) {
tracing::error!("{}", e)
Callback::new(Arc::new(move |t| {
if let Err(error) = sender.send(t.clone()) {
tracing::error!(%error)
}
}),
})),
receiver,
)
}
Expand Down
13 changes: 5 additions & 8 deletions zenoh/src/api/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,27 @@ pub use ring::*;

use crate::api::session::API_DATA_RECEPTION_CHANNEL_SIZE;

/// An alias for `Arc<T>`.
pub type Dyn<T> = std::sync::Arc<T>;

/// A type that can be converted into a [`Callback`]-Handler pair.
///
/// When Zenoh functions accept types that implement these, it intends to use the [`Callback`] as just that,
/// while granting you access to the handler through the returned value via [`std::ops::Deref`] and [`std::ops::DerefMut`].
///
/// Any closure that accepts `T` can be converted into a pair of itself and `()`.
pub trait IntoHandler<'a, T> {
pub trait IntoHandler<T> {
type Handler;

fn into_handler(self) -> (Callback<'a, T>, Self::Handler);
fn into_handler(self) -> (Callback<T>, Self::Handler);
}

/// The default handler in Zenoh is a FIFO queue.
#[repr(transparent)]
#[derive(Default)]
pub struct DefaultHandler(FifoChannel);

impl<T: Send + 'static> IntoHandler<'static, T> for DefaultHandler {
type Handler = <FifoChannel as IntoHandler<'static, T>>::Handler;
impl<T: Send + 'static> IntoHandler<T> for DefaultHandler {
type Handler = <FifoChannel as IntoHandler<T>>::Handler;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
fn into_handler(self) -> (Callback<T>, Self::Handler) {
self.0.into_handler()
}
}
Loading