diff --git a/examples/examples/z_pull.rs b/examples/examples/z_pull.rs index d2c9a5380b..c1b604dbde 100644 --- a/examples/examples/z_pull.rs +++ b/examples/examples/z_pull.rs @@ -13,12 +13,9 @@ // use async_std::task::sleep; use clap::Parser; -use std::{ - sync::{Arc, Mutex}, - time::Duration, -}; +use std::time::Duration; +use zenoh::handlers::RingQueue; use zenoh::{config::Config, prelude::r#async::*}; -use zenoh_collections::RingBuffer; use zenoh_examples::CommonArgs; #[async_std::main] @@ -31,26 +28,19 @@ async fn main() { println!("Opening session..."); let session = zenoh::open(config).res().await.unwrap(); - println!("Creating a local queue keeping the last {cache} elements..."); - let arb = Arc::new(Mutex::new(RingBuffer::new(cache))); - let arb_c = arb.clone(); - println!("Declaring Subscriber on '{key_expr}'..."); - let _subscriber = session + let subscriber = session .declare_subscriber(&key_expr) - .callback(move |sample| { - arb_c.lock().unwrap().push_force(sample); - }) + .with(RingQueue::new(cache)) .res() .await .unwrap(); println!("Pulling data every {:#?} seconds", interval); loop { - let mut res = arb.lock().unwrap().pull(); print!(">> [Subscriber] Pulling "); - match res.take() { - Some(sample) => { + match subscriber.recv() { + Ok(Some(sample)) => { let payload = sample .payload() .deserialize::() @@ -62,10 +52,13 @@ async fn main() { payload, ); } - None => { - println!("nothing... sleep for {:#?}", interval); + Ok(None) => { + println!(">> [Subscriber] nothing... sleep for {:#?}", interval); sleep(interval).await; } + Err(e) => { + println!(">> [Subscriber] Pull error: {e}") + } } } } diff --git a/zenoh/src/handlers.rs b/zenoh/src/handlers.rs index e5ec3bb0dc..d300771b72 100644 --- a/zenoh/src/handlers.rs +++ b/zenoh/src/handlers.rs @@ -15,6 +15,10 @@ //! Callback handler trait. use crate::API_DATA_RECEPTION_CHANNEL_SIZE; +use std::sync::{Arc, Mutex, Weak}; +use zenoh_collections::RingBuffer; +use zenoh_result::ZResult; + /// An alias for `Arc`. pub type Dyn = std::sync::Arc; @@ -88,6 +92,52 @@ impl IntoHandler<'static, T> } } +pub struct RingQueue { + cache: Arc>>, +} + +impl RingQueue { + pub fn new(capacity: usize) -> Self { + RingQueue { + cache: Arc::new(Mutex::new(RingBuffer::new(capacity))), + } + } +} + +pub struct RingHandler { + cache: Weak>>, +} + +impl RingHandler { + pub fn recv(&self) -> ZResult> { + let Some(cache) = self.cache.upgrade() else { + bail!("The cache has been deleted."); + }; + let mut guard = cache.lock().map_err(|e| zerror!("{}", e))?; + Ok(guard.pull()) + } +} + +impl IntoHandler<'static, T> for RingQueue { + type Handler = RingHandler; + + fn into_handler(self) -> (Callback<'static, T>, Self::Handler) { + let receiver = RingHandler { + cache: Arc::downgrade(&self.cache), + }; + ( + Dyn::new(move |t| match self.cache.lock() { + Ok(mut g) => { + // Eventually drop the oldest element. + g.push_force(t); + } + Err(e) => log::error!("{}", e), + }), + receiver, + ) + } +} + /// A function that can transform a [`FnMut`]`(T)` to /// a [`Fn`]`(T)` with the help of a [`Mutex`](std::sync::Mutex). pub fn locked(fnmut: impl FnMut(T)) -> impl Fn(T) {