Skip to content

Commit

Permalink
Support RingBuffer to get the latest sample. (#851)
Browse files Browse the repository at this point in the history
* Add RingQueue to support getting the latest sample.

Signed-off-by: ChenYing Kuo <[email protected]>

* Rename RingQueue to RingBuffer.

Signed-off-by: ChenYing Kuo <[email protected]>

* Update examples.

Signed-off-by: ChenYing Kuo <[email protected]>

* Add document.

Signed-off-by: ChenYing Kuo <[email protected]>

* Add test for RingBuffer.

Signed-off-by: ChenYing Kuo <[email protected]>

* Use the correct naming convention (CameCase)

Signed-off-by: ChenYing Kuo <[email protected]>

* Add file header.

Signed-off-by: ChenYing Kuo <[email protected]>

* gename z_pull and update the usage.

Signed-off-by: ChenYing Kuo <[email protected]>

* Use ring instead of cache.

Signed-off-by: ChenYing Kuo <[email protected]>

* Add sleep to wait for the result in pubsub_with_ringbuffer.

Signed-off-by: ChenYing Kuo <[email protected]>

---------

Signed-off-by: ChenYing Kuo <[email protected]>
  • Loading branch information
evshary authored Mar 26, 2024
1 parent 65a4d7f commit c7cc575
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 25 deletions.
4 changes: 2 additions & 2 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
### z_pull

Declares a key expression and a pull subscriber.
On each pull, the pull subscriber will be notified of the last `put` or `delete` made on each key expression matching the subscriber key expression, and will print this notification.
On each pull, the pull subscriber will be notified of the last N `put` or `delete` made on each key expression matching the subscriber key expression, and will print this notification.


Typical usage:
Expand All @@ -89,7 +89,7 @@
```
or
```bash
z_pull -k demo/**
z_pull -k demo/** --size 3
```

### z_get
Expand Down
38 changes: 15 additions & 23 deletions examples/examples/z_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,44 +13,33 @@
//
use async_std::task::sleep;
use clap::Parser;
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use zenoh::{config::Config, prelude::r#async::*};
use zenoh_collections::RingBuffer;
use std::time::Duration;
use zenoh::{config::Config, handlers::RingBuffer, prelude::r#async::*};
use zenoh_examples::CommonArgs;

#[async_std::main]
async fn main() {
// initiate logging
env_logger::init();

let (config, key_expr, cache, interval) = parse_args();
let (config, key_expr, size, interval) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();

println!("Creating a local queue keeping the last {cache} elements...");
let arb = Arc::new(Mutex::new(RingBuffer::new(cache)));
let arb_c = arb.clone();

println!("Declaring Subscriber on '{key_expr}'...");
let _subscriber = session
let subscriber = session
.declare_subscriber(&key_expr)
.callback(move |sample| {
arb_c.lock().unwrap().push_force(sample);
})
.with(RingBuffer::new(size))
.res()
.await
.unwrap();

println!("Pulling data every {:#?} seconds", interval);
loop {
let mut res = arb.lock().unwrap().pull();
print!(">> [Subscriber] Pulling ");
match res.take() {
Some(sample) => {
match subscriber.recv() {
Ok(Some(sample)) => {
let payload = sample
.payload()
.deserialize::<String>()
Expand All @@ -62,10 +51,13 @@ async fn main() {
payload,
);
}
None => {
Ok(None) => {
println!("nothing... sleep for {:#?}", interval);
sleep(interval).await;
}
Err(e) => {
println!("Pull error: {e}");
}
}
}
}
Expand All @@ -75,10 +67,10 @@ struct SubArgs {
#[arg(short, long, default_value = "demo/example/**")]
/// The Key Expression to subscribe to.
key: KeyExpr<'static>,
/// The size of the cache.
/// The size of the ringbuffer.
#[arg(long, default_value = "3")]
cache: usize,
/// The interval for pulling the cache.
size: usize,
/// The interval for pulling the ringbuffer.
#[arg(long, default_value = "5.0")]
interval: f32,
#[command(flatten)]
Expand All @@ -88,5 +80,5 @@ struct SubArgs {
fn parse_args() -> (Config, KeyExpr<'static>, usize, Duration) {
let args = SubArgs::parse();
let interval = Duration::from_secs_f32(args.interval);
(args.common.into(), args.key, args.cache, interval)
(args.common.into(), args.key, args.size, interval)
}
52 changes: 52 additions & 0 deletions zenoh/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
//! Callback handler trait.
use crate::API_DATA_RECEPTION_CHANNEL_SIZE;

use std::sync::{Arc, Mutex, Weak};
use zenoh_collections::RingBuffer as RingBufferInner;
use zenoh_result::ZResult;

/// An alias for `Arc<T>`.
pub type Dyn<T> = std::sync::Arc<T>;

Expand Down Expand Up @@ -88,6 +92,54 @@ impl<T: Send + Sync + 'static> IntoHandler<'static, T>
}
}

/// Ring buffer with a limited queue size, which allows users to keep the last N data.
pub struct RingBuffer<T> {
ring: Arc<Mutex<RingBufferInner<T>>>,
}

impl<T> RingBuffer<T> {
/// Initialize the RingBuffer with the capacity size.
pub fn new(capacity: usize) -> Self {
RingBuffer {
ring: Arc::new(Mutex::new(RingBufferInner::new(capacity))),
}
}
}

pub struct RingBufferHandler<T> {
ring: Weak<Mutex<RingBufferInner<T>>>,
}

impl<T> RingBufferHandler<T> {
pub fn recv(&self) -> ZResult<Option<T>> {
let Some(ring) = self.ring.upgrade() else {
bail!("The ringbuffer has been deleted.");
};
let mut guard = ring.lock().map_err(|e| zerror!("{}", e))?;
Ok(guard.pull())
}
}

impl<T: Send + 'static> IntoHandler<'static, T> for RingBuffer<T> {
type Handler = RingBufferHandler<T>;

fn into_handler(self) -> (Callback<'static, T>, Self::Handler) {
let receiver = RingBufferHandler {
ring: Arc::downgrade(&self.ring),
};
(
Dyn::new(move |t| match self.ring.lock() {
Ok(mut g) => {
// Eventually drop the oldest element.
g.push_force(t);
}
Err(e) => log::error!("{}", e),
}),
receiver,
)
}
}

/// A function that can transform a [`FnMut`]`(T)` to
/// a [`Fn`]`(T)` with the help of a [`Mutex`](std::sync::Mutex).
pub fn locked<T>(fnmut: impl FnMut(T)) -> impl Fn(T) {
Expand Down
13 changes: 13 additions & 0 deletions zenoh/tests/attachments.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#[cfg(feature = "unstable")]
#[test]
fn pubsub() {
Expand Down
13 changes: 13 additions & 0 deletions zenoh/tests/formatters.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#[test]
fn reuse() {
zenoh::kedefine!(
Expand Down
80 changes: 80 additions & 0 deletions zenoh/tests/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#[test]
fn pubsub_with_ringbuffer() {
use std::{thread, time::Duration};
use zenoh::{handlers::RingBuffer, prelude::sync::*};

let zenoh = zenoh::open(Config::default()).res().unwrap();
let sub = zenoh
.declare_subscriber("test/ringbuffer")
.with(RingBuffer::new(3))
.res()
.unwrap();
for i in 0..10 {
zenoh
.put("test/ringbuffer", format!("put{i}"))
.res()
.unwrap();
}
// Should only receive the last three samples ("put7", "put8", "put9")
for i in 7..10 {
assert_eq!(
sub.recv()
.unwrap()
.unwrap()
.payload()
.deserialize::<String>()
.unwrap(),
format!("put{i}")
);
}
// Wait for the subscriber to get the value
thread::sleep(Duration::from_millis(1000));
}

#[test]
fn query_with_ringbuffer() {
use zenoh::{handlers::RingBuffer, prelude::sync::*};

let zenoh = zenoh::open(Config::default()).res().unwrap();
let queryable = zenoh
.declare_queryable("test/ringbuffer_query")
.with(RingBuffer::new(1))
.res()
.unwrap();

let _reply1 = zenoh
.get("test/ringbuffer_query")
.with_value("query1")
.res()
.unwrap();
let _reply2 = zenoh
.get("test/ringbuffer_query")
.with_value("query2")
.res()
.unwrap();

let query = queryable.recv().unwrap().unwrap();
// Only receive the latest query
assert_eq!(
query
.value()
.unwrap()
.payload
.deserialize::<String>()
.unwrap(),
"query2"
);
}
13 changes: 13 additions & 0 deletions zenoh/tests/interceptors.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::sync::{Arc, Mutex};
use zenoh_core::zlock;

Expand Down

0 comments on commit c7cc575

Please sign in to comment.