Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support RingBuffer to get the latest sample. #851

Merged
merged 10 commits into from
Mar 26, 2024
4 changes: 2 additions & 2 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
### z_pull

Declares a key expression and a pull subscriber.
On each pull, the pull subscriber will be notified of the last `put` or `delete` made on each key expression matching the subscriber key expression, and will print this notification.
On each pull, the pull subscriber will be notified of the last N `put` or `delete` made on each key expression matching the subscriber key expression, and will print this notification.


Typical usage:
Expand All @@ -89,7 +89,7 @@
```
or
```bash
z_pull -k demo/**
z_pull -k demo/** --size 3
```

### z_get
Expand Down
38 changes: 15 additions & 23 deletions examples/examples/z_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,44 +13,33 @@
//
use async_std::task::sleep;
use clap::Parser;
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use zenoh::{config::Config, prelude::r#async::*};
use zenoh_collections::RingBuffer;
use std::time::Duration;
use zenoh::{config::Config, handlers::RingBuffer, prelude::r#async::*};
use zenoh_examples::CommonArgs;

#[async_std::main]
async fn main() {
// initiate logging
env_logger::init();

let (config, key_expr, cache, interval) = parse_args();
let (config, key_expr, size, interval) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();

println!("Creating a local queue keeping the last {cache} elements...");
let arb = Arc::new(Mutex::new(RingBuffer::new(cache)));
let arb_c = arb.clone();

println!("Declaring Subscriber on '{key_expr}'...");
let _subscriber = session
let subscriber = session
.declare_subscriber(&key_expr)
.callback(move |sample| {
arb_c.lock().unwrap().push_force(sample);
})
.with(RingBuffer::new(size))
.res()
.await
.unwrap();

println!("Pulling data every {:#?} seconds", interval);
loop {
let mut res = arb.lock().unwrap().pull();
print!(">> [Subscriber] Pulling ");
match res.take() {
Some(sample) => {
match subscriber.recv() {
Ok(Some(sample)) => {
let payload = sample
.payload()
.deserialize::<String>()
Expand All @@ -62,10 +51,13 @@ async fn main() {
payload,
);
}
None => {
Ok(None) => {
println!("nothing... sleep for {:#?}", interval);
sleep(interval).await;
}
Err(e) => {
println!("Pull error: {e}");
}
}
}
}
Expand All @@ -75,10 +67,10 @@ struct SubArgs {
#[arg(short, long, default_value = "demo/example/**")]
/// The Key Expression to subscribe to.
key: KeyExpr<'static>,
/// The size of the cache.
/// The size of the ringbuffer.
#[arg(long, default_value = "3")]
cache: usize,
/// The interval for pulling the cache.
size: usize,
/// The interval for pulling the ringbuffer.
#[arg(long, default_value = "5.0")]
interval: f32,
#[command(flatten)]
Expand All @@ -88,5 +80,5 @@ struct SubArgs {
fn parse_args() -> (Config, KeyExpr<'static>, usize, Duration) {
let args = SubArgs::parse();
let interval = Duration::from_secs_f32(args.interval);
(args.common.into(), args.key, args.cache, interval)
(args.common.into(), args.key, args.size, interval)
}
52 changes: 52 additions & 0 deletions zenoh/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
//! Callback handler trait.
use crate::API_DATA_RECEPTION_CHANNEL_SIZE;

use std::sync::{Arc, Mutex, Weak};
use zenoh_collections::RingBuffer as RingBufferInner;
use zenoh_result::ZResult;

/// An alias for `Arc<T>`.
pub type Dyn<T> = std::sync::Arc<T>;

Expand Down Expand Up @@ -88,6 +92,54 @@ impl<T: Send + Sync + 'static> IntoHandler<'static, T>
}
}

/// Ring buffer with a limited queue size, which allows users to keep the last N data.
pub struct RingBuffer<T> {
ring: Arc<Mutex<RingBufferInner<T>>>,
}

impl<T> RingBuffer<T> {
/// Initialize the RingBuffer with the capacity size.
pub fn new(capacity: usize) -> Self {
RingBuffer {
ring: Arc::new(Mutex::new(RingBufferInner::new(capacity))),
}
}
}

pub struct RingBufferHandler<T> {
ring: Weak<Mutex<RingBufferInner<T>>>,
}

impl<T> RingBufferHandler<T> {
pub fn recv(&self) -> ZResult<Option<T>> {
let Some(ring) = self.ring.upgrade() else {
bail!("The ringbuffer has been deleted.");
};
let mut guard = ring.lock().map_err(|e| zerror!("{}", e))?;
Ok(guard.pull())
}
}

impl<T: Send + 'static> IntoHandler<'static, T> for RingBuffer<T> {
type Handler = RingBufferHandler<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
let receiver = RingBufferHandler {
ring: Arc::downgrade(&self.ring),
};
(
Dyn::new(move |t| match self.ring.lock() {
Ok(mut g) => {
// Eventually drop the oldest element.
g.push_force(t);
}
Err(e) => log::error!("{}", e),
}),
receiver,
)
}
}

/// A function that can transform a [`FnMut`]`(T)` to
/// a [`Fn`]`(T)` with the help of a [`Mutex`](std::sync::Mutex).
pub fn locked<T>(fnmut: impl FnMut(T)) -> impl Fn(T) {
Expand Down
13 changes: 13 additions & 0 deletions zenoh/tests/attachments.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#[cfg(feature = "unstable")]
#[test]
fn pubsub() {
Expand Down
13 changes: 13 additions & 0 deletions zenoh/tests/formatters.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#[test]
fn reuse() {
zenoh::kedefine!(
Expand Down
80 changes: 80 additions & 0 deletions zenoh/tests/handler.rs
evshary marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#[test]
fn pubsub_with_ringbuffer() {
use std::{thread, time::Duration};
use zenoh::{handlers::RingBuffer, prelude::sync::*};

let zenoh = zenoh::open(Config::default()).res().unwrap();
let sub = zenoh
.declare_subscriber("test/ringbuffer")
.with(RingBuffer::new(3))
.res()
.unwrap();
for i in 0..10 {
zenoh
.put("test/ringbuffer", format!("put{i}"))
.res()
.unwrap();
}
evshary marked this conversation as resolved.
Show resolved Hide resolved
// Should only receive the last three samples ("put7", "put8", "put9")
for i in 7..10 {
assert_eq!(
sub.recv()
.unwrap()
.unwrap()
.payload()
.deserialize::<String>()
.unwrap(),
format!("put{i}")
);
}
// Wait for the subscriber to get the value
thread::sleep(Duration::from_millis(1000));
}

#[test]
fn query_with_ringbuffer() {
use zenoh::{handlers::RingBuffer, prelude::sync::*};

let zenoh = zenoh::open(Config::default()).res().unwrap();
let queryable = zenoh
.declare_queryable("test/ringbuffer_query")
.with(RingBuffer::new(1))
.res()
.unwrap();

let _reply1 = zenoh
.get("test/ringbuffer_query")
.with_value("query1")
.res()
.unwrap();
let _reply2 = zenoh
.get("test/ringbuffer_query")
.with_value("query2")
.res()
.unwrap();

let query = queryable.recv().unwrap().unwrap();
// Only receive the latest query
assert_eq!(
query
.value()
.unwrap()
.payload
.deserialize::<String>()
.unwrap(),
"query2"
);
}
13 changes: 13 additions & 0 deletions zenoh/tests/interceptors.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::sync::{Arc, Mutex};
use zenoh_core::zlock;

Expand Down