Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: make callback opaque #1383

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions commons/zenoh-config/src/wrappers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl FromStr for ZenohId {
}

/// A zenoh Hello message.
#[derive(Clone, PartialEq, Eq)]
#[repr(transparent)]
pub struct Hello(HelloProto);

Expand Down
8 changes: 5 additions & 3 deletions zenoh-ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ impl From<LivelinessSpace> for KeySpace {
}

pub trait ExtractSample {
fn extract(self) -> ZResult<Sample>;
fn extract(&self) -> ZResult<Sample>;
}

impl ExtractSample for Reply {
fn extract(self) -> ZResult<Sample> {
self.into_result().map_err(|e| zerror!("{:?}", e).into())
fn extract(&self) -> ZResult<Sample> {
self.result()
.cloned()
.map_err(|e| zerror!("{:?}", e).into())
}
}
112 changes: 51 additions & 61 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
};

use zenoh::{
handlers::{locked, DefaultHandler, IntoHandler},
handlers::{locked, Callback, DefaultHandler, IntoHandler},
internal::zlock,
key_expr::KeyExpr,
prelude::Wait,
Expand Down Expand Up @@ -54,12 +54,12 @@ pub struct QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler> {
impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandler> {
/// Add callback to [`FetchingSubscriber`].
#[inline]
pub fn callback<Callback>(
pub fn callback<F>(
self,
callback: Callback,
) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback>
callback: F,
) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>>
where
Callback: Fn(Sample) + Send + Sync + 'static,
F: Fn(&Sample) + Send + Sync + 'static,
{
let QueryingSubscriberBuilder {
session,
Expand All @@ -85,7 +85,7 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
query_consolidation,
query_accept_replies,
query_timeout,
handler: callback,
handler: Callback::new(Arc::new(callback)),
}
}

Expand All @@ -95,12 +95,12 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
/// If your callback is also accepted by the [`callback`](QueryingSubscriberBuilder::callback)
/// method, we suggest you use it instead of `callback_mut`
#[inline]
pub fn callback_mut<CallbackMut>(
pub fn callback_mut<F>(
self,
callback: CallbackMut,
) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, impl Fn(Sample) + Send + Sync + 'static>
callback: F,
) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>>
where
CallbackMut: FnMut(Sample) + Send + Sync + 'static,
F: FnMut(&Sample) + Send + Sync + 'static,
{
self.callback(locked(callback))
}
Expand All @@ -112,7 +112,7 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
handler: Handler,
) -> QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler>
where
Handler: IntoHandler<'static, Sample>,
Handler: IntoHandler<Sample>,
{
let QueryingSubscriberBuilder {
session,
Expand Down Expand Up @@ -221,7 +221,7 @@ impl<'a, 'b, KeySpace, Handler> QueryingSubscriberBuilder<'a, 'b, KeySpace, Hand

impl<'a, KeySpace, Handler> Resolvable for QueryingSubscriberBuilder<'a, '_, KeySpace, Handler>
where
Handler: IntoHandler<'static, Sample>,
Handler: IntoHandler<Sample>,
Handler::Handler: Send,
{
type To = ZResult<FetchingSubscriber<'a, Handler::Handler>>;
Expand All @@ -230,7 +230,7 @@ where
impl<KeySpace, Handler> Wait for QueryingSubscriberBuilder<'_, '_, KeySpace, Handler>
where
KeySpace: Into<crate::KeySpace> + Clone,
Handler: IntoHandler<'static, Sample> + Send,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
{
fn wait(self) -> <Self as Resolvable>::To {
Expand Down Expand Up @@ -279,7 +279,7 @@ where
impl<'a, KeySpace, Handler> IntoFuture for QueryingSubscriberBuilder<'a, '_, KeySpace, Handler>
where
KeySpace: Into<crate::KeySpace> + Clone,
Handler: IntoHandler<'static, Sample> + Send,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
{
type Output = <Self as Resolvable>::To;
Expand Down Expand Up @@ -357,7 +357,7 @@ pub struct FetchingSubscriberBuilder<
'b,
KeySpace,
Handler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
> where
TryIntoSample: ExtractSample,
Expand All @@ -377,7 +377,7 @@ impl<
'b,
KeySpace,
Handler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
> FetchingSubscriberBuilder<'a, 'b, KeySpace, Handler, Fetch, TryIntoSample>
where
Expand All @@ -403,20 +403,20 @@ impl<
'a,
'b,
KeySpace,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
> FetchingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandler, Fetch, TryIntoSample>
where
TryIntoSample: ExtractSample,
{
/// Add callback to [`FetchingSubscriber`].
#[inline]
pub fn callback<Callback>(
pub fn callback<F>(
self,
callback: Callback,
) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback, Fetch, TryIntoSample>
callback: F,
) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample>
where
Callback: Fn(Sample) + Send + Sync + 'static,
F: Fn(&Sample) + Send + Sync + 'static,
{
let FetchingSubscriberBuilder {
session,
Expand All @@ -435,7 +435,7 @@ where
reliability,
origin,
fetch,
handler: callback,
handler: Callback::new(Arc::new(callback)),
phantom,
}
}
Expand All @@ -446,19 +446,12 @@ where
/// If your callback is also accepted by the [`callback`](FetchingSubscriberBuilder::callback)
/// method, we suggest you use it instead of `callback_mut`
#[inline]
pub fn callback_mut<CallbackMut>(
pub fn callback_mut<F>(
self,
callback: CallbackMut,
) -> FetchingSubscriberBuilder<
'a,
'b,
KeySpace,
impl Fn(Sample) + Send + Sync + 'static,
Fetch,
TryIntoSample,
>
callback: F,
) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Callback<Sample>, Fetch, TryIntoSample>
where
CallbackMut: FnMut(Sample) + Send + Sync + 'static,
F: FnMut(&Sample) + Send + Sync + 'static,
{
self.callback(locked(callback))
}
Expand All @@ -470,7 +463,7 @@ where
handler: Handler,
) -> FetchingSubscriberBuilder<'a, 'b, KeySpace, Handler, Fetch, TryIntoSample>
where
Handler: IntoHandler<'static, Sample>,
Handler: IntoHandler<Sample>,
{
let FetchingSubscriberBuilder {
session,
Expand Down Expand Up @@ -499,7 +492,7 @@ impl<
'a,
'b,
Handler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
> FetchingSubscriberBuilder<'a, 'b, crate::UserSpace, Handler, Fetch, TryIntoSample>
where
Expand Down Expand Up @@ -540,11 +533,11 @@ impl<
'a,
KeySpace,
Handler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
> Resolvable for FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
Handler: IntoHandler<'static, Sample>,
Handler: IntoHandler<Sample>,
Handler::Handler: Send,
TryIntoSample: ExtractSample,
{
Expand All @@ -554,12 +547,12 @@ where
impl<
KeySpace,
Handler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
TryIntoSample,
> Wait for FetchingSubscriberBuilder<'_, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
KeySpace: Into<crate::KeySpace>,
Handler: IntoHandler<'static, Sample> + Send,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
TryIntoSample: ExtractSample + Send + Sync,
{
Expand All @@ -572,12 +565,12 @@ impl<
'a,
KeySpace,
Handler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
TryIntoSample,
> IntoFuture for FetchingSubscriberBuilder<'a, '_, KeySpace, Handler, Fetch, TryIntoSample>
where
KeySpace: Into<crate::KeySpace>,
Handler: IntoHandler<'static, Sample> + Send,
Handler: IntoHandler<Sample> + Send,
Handler::Handler: Send,
TryIntoSample: ExtractSample + Send + Sync,
{
Expand Down Expand Up @@ -622,7 +615,7 @@ where
/// ```
pub struct FetchingSubscriber<'a, Handler> {
subscriber: Subscriber<'a, ()>,
callback: Arc<dyn Fn(Sample) + Send + Sync + 'static>,
callback: Callback<Sample>,
state: Arc<Mutex<InnerState>>,
handler: Handler,
}
Expand All @@ -644,14 +637,14 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {
fn new<
KeySpace,
InputHandler,
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
TryIntoSample,
>(
conf: FetchingSubscriberBuilder<'a, 'a, KeySpace, InputHandler, Fetch, TryIntoSample>,
) -> ZResult<Self>
where
KeySpace: Into<crate::KeySpace>,
InputHandler: IntoHandler<'static, Sample, Handler = Handler> + Send,
InputHandler: IntoHandler<Sample, Handler = Handler> + Send,
TryIntoSample: ExtractSample + Send + Sync,
{
let session_id = conf.session.zid();
Expand All @@ -665,10 +658,10 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {
let sub_callback = {
let state = state.clone();
let callback = callback.clone();
move |s| {
move |s: &Sample| {
let state = &mut zlock!(state);
if state.pending_fetches == 0 {
callback(s);
callback.call(s);
} else {
tracing::trace!(
"Sample received while fetch in progress: push it to merge_queue"
Expand All @@ -683,7 +676,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {
.unwrap_or(Timestamp::new(now, session_id.into()));
state
.merge_queue
.push(SampleBuilder::from(s).timestamp(timestamp).into());
.push(SampleBuilder::from(s.clone()).timestamp(timestamp).into());
}
}
};
Expand Down Expand Up @@ -772,7 +765,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {
/// ```
#[inline]
pub fn fetch<
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
TryIntoSample,
>(
&self,
Expand All @@ -792,7 +785,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {

struct RepliesHandler {
state: Arc<Mutex<InnerState>>,
callback: Arc<dyn Fn(Sample) + Send + Sync>,
callback: Callback<Sample>,
}

impl Drop for RepliesHandler {
Expand All @@ -809,7 +802,7 @@ impl Drop for RepliesHandler {
state.merge_queue.len()
);
for s in state.merge_queue.drain() {
(self.callback)(s);
self.callback.call_by_value(s);
}
}
}
Expand Down Expand Up @@ -849,26 +842,26 @@ impl Drop for RepliesHandler {
/// ```
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
pub struct FetchBuilder<
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
> where
TryIntoSample: ExtractSample,
{
fetch: Fetch,
phantom: std::marker::PhantomData<TryIntoSample>,
state: Arc<Mutex<InnerState>>,
callback: Arc<dyn Fn(Sample) + Send + Sync>,
callback: Callback<Sample>,
}

impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample>
impl<Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample>
Resolvable for FetchBuilder<Fetch, TryIntoSample>
where
TryIntoSample: ExtractSample,
{
type To = ZResult<()>;
}

impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample> Wait
impl<Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample> Wait
for FetchBuilder<Fetch, TryIntoSample>
where
TryIntoSample: ExtractSample,
Expand All @@ -879,7 +872,7 @@ where
}
}

impl<Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample>
impl<Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>, TryIntoSample>
IntoFuture for FetchBuilder<Fetch, TryIntoSample>
where
TryIntoSample: ExtractSample,
Expand All @@ -892,17 +885,14 @@ where
}
}

fn register_handler(
state: Arc<Mutex<InnerState>>,
callback: Arc<dyn Fn(Sample) + Send + Sync>,
) -> RepliesHandler {
fn register_handler(state: Arc<Mutex<InnerState>>, callback: Callback<Sample>) -> RepliesHandler {
zlock!(state).pending_fetches += 1;
// pending fetches will be decremented in RepliesHandler drop()
RepliesHandler { state, callback }
}

fn run_fetch<
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
Fetch: FnOnce(Box<dyn Fn(&TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
>(
fetch: Fetch,
Expand All @@ -912,7 +902,7 @@ where
TryIntoSample: ExtractSample,
{
tracing::debug!("Fetch data for FetchingSubscriber");
(fetch)(Box::new(move |s: TryIntoSample| match s.extract() {
(fetch)(Box::new(move |s: &TryIntoSample| match s.extract() {
Ok(s) => {
let mut state = zlock!(handler.state);
tracing::trace!("Fetched sample received: push it to merge_queue");
Expand Down
Loading