Skip to content

Commit

Permalink
Remove reading from stdin, align example implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
oteffahi committed Feb 28, 2024
1 parent 01ae772 commit 3310cae
Show file tree
Hide file tree
Showing 13 changed files with 43 additions and 141 deletions.
27 changes: 10 additions & 17 deletions examples/examples/z_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
//
use async_std::task::sleep;
use clap::Parser;
use futures::prelude::*;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
Expand All @@ -30,7 +29,7 @@ async fn main() {
let session = zenoh::open(config).res().await.unwrap();

println!("Declaring LivelinessToken on '{}'...", &key_expr);
let mut token = Some(
let _token = Some(
session
.liveliness()
.declare_token(&key_expr)
Expand All @@ -39,23 +38,17 @@ async fn main() {
.unwrap(),
);

println!("Enter 'd' to undeclare LivelinessToken, 'q' to quit...");
let mut stdin = async_std::io::stdin();
let mut input = [0_u8];
println!("Press CTRL-C to undeclare LivelinessToken and quit...");
loop {
let _ = stdin.read_exact(&mut input).await;
match input[0] {
b'q' => break,
b'd' => {
if let Some(token) = token.take() {
println!("Undeclaring LivelinessToken...");
token.undeclare().res().await.unwrap();
}
}
0 => sleep(Duration::from_secs(1)).await,
_ => (),
}
sleep(Duration::from_secs(1)).await;
}
// LivelinessTokens are automatically closed when dropped
// Use the code below to manually undeclare it if needed
//
// if let Some(token) = token.take() {
// println!("Undeclaring LivelinessToken...");
// token.undeclare().res().await.unwrap();
// }
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
Expand Down
1 change: 1 addition & 0 deletions examples/examples/z_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ async fn main() {
println!("Declaring Publisher on '{key_expr}'...");
let publisher = session.declare_publisher(&key_expr).res().await.unwrap();

println!("Press CTRL-C to quit...");
for idx in 0..u32::MAX {
sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] {value}");
Expand Down
1 change: 1 addition & 0 deletions examples/examples/z_pub_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ async fn main() -> Result<(), zenoh::Error> {
println!("Allocating Shared Memory Buffer...");
let publisher = session.declare_publisher(&path).res().await.unwrap();

println!("Press CTRL-C to quit...");
for idx in 0..(K * N as u32) {
sleep(Duration::from_secs(1)).await;
let mut sbuf = match shm.alloc(1024) {
Expand Down
1 change: 1 addition & 0 deletions examples/examples/z_pub_shm_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ async fn main() {
// Make sure to not drop messages because of congestion control
.congestion_control(CongestionControl::Block).res().await.unwrap();

println!("Press CTRL-C to quit...");
loop {
publisher.put(buf.clone()).res().await.unwrap();
}
Expand Down
1 change: 1 addition & 0 deletions examples/examples/z_pub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ fn main() {
.res()
.unwrap();

println!("Press CTRL-C to quit...");
let mut count: usize = 0;
let mut start = std::time::Instant::now();
loop {
Expand Down
40 changes: 11 additions & 29 deletions examples/examples/z_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use async_std::prelude::FutureExt;
use async_std::task::sleep;
use clap::Parser;
use futures::prelude::*;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
Expand All @@ -35,40 +33,24 @@ async fn main() {
let subscriber = session
.declare_subscriber(&key_expr)
.pull_mode()
.res()
.await
.unwrap();

println!("Press <enter> to pull data...");

// Define the future to handle incoming samples of the subscription.
let subs = async {
while let Ok(sample) = subscriber.recv_async().await {
.callback(|sample| {
println!(
">> [Subscriber] Received {} ('{}': '{}')",
sample.kind,
sample.key_expr.as_str(),
sample.value,
);
}
};

// Define the future to handle keyboard's input.
let keyb = async {
let mut stdin = async_std::io::stdin();
let mut input = [0_u8];
loop {
stdin.read_exact(&mut input).await.unwrap();
match input[0] {
b'q' => break,
0 => sleep(Duration::from_secs(1)).await,
_ => subscriber.pull().res().await.unwrap(),
}
}
};
})
.res()
.await
.unwrap();

// Execute both futures concurrently until one of them returns.
subs.race(keyb).await;
println!("Press CTRL-C to quit...");
for idx in 0..u32::MAX {
sleep(Duration::from_secs(1)).await;
println!("[{idx:4}] Pulling...");
subscriber.pull().res().await.unwrap();
}
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
Expand Down
38 changes: 7 additions & 31 deletions examples/examples/z_queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,8 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use async_std::task::sleep;
use clap::Parser;
use futures::prelude::*;
use futures::select;
use std::sync::atomic::Ordering::Relaxed;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh_examples::CommonArgs;
Expand All @@ -27,7 +23,6 @@ async fn main() {
env_logger::init();

let (config, key_expr, value, complete) = parse_args();
let send_errors = std::sync::atomic::AtomicBool::new(false);

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
Expand All @@ -40,9 +35,7 @@ async fn main() {
.await
.unwrap();

println!("Enter 'q' to quit, 'e' to reply an error to next query...");
let mut stdin = async_std::io::stdin();
let mut input = [0_u8];
println!("Press CTRL-C to quit...");
loop {
select!(
query = queryable.recv_async() => {
Expand All @@ -51,34 +44,17 @@ async fn main() {
None => println!(">> [Queryable ] Received Query '{}'", query.selector()),
Some(value) => println!(">> [Queryable ] Received Query '{}' with value '{}'", query.selector(), value),
}
let reply = if send_errors.swap(false, Relaxed) {
println!(
">> [Queryable ] Replying (ERROR: '{}')",
value,
);
Err(value.clone().into())
} else {
println!(
">> [Queryable ] Responding ('{}': '{}')",
key_expr.as_str(),
value,
);
Ok(Sample::new(key_expr.clone(), value.clone()))
};
println!(
">> [Queryable ] Responding ('{}': '{}')",
key_expr.as_str(),
value,
);
let reply = Ok(Sample::new(key_expr.clone(), value.clone()));
query
.reply(reply)
.res()
.await
.unwrap_or_else(|e| println!(">> [Queryable ] Error sending reply: {e}"));
},

_ = stdin.read_exact(&mut input).fuse() => {
match input[0] {
b'q' => break,
0 => sleep(Duration::from_secs(1)).await,
b'e' => send_errors.store(true, Relaxed),
_ => (),
}
}
);
}
Expand Down
15 changes: 1 addition & 14 deletions examples/examples/z_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@
//
#![recursion_limit = "256"]

use async_std::task::sleep;
use clap::Parser;
use futures::prelude::*;
use futures::select;
use std::collections::HashMap;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh_examples::CommonArgs;
Expand Down Expand Up @@ -46,9 +43,7 @@ async fn main() {
.await
.unwrap();

println!("Enter 'q' to quit...");
let mut stdin = async_std::io::stdin();
let mut input = [0u8];
println!("Press CTRL-C to quit...");
loop {
select!(
sample = subscriber.recv_async() => {
Expand All @@ -70,14 +65,6 @@ async fn main() {
query.reply(Ok(sample.clone())).res().await.unwrap();
}
}
},

_ = stdin.read_exact(&mut input).fuse() => {
match input[0] {
b'q' => break,
0 => sleep(Duration::from_secs(1)).await,
_ => (),
}
}
);
}
Expand Down
15 changes: 1 addition & 14 deletions examples/examples/z_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use async_std::task::sleep;
use clap::Parser;
use futures::prelude::*;
use futures::select;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh_examples::CommonArgs;
Expand All @@ -39,23 +36,13 @@ async fn main() {

let subscriber = session.declare_subscriber(&key_expr).res().await.unwrap();

println!("Enter 'q' to quit...");
let mut stdin = async_std::io::stdin();
let mut input = [0_u8];
println!("Press CTRL-C to quit...");
loop {
select!(
sample = subscriber.recv_async() => {
let sample = sample.unwrap();
println!(">> [Subscriber] Received {} ('{}': '{}')",
sample.kind, sample.key_expr.as_str(), sample.value);
},

_ = stdin.read_exact(&mut input).fuse() => {
match input[0] {
b'q' => break,
0 => sleep(Duration::from_secs(1)).await,
_ => (),
}
}
);
}
Expand Down
15 changes: 1 addition & 14 deletions examples/examples/z_sub_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use async_std::task::sleep;
use clap::Parser;
use futures::prelude::*;
use futures::select;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
use zenoh_examples::CommonArgs;
Expand All @@ -39,9 +36,7 @@ async fn main() {
.await
.unwrap();

println!("Enter 'q' to quit...");
let mut stdin = async_std::io::stdin();
let mut input = [0_u8];
println!("Press CTRL-C to quit...");
loop {
select!(
sample = subscriber.recv_async() => {
Expand All @@ -54,14 +49,6 @@ async fn main() {
">> [LivelinessSubscriber] Dropped token ('{}')",
sample.key_expr.as_str()),
}
},

_ = stdin.read_exact(&mut input).fuse() => {
match input[0] {
b'q' => break,
0 => sleep(Duration::from_secs(1)).await,
_ => (),
}
}
);
}
Expand Down
12 changes: 5 additions & 7 deletions examples/examples/z_sub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use std::io::{stdin, Read};
use std::time::Instant;
use std::thread::sleep;
use std::time::{Duration, Instant};
use zenoh::config::Config;
use zenoh::prelude::sync::*;
use zenoh_examples::CommonArgs;
Expand Down Expand Up @@ -95,11 +95,9 @@ fn main() {
.res()
.unwrap();

for byte in stdin().bytes() {
match byte {
Ok(b'q') => break,
_ => std::thread::yield_now(),
}
println!("Press CTRL-C to quit...");
loop {
sleep(Duration::from_secs(1));
}
}

Expand Down
3 changes: 2 additions & 1 deletion zenoh-ext/examples/z_pub_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ async fn main() {
}
let _publication_cache = publication_cache_builder.res().await.unwrap();

for idx in 0..u32::MAX {
println!("Press CTRL-C to quit...");
for idx in 1..u32::MAX {
sleep(Duration::from_secs(1)).await;
let buf = format!("[{idx:4}] {value}");
println!("Put Data ('{}': '{}')", &key_expr, buf);
Expand Down
Loading

0 comments on commit 3310cae

Please sign in to comment.