Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support RingBuffer to get the latest sample. #851

Merged
merged 10 commits into from
Mar 26, 2024
28 changes: 10 additions & 18 deletions examples/examples/z_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,8 @@
//
use async_std::task::sleep;
use clap::Parser;
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use zenoh::{config::Config, prelude::r#async::*};
use zenoh_collections::RingBuffer;
use std::time::Duration;
use zenoh::{config::Config, handlers::RingBuffer, prelude::r#async::*};
use zenoh_examples::CommonArgs;

#[async_std::main]
Expand All @@ -31,26 +27,19 @@ async fn main() {
println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();

println!("Creating a local queue keeping the last {cache} elements...");
let arb = Arc::new(Mutex::new(RingBuffer::new(cache)));
let arb_c = arb.clone();

println!("Declaring Subscriber on '{key_expr}'...");
let _subscriber = session
let subscriber = session
.declare_subscriber(&key_expr)
.callback(move |sample| {
arb_c.lock().unwrap().push_force(sample);
})
.with(RingBuffer::new(cache))
evshary marked this conversation as resolved.
Show resolved Hide resolved
.res()
.await
.unwrap();

println!("Pulling data every {:#?} seconds", interval);
loop {
let mut res = arb.lock().unwrap().pull();
print!(">> [Subscriber] Pulling ");
match res.take() {
Some(sample) => {
match subscriber.recv() {
Ok(Some(sample)) => {
let payload = sample
.payload()
.deserialize::<String>()
Expand All @@ -62,10 +51,13 @@ async fn main() {
payload,
);
}
None => {
Ok(None) => {
println!("nothing... sleep for {:#?}", interval);
sleep(interval).await;
}
Err(e) => {
println!("Pull error: {e}");
}
}
}
}
Expand Down
52 changes: 52 additions & 0 deletions zenoh/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
//! Callback handler trait.
use crate::API_DATA_RECEPTION_CHANNEL_SIZE;

use std::sync::{Arc, Mutex, Weak};
use zenoh_collections::RingBuffer as RingBuffer_inner;
evshary marked this conversation as resolved.
Show resolved Hide resolved
use zenoh_result::ZResult;

/// An alias for `Arc<T>`.
pub type Dyn<T> = std::sync::Arc<T>;

Expand Down Expand Up @@ -88,6 +92,54 @@ impl<T: Send + Sync + 'static> IntoHandler<'static, T>
}
}

/// Ring buffer with a limited queue size, which allows users to keep the last N data.
pub struct RingBuffer<T> {
cache: Arc<Mutex<RingBuffer_inner<T>>>,
evshary marked this conversation as resolved.
Show resolved Hide resolved
}

impl<T> RingBuffer<T> {
/// Initialize the RingBuffer with the capacity size.
pub fn new(capacity: usize) -> Self {
RingBuffer {
cache: Arc::new(Mutex::new(RingBuffer_inner::new(capacity))),
}
}
}

pub struct RingBufferHandler<T> {
cache: Weak<Mutex<RingBuffer_inner<T>>>,
}

impl<T> RingBufferHandler<T> {
pub fn recv(&self) -> ZResult<Option<T>> {
let Some(cache) = self.cache.upgrade() else {
bail!("The cache has been deleted.");
};
let mut guard = cache.lock().map_err(|e| zerror!("{}", e))?;
Ok(guard.pull())
}
}

impl<T: Send + 'static> IntoHandler<'static, T> for RingBuffer<T> {
type Handler = RingBufferHandler<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
let receiver = RingBufferHandler {
cache: Arc::downgrade(&self.cache),
};
(
Dyn::new(move |t| match self.cache.lock() {
Ok(mut g) => {
// Eventually drop the oldest element.
g.push_force(t);
}
Err(e) => log::error!("{}", e),
}),
receiver,
)
}
}

/// A function that can transform a [`FnMut`]`(T)` to
/// a [`Fn`]`(T)` with the help of a [`Mutex`](std::sync::Mutex).
pub fn locked<T>(fnmut: impl FnMut(T)) -> impl Fn(T) {
Expand Down
64 changes: 64 additions & 0 deletions zenoh/tests/handler.rs
evshary marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#[test]
fn pubsub_with_ringbuffer() {
use zenoh::{handlers::RingBuffer, prelude::sync::*};

let zenoh = zenoh::open(Config::default()).res().unwrap();
let sub = zenoh
.declare_subscriber("test/ringbuffer")
.with(RingBuffer::new(3))
.res()
.unwrap();
for i in 0..10 {
zenoh
.put("test/ringbuffer", format!("put{i}"))
.res()
.unwrap();
}
evshary marked this conversation as resolved.
Show resolved Hide resolved
// Should only receive the last three samples ("put7", "put8", "put9")
for i in 7..10 {
assert_eq!(
sub.recv()
.unwrap()
.unwrap()
.payload()
.deserialize::<String>()
.unwrap(),
format!("put{i}")
);
}
}

#[test]
fn query_with_ringbuffer() {
use zenoh::{handlers::RingBuffer, prelude::sync::*};

let zenoh = zenoh::open(Config::default()).res().unwrap();
let queryable = zenoh
.declare_queryable("test/ringbuffer_query")
.with(RingBuffer::new(1))
.res()
.unwrap();

let _reply1 = zenoh
.get("test/ringbuffer_query")
.with_value("query1")
.res()
.unwrap();
let _reply2 = zenoh
.get("test/ringbuffer_query")
.with_value("query2")
.res()
.unwrap();

let query = queryable.recv().unwrap().unwrap();
// Only receive the latest query
assert_eq!(
query
.value()
.unwrap()
.payload
.deserialize::<String>()
.unwrap(),
"query2"
);
}