Skip to content

Commit

Permalink
fix: fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Sep 9, 2024
1 parent 3b1fc2a commit 467d27e
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 117 deletions.
8 changes: 5 additions & 3 deletions zenoh-ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ impl From<LivelinessSpace> for KeySpace {
}

pub trait ExtractSample {
fn extract(self) -> ZResult<Sample>;
fn extract(&self) -> ZResult<Sample>;
}

impl ExtractSample for Reply {
fn extract(self) -> ZResult<Sample> {
self.into_result().map_err(|e| zerror!("{:?}", e).into())
fn extract(&self) -> ZResult<Sample> {
self.result()
.cloned()
.map_err(|e| zerror!("{:?}", e).into())
}
}
94 changes: 42 additions & 52 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
};

use zenoh::{
handlers::{locked, DefaultHandler, IntoHandler},
handlers::{locked, Callback, DefaultHandler, IntoHandler},
internal::zlock,
key_expr::KeyExpr,
prelude::Wait,
Expand Down Expand Up @@ -54,12 +54,12 @@ pub struct QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler> {
impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandler> {
/// Add callback to [`FetchingSubscriber`].
#[inline]
pub fn callback<Callback>(
pub fn callback<F>(
self,
callback: Callback,
) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback>
callback: F,
) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>>
where
Callback: Fn(Sample) + Send + Sync + 'static,
F: Fn(&Sample) + Send + Sync + 'static,
{
let QueryingSubscriberBuilder {
session,
Expand All @@ -85,7 +85,7 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
query_consolidation,
query_accept_replies,
query_timeout,
handler: callback,
handler: Callback::new(Arc::new(callback)),
}
}

Expand All @@ -95,12 +95,12 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
/// If your callback is also accepted by the [`callback`](QueryingSubscriberBuilder::callback)
/// method, we suggest you use it instead of `callback_mut`
#[inline]
pub fn callback_mut<CallbackMut>(
pub fn callback_mut<F>(
self,
callback: CallbackMut,
) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, impl Fn(Sample) + Send + Sync + 'static>
callback: F,
) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>>
where
CallbackMut: FnMut(Sample) + Send + Sync + 'static,
F: FnMut(&Sample) + Send + Sync + 'static,
{
self.callback(locked(callback))
}
Expand Down Expand Up @@ -357,7 +357,7 @@ pub struct FetchingSubscriberBuilder<
'b,
KeySpace,
Handler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
> where
TryIntoSample: ExtractSample,
Expand All @@ -377,7 +377,7 @@ impl<
'b,
KeySpace,
Handler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
> FetchingSubscriberBuilder<'a, 'b, KeySpace, Handler, Fetch, TryIntoSample>
where
Expand All @@ -403,20 +403,20 @@ impl<
'a,
'b,
KeySpace,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
> FetchingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandler, Fetch, TryIntoSample>
where
TryIntoSample: ExtractSample,
{
/// Add callback to [`FetchingSubscriber`].
#[inline]
pub fn callback<Callback>(
pub fn callback<F>(
self,
callback: Callback,
) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback, Fetch, TryIntoSample>
callback: F,
) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample>
where
Callback: Fn(Sample) + Send + Sync + 'static,
F: Fn(&Sample) + Send + Sync + 'static,
{
let FetchingSubscriberBuilder {
session,
Expand All @@ -435,7 +435,7 @@ where
reliability,
origin,
fetch,
handler: callback,
handler: Callback::new(Arc::new(callback)),
phantom,
}
}
Expand All @@ -446,19 +446,12 @@ where
/// If your callback is also accepted by the [`callback`](FetchingSubscriberBuilder::callback)
/// method, we suggest you use it instead of `callback_mut`
#[inline]
pub fn callback_mut<CallbackMut>(
pub fn callback_mut<F>(
self,
callback: CallbackMut,
) -> FetchingSubscriberBuilder<
'a,
'b,
KeySpace,
impl Fn(Sample) + Send + Sync + 'static,
Fetch,
TryIntoSample,
>
callback: F,
) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample>
where
CallbackMut: FnMut(Sample) + Send + Sync + 'static,
F: FnMut(&Sample) + Send + Sync + 'static,
{
self.callback(locked(callback))
}
Expand Down Expand Up @@ -499,7 +492,7 @@ impl<
'a,
'b,
Handler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
> FetchingSubscriberBuilder<'a, 'b, crate::UserSpace, Handler, Fetch, TryIntoSample>
where
Expand Down Expand Up @@ -540,7 +533,7 @@ impl<
'a,
KeySpace,
Handler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
> Resolvable for FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
Expand All @@ -554,7 +547,7 @@ where
impl<
KeySpace,
Handler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
TryIntoSample,
> Wait for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
Expand All @@ -572,7 +565,7 @@ impl<
'a,
KeySpace,
Handler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
TryIntoSample,
> IntoFuture for FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
Expand Down Expand Up @@ -622,7 +615,7 @@ where
/// ```
pub struct FetchingSubscriber<'a, Handler> {
subscriber: Subscriber<'a, ()>,
callback: Arc<dyn Fn(Sample) + Send + Sync + 'static>,
callback: Callback<Sample>,
state: Arc<Mutex<InnerState>>,
handler: Handler,
}
Expand All @@ -644,7 +637,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {
fn new<
KeySpace,
InputHandler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
TryIntoSample,
>(
conf: FetchingSubscriberBuilder<'a, 'a, KeySpace, InputHandler, Fetch, TryIntoSample>,
Expand All @@ -665,10 +658,10 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {
let sub_callback = {
let state = state.clone();
let callback = callback.clone();
move |s| {
move |s: &Sample| {
let state = &mut zlock!(state);
if state.pending_fetches == 0 {
callback(s);
callback.call(s);
} else {
tracing::trace!(
"Sample received while fetch in progress: push it to merge_queue"
Expand All @@ -683,7 +676,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {
.unwrap_or(Timestamp::new(now, session_id.into()));
state
.merge_queue
.push(SampleBuilder::from(s).timestamp(timestamp).into());
.push(SampleBuilder::from(s.clone()).timestamp(timestamp).into());
}
}
};
Expand Down Expand Up @@ -772,7 +765,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {
/// ```
#[inline]
pub fn fetch<
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
TryIntoSample,
>(
&self,
Expand All @@ -792,7 +785,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {

struct RepliesHandler {
state: Arc<Mutex<InnerState>>,
callback: Arc<dyn Fn(Sample) + Send + Sync>,
callback: Callback<Sample>,
}

impl Drop for RepliesHandler {
Expand All @@ -809,7 +802,7 @@ impl Drop for RepliesHandler {
state.merge_queue.len()
);
for s in state.merge_queue.drain() {
(self.callback)(s);
self.callback.call_by_value(s);
}
}
}
Expand Down Expand Up @@ -849,26 +842,26 @@ impl Drop for RepliesHandler {
/// ```
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
pub struct FetchBuilder<
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
> where
TryIntoSample: ExtractSample,
{
fetch: Fetch,
phantom: std::marker::PhantomData<TryIntoSample>,
state: Arc<Mutex<InnerState>>,
callback: Arc<dyn Fn(Sample) + Send + Sync>,
callback: Callback<Sample>,
}

impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample>
impl<Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample>
Resolvable for FetchBuilder<Fetch, TryIntoSample>
where
TryIntoSample: ExtractSample,
{
type To = ZResult<()>;
}

impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample> Wait
impl<Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample> Wait
for FetchBuilder<Fetch, TryIntoSample>
where
TryIntoSample: ExtractSample,
Expand All @@ -879,7 +872,7 @@ where
}
}

impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample>
impl<Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample>
IntoFuture for FetchBuilder<Fetch, TryIntoSample>
where
TryIntoSample: ExtractSample,
Expand All @@ -892,17 +885,14 @@ where
}
}

fn register_handler(
state: Arc<Mutex<InnerState>>,
callback: Arc<dyn Fn(Sample) + Send + Sync>,
) -> RepliesHandler {
fn register_handler(state: Arc<Mutex<InnerState>>, callback: Callback<Sample>) -> RepliesHandler {
zlock!(state).pending_fetches += 1;
// pending fetches will be decremented in RepliesHandler drop()
RepliesHandler { state, callback }
}

fn run_fetch<
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
>(
fetch: Fetch,
Expand All @@ -912,7 +902,7 @@ where
TryIntoSample: ExtractSample,
{
tracing::debug!("Fetch data for FetchingSubscriber");
(fetch)(Box::new(move |s: TryIntoSample| match s.extract() {
(fetch)(Box::new(move |s: &TryIntoSample| match s.extract() {
Ok(s) => {
let mut state = zlock!(handler.state);
tracing::trace!("Fetched sample received: push it to merge_queue");
Expand Down
6 changes: 3 additions & 3 deletions zenoh-ext/src/subscriber_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub trait SubscriberBuilderExt<'a, 'b, Handler> {
/// # }
/// ```
fn fetching<
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
>(
self,
Expand Down Expand Up @@ -158,7 +158,7 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> for SubscriberBuilde
/// # }
/// ```
fn fetching<
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
>(
self,
Expand Down Expand Up @@ -270,7 +270,7 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler>
/// # }
/// ```
fn fetching<
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
>(
self,
Expand Down
5 changes: 3 additions & 2 deletions zenoh/src/api/handlers/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use super::{IntoHandler, RingChannelSender};

/// A function that can transform a [`FnMut`]`(T)` to
/// a [`Fn`]`(T)` with the help of a [`Mutex`](std::sync::Mutex).
pub fn locked<T>(fnmut: impl FnMut(T)) -> impl Fn(T) {
pub fn locked<T>(fnmut: impl FnMut(&T)) -> impl Fn(&T) {
let lock = std::sync::Mutex::new(fnmut);
move |x| zlock!(lock)(x)
}
Expand Down Expand Up @@ -72,8 +72,9 @@ impl<T> Callback<T> {
}
}

#[cfg(feature = "internal")]
#[inline]
pub(crate) fn call_by_value(&self, arg: T) {
pub fn call_by_value(&self, arg: T) {
match &self.0 {
CallbackInner::Dyn(cb) => cb(&arg),
CallbackInner::Flume(tx) => {
Expand Down
Loading

0 comments on commit 467d27e

Please sign in to comment.