Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RingChannel sync/async/blocking/non-blocking #903

Merged
merged 3 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 40 additions & 16 deletions examples/examples/z_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
use clap::Parser;
use std::time::Duration;
use zenoh::{config::Config, handlers::RingBuffer, prelude::r#async::*};
use zenoh::{config::Config, handlers::RingChannel, prelude::r#async::*};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand All @@ -29,32 +29,27 @@ async fn main() {
println!("Declaring Subscriber on '{key_expr}'...");
let subscriber = session
.declare_subscriber(&key_expr)
.with(RingBuffer::new(size))
.with(RingChannel::new(size))
.res()
.await
.unwrap();

println!(
"Pulling data every {:#?} seconds. Press CTRL-C to quit...",
interval
);
println!("Press CTRL-C to quit...");

// Blocking recv. If the ring is empty, wait for the first sample to arrive.
loop {
match subscriber.recv() {
Ok(Some(sample)) => {
// Use .recv() for the synchronous version.
match subscriber.recv_async().await {
Ok(sample) => {
let payload = sample
.payload()
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(
">> [Subscriber] Pulled {} ('{}': '{}')",
">> [Subscriber] Pulled {} ('{}': '{}')... performing a computation of {:#?}",
sample.kind(),
sample.key_expr().as_str(),
payload,
);
}
Ok(None) => {
println!(
">> [Subscriber] Pulled nothing... sleep for {:#?}",
interval
);
tokio::time::sleep(interval).await;
Expand All @@ -65,6 +60,35 @@ async fn main() {
}
}
}

// Non-blocking recv. This can be usually used to implement a polling mechanism.
// loop {
// match subscriber.try_recv() {
// Ok(Some(sample)) => {
// let payload = sample
// .payload()
// .deserialize::<String>()
// .unwrap_or_else(|e| format!("{}", e));
// println!(
// ">> [Subscriber] Pulled {} ('{}': '{}')",
// sample.kind(),
// sample.key_expr().as_str(),
// payload,
// );
// }
// Ok(None) => {
// println!(
// ">> [Subscriber] Pulled nothing... sleep for {:#?}",
// interval
// );
// tokio::time::sleep(interval).await;
// }
// Err(e) => {
// println!(">> [Subscriber] Pull error: {e}");
// return;
// }
// }
// }
}

#[derive(clap::Parser, Clone, PartialEq, Debug)]
Expand All @@ -73,10 +97,10 @@ struct SubArgs {
/// The Key Expression to subscribe to.
key: KeyExpr<'static>,
/// The size of the ringbuffer.
#[arg(long, default_value = "3")]
#[arg(short, long, default_value = "3")]
size: usize,
/// The interval for pulling the ringbuffer.
#[arg(long, default_value = "5.0")]
#[arg(short, long, default_value = "5.0")]
interval: f32,
#[command(flatten)]
common: CommonArgs,
Expand Down
186 changes: 0 additions & 186 deletions zenoh/src/handlers.rs

This file was deleted.

90 changes: 90 additions & 0 deletions zenoh/src/handlers/callback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

//! Callback handler trait.
use super::{Dyn, IntoHandler};

/// A function that can transform a [`FnMut`]`(T)` to
/// a [`Fn`]`(T)` with the help of a [`Mutex`](std::sync::Mutex).
pub fn locked<T>(fnmut: impl FnMut(T)) -> impl Fn(T) {
let lock = std::sync::Mutex::new(fnmut);
move |x| zlock!(lock)(x)
}

/// An immutable callback function.
pub type Callback<'a, T> = Dyn<dyn Fn(T) + Send + Sync + 'a>;

impl<'a, T, F> IntoHandler<'a, T> for F
where
F: Fn(T) + Send + Sync + 'a,
{
type Handler = ();
fn into_handler(self) -> (Callback<'a, T>, Self::Handler) {
(Dyn::from(self), ())
}
}

impl<T: Send + 'static> IntoHandler<'static, T> for (flume::Sender<T>, flume::Receiver<T>) {
type Handler = flume::Receiver<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
let (sender, receiver) = self;
(
Dyn::new(move |t| {
if let Err(e) = sender.send(t) {
log::error!("{}", e)
}
}),
receiver,
)
}
}

/// A handler containing 2 callback functions:
/// - `callback`: the typical callback function. `context` will be passed as its last argument.
/// - `drop`: a callback called when this handler is dropped.
///
/// It is guaranteed that:
///
/// - `callback` will never be called once `drop` has started.
/// - `drop` will only be called **once**, and **after every** `callback` has ended.
/// - The two previous guarantees imply that `call` and `drop` are never called concurrently.
pub struct CallbackDrop<Callback, DropFn>
where
DropFn: FnMut() + Send + Sync + 'static,
{
pub callback: Callback,
pub drop: DropFn,
}

impl<Callback, DropFn> Drop for CallbackDrop<Callback, DropFn>
where
DropFn: FnMut() + Send + Sync + 'static,
{
fn drop(&mut self) {
(self.drop)()
}
}

impl<'a, OnEvent, Event, DropFn> IntoHandler<'a, Event> for CallbackDrop<OnEvent, DropFn>
where
OnEvent: Fn(Event) + Send + Sync + 'a,
DropFn: FnMut() + Send + Sync + 'static,
{
type Handler = ();

fn into_handler(self) -> (Callback<'a, Event>, Self::Handler) {
(Dyn::from(move |evt| (self.callback)(evt)), ())
}
}
Loading
Loading