diff --git a/examples/examples/z_liveliness.rs b/examples/examples/z_liveliness.rs index 41890f7d77..359d4ac3ca 100644 --- a/examples/examples/z_liveliness.rs +++ b/examples/examples/z_liveliness.rs @@ -13,7 +13,6 @@ // use async_std::task::sleep; use clap::Parser; -use futures::prelude::*; use std::time::Duration; use zenoh::config::Config; use zenoh::prelude::r#async::*; @@ -30,7 +29,7 @@ async fn main() { let session = zenoh::open(config).res().await.unwrap(); println!("Declaring LivelinessToken on '{}'...", &key_expr); - let mut token = Some( + let _token = Some( session .liveliness() .declare_token(&key_expr) @@ -39,23 +38,17 @@ async fn main() { .unwrap(), ); - println!("Enter 'd' to undeclare LivelinessToken, 'q' to quit..."); - let mut stdin = async_std::io::stdin(); - let mut input = [0_u8]; + println!("Press CTRL-C to undeclare LivelinessToken and quit..."); loop { - let _ = stdin.read_exact(&mut input).await; - match input[0] { - b'q' => break, - b'd' => { - if let Some(token) = token.take() { - println!("Undeclaring LivelinessToken..."); - token.undeclare().res().await.unwrap(); - } - } - 0 => sleep(Duration::from_secs(1)).await, - _ => (), - } + sleep(Duration::from_secs(1)).await; } + // LivelinessTokens are automatically closed when dropped + // Use the code below to manually undeclare it if needed + // + // if let Some(token) = token.take() { + // println!("Undeclaring LivelinessToken..."); + // token.undeclare().res().await.unwrap(); + // } } #[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] diff --git a/examples/examples/z_pub.rs b/examples/examples/z_pub.rs index b892eeafc3..7ba17745b5 100644 --- a/examples/examples/z_pub.rs +++ b/examples/examples/z_pub.rs @@ -31,6 +31,7 @@ async fn main() { println!("Declaring Publisher on '{key_expr}'..."); let publisher = session.declare_publisher(&key_expr).res().await.unwrap(); + println!("Press CTRL-C to quit..."); for idx in 0..u32::MAX { sleep(Duration::from_secs(1)).await; let buf = format!("[{idx:4}] {value}"); diff --git a/examples/examples/z_pub_shm.rs b/examples/examples/z_pub_shm.rs index fc329cadfc..c54fb358d3 100644 --- a/examples/examples/z_pub_shm.rs +++ b/examples/examples/z_pub_shm.rs @@ -44,6 +44,7 @@ async fn main() -> Result<(), zenoh::Error> { println!("Allocating Shared Memory Buffer..."); let publisher = session.declare_publisher(&path).res().await.unwrap(); + println!("Press CTRL-C to quit..."); for idx in 0..(K * N as u32) { sleep(Duration::from_secs(1)).await; let mut sbuf = match shm.alloc(1024) { diff --git a/examples/examples/z_pub_shm_thr.rs b/examples/examples/z_pub_shm_thr.rs index 9921c869e5..7c6f3cbbd3 100644 --- a/examples/examples/z_pub_shm_thr.rs +++ b/examples/examples/z_pub_shm_thr.rs @@ -42,6 +42,7 @@ async fn main() { // Make sure to not drop messages because of congestion control .congestion_control(CongestionControl::Block).res().await.unwrap(); + println!("Press CTRL-C to quit..."); loop { publisher.put(buf.clone()).res().await.unwrap(); } diff --git a/examples/examples/z_pub_thr.rs b/examples/examples/z_pub_thr.rs index 3e130e0608..89b8b9b55c 100644 --- a/examples/examples/z_pub_thr.rs +++ b/examples/examples/z_pub_thr.rs @@ -44,6 +44,7 @@ fn main() { .res() .unwrap(); + println!("Press CTRL-C to quit..."); let mut count: usize = 0; let mut start = std::time::Instant::now(); loop { diff --git a/examples/examples/z_pull.rs b/examples/examples/z_pull.rs index 812c47294e..db1c9d0670 100644 --- a/examples/examples/z_pull.rs +++ b/examples/examples/z_pull.rs @@ -11,10 +11,8 @@ // Contributors: // ZettaScale Zenoh Team, // -use async_std::prelude::FutureExt; use async_std::task::sleep; use clap::Parser; -use futures::prelude::*; use std::time::Duration; use zenoh::config::Config; use zenoh::prelude::r#async::*; @@ -35,40 +33,24 @@ async fn main() { let subscriber = session .declare_subscriber(&key_expr) .pull_mode() - .res() - .await - .unwrap(); - - println!("Press to pull data..."); - - // Define the future to handle incoming samples of the subscription. - let subs = async { - while let Ok(sample) = subscriber.recv_async().await { + .callback(|sample| { println!( ">> [Subscriber] Received {} ('{}': '{}')", sample.kind, sample.key_expr.as_str(), sample.value, ); - } - }; - - // Define the future to handle keyboard's input. - let keyb = async { - let mut stdin = async_std::io::stdin(); - let mut input = [0_u8]; - loop { - stdin.read_exact(&mut input).await.unwrap(); - match input[0] { - b'q' => break, - 0 => sleep(Duration::from_secs(1)).await, - _ => subscriber.pull().res().await.unwrap(), - } - } - }; + }) + .res() + .await + .unwrap(); - // Execute both futures concurrently until one of them returns. - subs.race(keyb).await; + println!("Press CTRL-C to quit..."); + for idx in 0..u32::MAX { + sleep(Duration::from_secs(1)).await; + println!("[{idx:4}] Pulling..."); + subscriber.pull().res().await.unwrap(); + } } #[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] diff --git a/examples/examples/z_queryable.rs b/examples/examples/z_queryable.rs index 54b9858cf0..a5fd5af773 100644 --- a/examples/examples/z_queryable.rs +++ b/examples/examples/z_queryable.rs @@ -11,12 +11,8 @@ // Contributors: // ZettaScale Zenoh Team, // -use async_std::task::sleep; use clap::Parser; -use futures::prelude::*; use futures::select; -use std::sync::atomic::Ordering::Relaxed; -use std::time::Duration; use zenoh::config::Config; use zenoh::prelude::r#async::*; use zenoh_examples::CommonArgs; @@ -27,7 +23,6 @@ async fn main() { env_logger::init(); let (config, key_expr, value, complete) = parse_args(); - let send_errors = std::sync::atomic::AtomicBool::new(false); println!("Opening session..."); let session = zenoh::open(config).res().await.unwrap(); @@ -40,9 +35,7 @@ async fn main() { .await .unwrap(); - println!("Enter 'q' to quit, 'e' to reply an error to next query..."); - let mut stdin = async_std::io::stdin(); - let mut input = [0_u8]; + println!("Press CTRL-C to quit..."); loop { select!( query = queryable.recv_async() => { @@ -51,34 +44,17 @@ async fn main() { None => println!(">> [Queryable ] Received Query '{}'", query.selector()), Some(value) => println!(">> [Queryable ] Received Query '{}' with value '{}'", query.selector(), value), } - let reply = if send_errors.swap(false, Relaxed) { - println!( - ">> [Queryable ] Replying (ERROR: '{}')", - value, - ); - Err(value.clone().into()) - } else { - println!( - ">> [Queryable ] Responding ('{}': '{}')", - key_expr.as_str(), - value, - ); - Ok(Sample::new(key_expr.clone(), value.clone())) - }; + println!( + ">> [Queryable ] Responding ('{}': '{}')", + key_expr.as_str(), + value, + ); + let reply = Ok(Sample::new(key_expr.clone(), value.clone())); query .reply(reply) .res() .await .unwrap_or_else(|e| println!(">> [Queryable ] Error sending reply: {e}")); - }, - - _ = stdin.read_exact(&mut input).fuse() => { - match input[0] { - b'q' => break, - 0 => sleep(Duration::from_secs(1)).await, - b'e' => send_errors.store(true, Relaxed), - _ => (), - } } ); } diff --git a/examples/examples/z_storage.rs b/examples/examples/z_storage.rs index 79164c914a..88849f5b0d 100644 --- a/examples/examples/z_storage.rs +++ b/examples/examples/z_storage.rs @@ -13,12 +13,9 @@ // #![recursion_limit = "256"] -use async_std::task::sleep; use clap::Parser; -use futures::prelude::*; use futures::select; use std::collections::HashMap; -use std::time::Duration; use zenoh::config::Config; use zenoh::prelude::r#async::*; use zenoh_examples::CommonArgs; @@ -46,9 +43,7 @@ async fn main() { .await .unwrap(); - println!("Enter 'q' to quit..."); - let mut stdin = async_std::io::stdin(); - let mut input = [0u8]; + println!("Press CTRL-C to quit..."); loop { select!( sample = subscriber.recv_async() => { @@ -70,14 +65,6 @@ async fn main() { query.reply(Ok(sample.clone())).res().await.unwrap(); } } - }, - - _ = stdin.read_exact(&mut input).fuse() => { - match input[0] { - b'q' => break, - 0 => sleep(Duration::from_secs(1)).await, - _ => (), - } } ); } diff --git a/examples/examples/z_sub.rs b/examples/examples/z_sub.rs index 0542f85870..ea40f64fc7 100644 --- a/examples/examples/z_sub.rs +++ b/examples/examples/z_sub.rs @@ -11,11 +11,8 @@ // Contributors: // ZettaScale Zenoh Team, // -use async_std::task::sleep; use clap::Parser; -use futures::prelude::*; use futures::select; -use std::time::Duration; use zenoh::config::Config; use zenoh::prelude::r#async::*; use zenoh_examples::CommonArgs; @@ -39,23 +36,13 @@ async fn main() { let subscriber = session.declare_subscriber(&key_expr).res().await.unwrap(); - println!("Enter 'q' to quit..."); - let mut stdin = async_std::io::stdin(); - let mut input = [0_u8]; + println!("Press CTRL-C to quit..."); loop { select!( sample = subscriber.recv_async() => { let sample = sample.unwrap(); println!(">> [Subscriber] Received {} ('{}': '{}')", sample.kind, sample.key_expr.as_str(), sample.value); - }, - - _ = stdin.read_exact(&mut input).fuse() => { - match input[0] { - b'q' => break, - 0 => sleep(Duration::from_secs(1)).await, - _ => (), - } } ); } diff --git a/examples/examples/z_sub_liveliness.rs b/examples/examples/z_sub_liveliness.rs index 52ba53875c..b8e752b034 100644 --- a/examples/examples/z_sub_liveliness.rs +++ b/examples/examples/z_sub_liveliness.rs @@ -11,11 +11,8 @@ // Contributors: // ZettaScale Zenoh Team, // -use async_std::task::sleep; use clap::Parser; -use futures::prelude::*; use futures::select; -use std::time::Duration; use zenoh::config::Config; use zenoh::prelude::r#async::*; use zenoh_examples::CommonArgs; @@ -39,9 +36,7 @@ async fn main() { .await .unwrap(); - println!("Enter 'q' to quit..."); - let mut stdin = async_std::io::stdin(); - let mut input = [0_u8]; + println!("Press CTRL-C to quit..."); loop { select!( sample = subscriber.recv_async() => { @@ -54,14 +49,6 @@ async fn main() { ">> [LivelinessSubscriber] Dropped token ('{}')", sample.key_expr.as_str()), } - }, - - _ = stdin.read_exact(&mut input).fuse() => { - match input[0] { - b'q' => break, - 0 => sleep(Duration::from_secs(1)).await, - _ => (), - } } ); } diff --git a/examples/examples/z_sub_thr.rs b/examples/examples/z_sub_thr.rs index afdd07ed23..2a35f23799 100644 --- a/examples/examples/z_sub_thr.rs +++ b/examples/examples/z_sub_thr.rs @@ -12,8 +12,8 @@ // ZettaScale Zenoh Team, // use clap::Parser; -use std::io::{stdin, Read}; -use std::time::Instant; +use std::thread::sleep; +use std::time::{Duration, Instant}; use zenoh::config::Config; use zenoh::prelude::sync::*; use zenoh_examples::CommonArgs; @@ -95,11 +95,9 @@ fn main() { .res() .unwrap(); - for byte in stdin().bytes() { - match byte { - Ok(b'q') => break, - _ => std::thread::yield_now(), - } + println!("Press CTRL-C to quit..."); + loop { + sleep(Duration::from_secs(1)); } } diff --git a/zenoh-ext/examples/z_pub_cache.rs b/zenoh-ext/examples/z_pub_cache.rs index 882764b8f9..ea07983176 100644 --- a/zenoh-ext/examples/z_pub_cache.rs +++ b/zenoh-ext/examples/z_pub_cache.rs @@ -38,7 +38,8 @@ async fn main() { } let _publication_cache = publication_cache_builder.res().await.unwrap(); - for idx in 0..u32::MAX { + println!("Press CTRL-C to quit..."); + for idx in 1..u32::MAX { sleep(Duration::from_secs(1)).await; let buf = format!("[{idx:4}] {value}"); println!("Put Data ('{}': '{}')", &key_expr, buf); diff --git a/zenoh-ext/examples/z_query_sub.rs b/zenoh-ext/examples/z_query_sub.rs index 73433ebf14..c20c925eaa 100644 --- a/zenoh-ext/examples/z_query_sub.rs +++ b/zenoh-ext/examples/z_query_sub.rs @@ -11,12 +11,9 @@ // Contributors: // ZettaScale Zenoh Team, // -use async_std::task::sleep; use clap::arg; use clap::Command; -use futures::prelude::*; use futures::select; -use std::time::Duration; use zenoh::config::Config; use zenoh::prelude::r#async::*; use zenoh::query::ReplyKeyExpr; @@ -55,9 +52,7 @@ async fn main() { .unwrap() }; - println!("Enter 'q' to quit..."); - let mut stdin = async_std::io::stdin(); - let mut input = [0_u8]; + println!("Press CTRL-C to quit..."); loop { select!( sample = subscriber.recv_async() => { @@ -65,14 +60,6 @@ async fn main() { println!(">> [Subscriber] Received {} ('{}': '{}')", sample.kind, sample.key_expr.as_str(), sample.value); }, - - _ = stdin.read_exact(&mut input).fuse() => { - match input[0] { - b'q' => break, - 0 => sleep(Duration::from_secs(1)).await, - _ => (), - } - } ); } }