Skip to content

Commit

Permalink
RingChannel sync/async/blocking/non-blocking (#903)
Browse files Browse the repository at this point in the history
* RingChannel sync/async/blocking/non-blocking

* Add comment in the examples
  • Loading branch information
Mallets authored Apr 11, 2024
1 parent 66f4681 commit d86653e
Show file tree
Hide file tree
Showing 14 changed files with 407 additions and 243 deletions.
50 changes: 25 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions examples/examples/z_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ async fn main() {
println!("Sending Query '{selector}'...");
let replies = session
.get(&selector)
// // By default get receives replies from a FIFO.
// // Uncomment this line to use a ring channel instead.
// // More information on the ring channel are available in the z_pull example.
.with(zenoh::handlers::RingChannel::default())
.value(value)
.target(target)
.timeout(timeout)
Expand Down
56 changes: 40 additions & 16 deletions examples/examples/z_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
use clap::Parser;
use std::time::Duration;
use zenoh::{config::Config, handlers::RingBuffer, prelude::r#async::*};
use zenoh::{config::Config, handlers::RingChannel, prelude::r#async::*};
use zenoh_examples::CommonArgs;

#[tokio::main]
Expand All @@ -29,32 +29,27 @@ async fn main() {
println!("Declaring Subscriber on '{key_expr}'...");
let subscriber = session
.declare_subscriber(&key_expr)
.with(RingBuffer::new(size))
.with(RingChannel::new(size))
.res()
.await
.unwrap();

println!(
"Pulling data every {:#?} seconds. Press CTRL-C to quit...",
interval
);
println!("Press CTRL-C to quit...");

// Blocking recv. If the ring is empty, wait for the first sample to arrive.
loop {
match subscriber.recv() {
Ok(Some(sample)) => {
// Use .recv() for the synchronous version.
match subscriber.recv_async().await {
Ok(sample) => {
let payload = sample
.payload()
.deserialize::<String>()
.unwrap_or_else(|e| format!("{}", e));
println!(
">> [Subscriber] Pulled {} ('{}': '{}')",
">> [Subscriber] Pulled {} ('{}': '{}')... performing a computation of {:#?}",
sample.kind(),
sample.key_expr().as_str(),
payload,
);
}
Ok(None) => {
println!(
">> [Subscriber] Pulled nothing... sleep for {:#?}",
interval
);
tokio::time::sleep(interval).await;
Expand All @@ -65,6 +60,35 @@ async fn main() {
}
}
}

// Non-blocking recv. This can be usually used to implement a polling mechanism.
// loop {
// match subscriber.try_recv() {
// Ok(Some(sample)) => {
// let payload = sample
// .payload()
// .deserialize::<String>()
// .unwrap_or_else(|e| format!("{}", e));
// println!(
// ">> [Subscriber] Pulled {} ('{}': '{}')",
// sample.kind(),
// sample.key_expr().as_str(),
// payload,
// );
// }
// Ok(None) => {
// println!(
// ">> [Subscriber] Pulled nothing... sleep for {:#?}",
// interval
// );
// tokio::time::sleep(interval).await;
// }
// Err(e) => {
// println!(">> [Subscriber] Pull error: {e}");
// return;
// }
// }
// }
}

#[derive(clap::Parser, Clone, PartialEq, Debug)]
Expand All @@ -73,10 +97,10 @@ struct SubArgs {
/// The Key Expression to subscribe to.
key: KeyExpr<'static>,
/// The size of the ringbuffer.
#[arg(long, default_value = "3")]
#[arg(short, long, default_value = "3")]
size: usize,
/// The interval for pulling the ringbuffer.
#[arg(long, default_value = "5.0")]
#[arg(short, long, default_value = "5.0")]
interval: f32,
#[command(flatten)]
common: CommonArgs,
Expand Down
4 changes: 4 additions & 0 deletions examples/examples/z_queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ async fn main() {
println!("Declaring Queryable on '{key_expr}'...");
let queryable = session
.declare_queryable(&key_expr)
// // By default queryable receives queries from a FIFO.
// // Uncomment this line to use a ring channel instead.
// // More information on the ring channel are available in the z_pull example.
// .with(zenoh::handlers::RingChannel::default())
.complete(complete)
.res()
.await
Expand Down
Loading

0 comments on commit d86653e

Please sign in to comment.