diff --git a/Cargo.lock b/Cargo.lock index 09e598d878..66c6c4f2c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -165,9 +165,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstream" -version = "0.6.13" +version = "0.6.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" +checksum = "96b09b5178381e0874812a9b157f7fe84982617e48f71f4e3235482775e5b540" dependencies = [ "anstyle", "anstyle-parse", @@ -1103,9 +1103,9 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.11.3" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38b35839ba51819680ba087cd351788c9a3c476841207e0b8cee0b04722343b9" +checksum = "6c012a26a7f605efc424dd53697843a72be7dc86ad2d01f7814337794a12231d" dependencies = [ "anstream", "anstyle", @@ -1122,9 +1122,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "erased-serde" -version = "0.4.4" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b73807008a3c7f171cc40312f37d95ef0396e048b5848d775f54b1a4dd4a0d3" +checksum = "6c138974f9d5e7fe373eb04df7cae98833802ae4b11c24ac7039a21d5af4b26c" dependencies = [ "serde", ] @@ -1541,9 +1541,9 @@ dependencies = [ [[package]] name = "http" -version = "1.1.0" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" dependencies = [ "bytes", "fnv", @@ -1854,9 +1854,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.21" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" dependencies = [ "serde", "value-bag", @@ -2865,9 +2865,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.22.3" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99008d7ad0bbbea527ec27bddbc0e432c5b87d8175178cee68d2eec9c4a1813c" +checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" dependencies = [ "log", "ring 0.17.6", @@ -2923,9 +2923,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.4.1" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecd36cc4259e3e4514335c4a138c6b43171a8d61d8f5c9348f9fc7529416f247" +checksum = "5ede67b28608b4c60685c7d54122d4400d90f62b40caee7700e700380a390fa8" [[package]] name = "rustls-webpki" @@ -3701,9 +3701,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.37.0" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ "backtrace", "bytes", @@ -3743,7 +3743,7 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" dependencies = [ - "rustls 0.22.3", + "rustls 0.22.2", "rustls-pki-types", "tokio", ] @@ -4036,9 +4036,9 @@ dependencies = [ [[package]] name = "value-bag" -version = "1.8.1" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74797339c3b98616c009c7c3eb53a0ce41e85c8ec66bd3db96ed132d20cfdee8" +checksum = "d92ccd67fb88503048c01b59152a04effd0782d035a83a6d256ce6085f08f4a3" dependencies = [ "value-bag-serde1", "value-bag-sval2", @@ -4046,9 +4046,9 @@ dependencies = [ [[package]] name = "value-bag-serde1" -version = "1.8.1" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc35703541cbccb5278ef7b589d79439fc808ff0b5867195a3230f9a47421d39" +checksum = "b0b9f3feef403a50d4d67e9741a6d8fc688bcbb4e4f31bd4aab72cc690284394" dependencies = [ "erased-serde", "serde", @@ -4057,9 +4057,9 @@ dependencies = [ [[package]] name = "value-bag-sval2" -version = "1.8.1" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "285b43c29d0b4c0e65aad24561baee67a1b69dc9be9375d4a85138cbf556f7f8" +checksum = "30b24f4146b6f3361e91cbf527d1fb35e9376c3c0cef72ca5ec5af6d640fad7d" dependencies = [ "sval", "sval_buffer", @@ -4686,7 +4686,7 @@ dependencies = [ "flume", "futures", "log", - "rustls 0.22.3", + "rustls 0.22.2", "rustls-webpki 0.102.2", "serde", "tokio", @@ -4773,7 +4773,7 @@ dependencies = [ "base64 0.21.4", "futures", "log", - "rustls 0.22.3", + "rustls 0.22.2", "rustls-pemfile 2.0.0", "rustls-pki-types", "rustls-webpki 0.102.2", diff --git a/examples/examples/z_get.rs b/examples/examples/z_get.rs index 8735ae8daa..486346a8ea 100644 --- a/examples/examples/z_get.rs +++ b/examples/examples/z_get.rs @@ -30,6 +30,10 @@ async fn main() { println!("Sending Query '{selector}'..."); let replies = session .get(&selector) + // // By default get receives replies from a FIFO. + // // Uncomment this line to use a ring channel instead. + // // More information on the ring channel are available in the z_pull example. + .with(zenoh::handlers::RingChannel::default()) .value(value) .target(target) .timeout(timeout) diff --git a/examples/examples/z_pull.rs b/examples/examples/z_pull.rs index 4e44930f4f..d6ae465555 100644 --- a/examples/examples/z_pull.rs +++ b/examples/examples/z_pull.rs @@ -13,7 +13,7 @@ // use clap::Parser; use std::time::Duration; -use zenoh::{config::Config, handlers::RingBuffer, prelude::r#async::*}; +use zenoh::{config::Config, handlers::RingChannel, prelude::r#async::*}; use zenoh_examples::CommonArgs; #[tokio::main] @@ -29,32 +29,27 @@ async fn main() { println!("Declaring Subscriber on '{key_expr}'..."); let subscriber = session .declare_subscriber(&key_expr) - .with(RingBuffer::new(size)) + .with(RingChannel::new(size)) .res() .await .unwrap(); - println!( - "Pulling data every {:#?} seconds. Press CTRL-C to quit...", - interval - ); + println!("Press CTRL-C to quit..."); + + // Blocking recv. If the ring is empty, wait for the first sample to arrive. loop { - match subscriber.recv() { - Ok(Some(sample)) => { + // Use .recv() for the synchronous version. + match subscriber.recv_async().await { + Ok(sample) => { let payload = sample .payload() .deserialize::() .unwrap_or_else(|e| format!("{}", e)); println!( - ">> [Subscriber] Pulled {} ('{}': '{}')", + ">> [Subscriber] Pulled {} ('{}': '{}')... performing a computation of {:#?}", sample.kind(), sample.key_expr().as_str(), payload, - ); - } - Ok(None) => { - println!( - ">> [Subscriber] Pulled nothing... sleep for {:#?}", interval ); tokio::time::sleep(interval).await; @@ -65,6 +60,35 @@ async fn main() { } } } + + // Non-blocking recv. This can be usually used to implement a polling mechanism. + // loop { + // match subscriber.try_recv() { + // Ok(Some(sample)) => { + // let payload = sample + // .payload() + // .deserialize::() + // .unwrap_or_else(|e| format!("{}", e)); + // println!( + // ">> [Subscriber] Pulled {} ('{}': '{}')", + // sample.kind(), + // sample.key_expr().as_str(), + // payload, + // ); + // } + // Ok(None) => { + // println!( + // ">> [Subscriber] Pulled nothing... sleep for {:#?}", + // interval + // ); + // tokio::time::sleep(interval).await; + // } + // Err(e) => { + // println!(">> [Subscriber] Pull error: {e}"); + // return; + // } + // } + // } } #[derive(clap::Parser, Clone, PartialEq, Debug)] @@ -73,10 +97,10 @@ struct SubArgs { /// The Key Expression to subscribe to. key: KeyExpr<'static>, /// The size of the ringbuffer. - #[arg(long, default_value = "3")] + #[arg(short, long, default_value = "3")] size: usize, /// The interval for pulling the ringbuffer. - #[arg(long, default_value = "5.0")] + #[arg(short, long, default_value = "5.0")] interval: f32, #[command(flatten)] common: CommonArgs, diff --git a/examples/examples/z_queryable.rs b/examples/examples/z_queryable.rs index 83ac63ce1f..5113f1c2b7 100644 --- a/examples/examples/z_queryable.rs +++ b/examples/examples/z_queryable.rs @@ -29,6 +29,10 @@ async fn main() { println!("Declaring Queryable on '{key_expr}'..."); let queryable = session .declare_queryable(&key_expr) + // // By default queryable receives queries from a FIFO. + // // Uncomment this line to use a ring channel instead. + // // More information on the ring channel are available in the z_pull example. + // .with(zenoh::handlers::RingChannel::default()) .complete(complete) .res() .await diff --git a/zenoh/src/handlers.rs b/zenoh/src/handlers.rs deleted file mode 100644 index c5d2c6bb90..0000000000 --- a/zenoh/src/handlers.rs +++ /dev/null @@ -1,186 +0,0 @@ -// -// Copyright (c) 2023 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -//! Callback handler trait. -use crate::API_DATA_RECEPTION_CHANNEL_SIZE; - -use std::sync::{Arc, Mutex, Weak}; -use zenoh_collections::RingBuffer as RingBufferInner; -use zenoh_result::ZResult; - -/// An alias for `Arc`. -pub type Dyn = std::sync::Arc; - -/// An immutable callback function. -pub type Callback<'a, T> = Dyn; - -/// A type that can be converted into a [`Callback`]-handler pair. -/// -/// When Zenoh functions accept types that implement these, it intends to use the [`Callback`] as just that, -/// while granting you access to the handler through the returned value via [`std::ops::Deref`] and [`std::ops::DerefMut`]. -/// -/// Any closure that accepts `T` can be converted into a pair of itself and `()`. -pub trait IntoHandler<'a, T> { - type Handler; - - fn into_handler(self) -> (Callback<'a, T>, Self::Handler); -} - -impl<'a, T, F> IntoHandler<'a, T> for F -where - F: Fn(T) + Send + Sync + 'a, -{ - type Handler = (); - fn into_handler(self) -> (Callback<'a, T>, Self::Handler) { - (Dyn::from(self), ()) - } -} - -impl IntoHandler<'static, T> for (flume::Sender, flume::Receiver) { - type Handler = flume::Receiver; - - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { - let (sender, receiver) = self; - ( - Dyn::new(move |t| { - if let Err(e) = sender.send(t) { - log::error!("{}", e) - } - }), - receiver, - ) - } -} - -/// The default handler in Zenoh is a FIFO queue. -pub struct DefaultHandler; - -impl IntoHandler<'static, T> for DefaultHandler { - type Handler = flume::Receiver; - - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { - flume::bounded(*API_DATA_RECEPTION_CHANNEL_SIZE).into_handler() - } -} - -impl IntoHandler<'static, T> - for (std::sync::mpsc::SyncSender, std::sync::mpsc::Receiver) -{ - type Handler = std::sync::mpsc::Receiver; - - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { - let (sender, receiver) = self; - ( - Dyn::new(move |t| { - if let Err(e) = sender.send(t) { - log::error!("{}", e) - } - }), - receiver, - ) - } -} - -/// Ring buffer with a limited queue size, which allows users to keep the last N data. -pub struct RingBuffer { - ring: Arc>>, -} - -impl RingBuffer { - /// Initialize the RingBuffer with the capacity size. - pub fn new(capacity: usize) -> Self { - RingBuffer { - ring: Arc::new(Mutex::new(RingBufferInner::new(capacity))), - } - } -} - -pub struct RingBufferHandler { - ring: Weak>>, -} - -impl RingBufferHandler { - pub fn recv(&self) -> ZResult> { - let Some(ring) = self.ring.upgrade() else { - bail!("The ringbuffer has been deleted."); - }; - let mut guard = ring.lock().map_err(|e| zerror!("{}", e))?; - Ok(guard.pull()) - } -} - -impl IntoHandler<'static, T> for RingBuffer { - type Handler = RingBufferHandler; - - fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { - let receiver = RingBufferHandler { - ring: Arc::downgrade(&self.ring), - }; - ( - Dyn::new(move |t| match self.ring.lock() { - Ok(mut g) => { - // Eventually drop the oldest element. - g.push_force(t); - } - Err(e) => log::error!("{}", e), - }), - receiver, - ) - } -} - -/// A function that can transform a [`FnMut`]`(T)` to -/// a [`Fn`]`(T)` with the help of a [`Mutex`](std::sync::Mutex). -pub fn locked(fnmut: impl FnMut(T)) -> impl Fn(T) { - let lock = std::sync::Mutex::new(fnmut); - move |x| zlock!(lock)(x) -} - -/// A handler containing 2 callback functions: -/// - `callback`: the typical callback function. `context` will be passed as its last argument. -/// - `drop`: a callback called when this handler is dropped. -/// -/// It is guaranteed that: -/// -/// - `callback` will never be called once `drop` has started. -/// - `drop` will only be called **once**, and **after every** `callback` has ended. -/// - The two previous guarantees imply that `call` and `drop` are never called concurrently. -pub struct CallbackDrop -where - DropFn: FnMut() + Send + Sync + 'static, -{ - pub callback: Callback, - pub drop: DropFn, -} - -impl Drop for CallbackDrop -where - DropFn: FnMut() + Send + Sync + 'static, -{ - fn drop(&mut self) { - (self.drop)() - } -} - -impl<'a, OnEvent, Event, DropFn> IntoHandler<'a, Event> for CallbackDrop -where - OnEvent: Fn(Event) + Send + Sync + 'a, - DropFn: FnMut() + Send + Sync + 'static, -{ - type Handler = (); - - fn into_handler(self) -> (Callback<'a, Event>, Self::Handler) { - (Dyn::from(move |evt| (self.callback)(evt)), ()) - } -} diff --git a/zenoh/src/handlers/callback.rs b/zenoh/src/handlers/callback.rs new file mode 100644 index 0000000000..21c1b0878c --- /dev/null +++ b/zenoh/src/handlers/callback.rs @@ -0,0 +1,90 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! Callback handler trait. +use super::{Dyn, IntoHandler}; + +/// A function that can transform a [`FnMut`]`(T)` to +/// a [`Fn`]`(T)` with the help of a [`Mutex`](std::sync::Mutex). +pub fn locked(fnmut: impl FnMut(T)) -> impl Fn(T) { + let lock = std::sync::Mutex::new(fnmut); + move |x| zlock!(lock)(x) +} + +/// An immutable callback function. +pub type Callback<'a, T> = Dyn; + +impl<'a, T, F> IntoHandler<'a, T> for F +where + F: Fn(T) + Send + Sync + 'a, +{ + type Handler = (); + fn into_handler(self) -> (Callback<'a, T>, Self::Handler) { + (Dyn::from(self), ()) + } +} + +impl IntoHandler<'static, T> for (flume::Sender, flume::Receiver) { + type Handler = flume::Receiver; + + fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + let (sender, receiver) = self; + ( + Dyn::new(move |t| { + if let Err(e) = sender.send(t) { + log::error!("{}", e) + } + }), + receiver, + ) + } +} + +/// A handler containing 2 callback functions: +/// - `callback`: the typical callback function. `context` will be passed as its last argument. +/// - `drop`: a callback called when this handler is dropped. +/// +/// It is guaranteed that: +/// +/// - `callback` will never be called once `drop` has started. +/// - `drop` will only be called **once**, and **after every** `callback` has ended. +/// - The two previous guarantees imply that `call` and `drop` are never called concurrently. +pub struct CallbackDrop +where + DropFn: FnMut() + Send + Sync + 'static, +{ + pub callback: Callback, + pub drop: DropFn, +} + +impl Drop for CallbackDrop +where + DropFn: FnMut() + Send + Sync + 'static, +{ + fn drop(&mut self) { + (self.drop)() + } +} + +impl<'a, OnEvent, Event, DropFn> IntoHandler<'a, Event> for CallbackDrop +where + OnEvent: Fn(Event) + Send + Sync + 'a, + DropFn: FnMut() + Send + Sync + 'static, +{ + type Handler = (); + + fn into_handler(self) -> (Callback<'a, Event>, Self::Handler) { + (Dyn::from(move |evt| (self.callback)(evt)), ()) + } +} diff --git a/zenoh/src/handlers/fifo.rs b/zenoh/src/handlers/fifo.rs new file mode 100644 index 0000000000..0fa3ab304c --- /dev/null +++ b/zenoh/src/handlers/fifo.rs @@ -0,0 +1,61 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! Callback handler trait. +use super::{callback::Callback, Dyn, IntoHandler, API_DATA_RECEPTION_CHANNEL_SIZE}; + +/// The default handler in Zenoh is a FIFO queue. + +pub struct FifoChannel { + capacity: usize, +} + +impl FifoChannel { + /// Initialize the RingBuffer with the capacity size. + pub fn new(capacity: usize) -> Self { + Self { capacity } + } +} + +impl Default for FifoChannel { + fn default() -> Self { + Self::new(*API_DATA_RECEPTION_CHANNEL_SIZE) + } +} + +impl IntoHandler<'static, T> for FifoChannel { + type Handler = flume::Receiver; + + fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + flume::bounded(self.capacity).into_handler() + } +} + +impl IntoHandler<'static, T> + for (std::sync::mpsc::SyncSender, std::sync::mpsc::Receiver) +{ + type Handler = std::sync::mpsc::Receiver; + + fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + let (sender, receiver) = self; + ( + Dyn::new(move |t| { + if let Err(e) = sender.send(t) { + log::error!("{}", e) + } + }), + receiver, + ) + } +} diff --git a/zenoh/src/handlers/mod.rs b/zenoh/src/handlers/mod.rs new file mode 100644 index 0000000000..627c166795 --- /dev/null +++ b/zenoh/src/handlers/mod.rs @@ -0,0 +1,52 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! Callback handler trait. +mod callback; +mod fifo; +mod ring; + +pub use callback::*; +pub use fifo::*; +pub use ring::*; + +use crate::API_DATA_RECEPTION_CHANNEL_SIZE; + +/// An alias for `Arc`. +pub type Dyn = std::sync::Arc; + +/// A type that can be converted into a [`Callback`]-handler pair. +/// +/// When Zenoh functions accept types that implement these, it intends to use the [`Callback`] as just that, +/// while granting you access to the handler through the returned value via [`std::ops::Deref`] and [`std::ops::DerefMut`]. +/// +/// Any closure that accepts `T` can be converted into a pair of itself and `()`. +pub trait IntoHandler<'a, T> { + type Handler; + + fn into_handler(self) -> (Callback<'a, T>, Self::Handler); +} + +/// The default handler in Zenoh is a FIFO queue. +#[repr(transparent)] +#[derive(Default)] +pub struct DefaultHandler(FifoChannel); + +impl IntoHandler<'static, T> for DefaultHandler { + type Handler = >::Handler; + + fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + self.0.into_handler() + } +} diff --git a/zenoh/src/handlers/ring.rs b/zenoh/src/handlers/ring.rs new file mode 100644 index 0000000000..341a3efadd --- /dev/null +++ b/zenoh/src/handlers/ring.rs @@ -0,0 +1,116 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! Callback handler trait. +use crate::API_DATA_RECEPTION_CHANNEL_SIZE; + +use super::{callback::Callback, Dyn, IntoHandler}; +use std::sync::{Arc, Weak}; +use zenoh_collections::RingBuffer; +use zenoh_result::ZResult; + +/// A synchrounous ring channel with a limited size that allows users to keep the last N data. +pub struct RingChannel { + capacity: usize, +} + +impl RingChannel { + /// Initialize the RingBuffer with the capacity size. + pub fn new(capacity: usize) -> Self { + Self { capacity } + } +} + +impl Default for RingChannel { + fn default() -> Self { + Self::new(*API_DATA_RECEPTION_CHANNEL_SIZE) + } +} + +struct RingChannelInner { + ring: std::sync::Mutex>, + not_empty: flume::Receiver<()>, +} + +pub struct RingChannelHandler { + ring: Weak>, +} + +impl RingChannelHandler { + /// Receive from the ring channel. If the ring channel is empty, this call will block until an element is available in the channel. + pub fn recv(&self) -> ZResult { + let Some(channel) = self.ring.upgrade() else { + bail!("The ringbuffer has been deleted."); + }; + loop { + if let Some(t) = channel.ring.lock().map_err(|e| zerror!("{}", e))?.pull() { + return Ok(t); + } + channel.not_empty.recv().map_err(|e| zerror!("{}", e))?; + } + } + + /// Receive from the ring channel. If the ring channel is empty, this call will block until an element is available in the channel. + pub async fn recv_async(&self) -> ZResult { + let Some(channel) = self.ring.upgrade() else { + bail!("The ringbuffer has been deleted."); + }; + loop { + if let Some(t) = channel.ring.lock().map_err(|e| zerror!("{}", e))?.pull() { + return Ok(t); + } + channel + .not_empty + .recv_async() + .await + .map_err(|e| zerror!("{}", e))?; + } + } + + /// Try to receive from the ring channel. If the ring channel is empty, this call will return immediately without blocking. + pub fn try_recv(&self) -> ZResult> { + let Some(channel) = self.ring.upgrade() else { + bail!("The ringbuffer has been deleted."); + }; + let mut guard = channel.ring.lock().map_err(|e| zerror!("{}", e))?; + Ok(guard.pull()) + } +} + +impl IntoHandler<'static, T> for RingChannel { + type Handler = RingChannelHandler; + + fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + let (sender, receiver) = flume::bounded(1); + let inner = Arc::new(RingChannelInner { + ring: std::sync::Mutex::new(RingBuffer::new(self.capacity)), + not_empty: receiver, + }); + let receiver = RingChannelHandler { + ring: Arc::downgrade(&inner), + }; + ( + Dyn::new(move |t| match inner.ring.lock() { + Ok(mut g) => { + // Eventually drop the oldest element. + g.push_force(t); + drop(g); + let _ = sender.try_send(()); + } + Err(e) => log::error!("{}", e), + }), + receiver, + ) + } +} diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index ea212485ec..90b4b2af58 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -214,7 +214,7 @@ where ScoutBuilder { what: what.into(), config: config.try_into().map_err(|e| e.into()), - handler: DefaultHandler, + handler: DefaultHandler::default(), } } diff --git a/zenoh/src/liveliness.rs b/zenoh/src/liveliness.rs index 23e1846741..a28292fda2 100644 --- a/zenoh/src/liveliness.rs +++ b/zenoh/src/liveliness.rs @@ -155,7 +155,7 @@ impl<'a> Liveliness<'a> { LivelinessSubscriberBuilder { session: self.session.clone(), key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), - handler: DefaultHandler, + handler: DefaultHandler::default(), } } @@ -198,7 +198,7 @@ impl<'a> Liveliness<'a> { session: &self.session, key_expr, timeout, - handler: DefaultHandler, + handler: DefaultHandler::default(), } } } diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index cdd9e810a6..cefebc2141 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -519,7 +519,7 @@ impl<'a> Publisher<'a> { pub fn matching_listener(&self) -> MatchingListenerBuilder<'_, DefaultHandler> { MatchingListenerBuilder { publisher: PublisherRef::Borrow(self), - handler: DefaultHandler, + handler: DefaultHandler::default(), } } @@ -624,7 +624,7 @@ impl PublisherDeclarations for std::sync::Arc> { fn matching_listener(&self) -> MatchingListenerBuilder<'static, DefaultHandler> { MatchingListenerBuilder { publisher: PublisherRef::Shared(self.clone()), - handler: DefaultHandler, + handler: DefaultHandler::default(), } } } diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index ca5d44c3a6..3f1c382a66 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -302,7 +302,7 @@ impl<'s, 'a> SessionDeclarations<'s, 'a> for SessionRef<'a> { key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), reliability: Reliability::DEFAULT, origin: Locality::default(), - handler: DefaultHandler, + handler: DefaultHandler::default(), } } fn declare_queryable<'b, TryIntoKeyExpr>( @@ -318,7 +318,7 @@ impl<'s, 'a> SessionDeclarations<'s, 'a> for SessionRef<'a> { key_expr: key_expr.try_into().map_err(Into::into), complete: false, origin: Locality::default(), - handler: DefaultHandler, + handler: DefaultHandler::default(), } } fn declare_publisher<'b, TryIntoKeyExpr>( @@ -814,7 +814,7 @@ impl Session { value: None, #[cfg(feature = "unstable")] attachment: None, - handler: DefaultHandler, + handler: DefaultHandler::default(), #[cfg(feature = "unstable")] source_info: SourceInfo::empty(), } @@ -1865,7 +1865,7 @@ impl<'s> SessionDeclarations<'s, 'static> for Arc { key_expr: key_expr.try_into().map_err(Into::into), reliability: Reliability::DEFAULT, origin: Locality::default(), - handler: DefaultHandler, + handler: DefaultHandler::default(), } } @@ -1910,7 +1910,7 @@ impl<'s> SessionDeclarations<'s, 'static> for Arc { key_expr: key_expr.try_into().map_err(Into::into), complete: false, origin: Locality::default(), - handler: DefaultHandler, + handler: DefaultHandler::default(), } } diff --git a/zenoh/tests/handler.rs b/zenoh/tests/handler.rs index ceed15e2c3..57910bf3d6 100644 --- a/zenoh/tests/handler.rs +++ b/zenoh/tests/handler.rs @@ -14,12 +14,12 @@ #[test] fn pubsub_with_ringbuffer() { use std::{thread, time::Duration}; - use zenoh::{handlers::RingBuffer, prelude::sync::*}; + use zenoh::{handlers::RingChannel, prelude::sync::*}; let zenoh = zenoh::open(Config::default()).res().unwrap(); let sub = zenoh .declare_subscriber("test/ringbuffer") - .with(RingBuffer::new(3)) + .with(RingChannel::new(3)) .res() .unwrap(); for i in 0..10 { @@ -32,7 +32,6 @@ fn pubsub_with_ringbuffer() { for i in 7..10 { assert_eq!( sub.recv() - .unwrap() .unwrap() .payload() .deserialize::() @@ -46,12 +45,12 @@ fn pubsub_with_ringbuffer() { #[test] fn query_with_ringbuffer() { - use zenoh::{handlers::RingBuffer, prelude::sync::*}; + use zenoh::{handlers::RingChannel, prelude::sync::*}; let zenoh = zenoh::open(Config::default()).res().unwrap(); let queryable = zenoh .declare_queryable("test/ringbuffer_query") - .with(RingBuffer::new(1)) + .with(RingChannel::new(1)) .res() .unwrap(); @@ -66,7 +65,7 @@ fn query_with_ringbuffer() { .res() .unwrap(); - let query = queryable.recv().unwrap().unwrap(); + let query = queryable.recv().unwrap(); // Only receive the latest query assert_eq!( query