From e6363cdead23c267f58d93aaf149581c370063b1 Mon Sep 17 00:00:00 2001 From: aeon Date: Thu, 11 Jan 2024 15:46:30 +0800 Subject: [PATCH] Create z_{pub,sub,ping,pong}_prio examples for priority test --- examples/examples/z_ping_prio.rs | 101 ++++++++++++++++++++++++ examples/examples/z_pong_prio.rs | 69 ++++++++++++++++ examples/examples/z_pub_prio.rs | 83 ++++++++++++++++++++ examples/examples/z_sub_prio.rs | 130 +++++++++++++++++++++++++++++++ 4 files changed, 383 insertions(+) create mode 100644 examples/examples/z_ping_prio.rs create mode 100644 examples/examples/z_pong_prio.rs create mode 100644 examples/examples/z_pub_prio.rs create mode 100644 examples/examples/z_sub_prio.rs diff --git a/examples/examples/z_ping_prio.rs b/examples/examples/z_ping_prio.rs new file mode 100644 index 0000000000..fe5ed4d46b --- /dev/null +++ b/examples/examples/z_ping_prio.rs @@ -0,0 +1,101 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use clap::Parser; +use std::time::{Duration, Instant}; +use zenoh::config::Config; +use zenoh::prelude::sync::*; +use zenoh::publication::CongestionControl; +use zenoh_examples::CommonArgs; + +fn main() { + // initiate logging + env_logger::init(); + + let (config, warmup, size, n) = parse_args(); + let session = zenoh::open(config).res().unwrap(); + + // The key expression to publish data on + let key_expr_ping = keyexpr::new("test/ping").unwrap(); + + // The key expression to wait the response back + let key_expr_pong = keyexpr::new("test/pong").unwrap(); + + let sub = session.declare_subscriber(key_expr_pong).res().unwrap(); + let publisher = session + .declare_publisher(key_expr_ping) + .congestion_control(CongestionControl::Block) + .res() + .unwrap(); + + let data: Value = (0usize..size) + .map(|i| (i % 10) as u8) + .collect::>() + .into(); + + let mut samples = Vec::with_capacity(n); + + // -- warmup -- + println!("Warming up for {warmup:?}..."); + let now = Instant::now(); + while now.elapsed() < warmup { + let data = data.clone(); + publisher.put(data).res().unwrap(); + + let _ = sub.recv(); + } + + for _ in 0..n { + let data = data.clone(); + let write_time = Instant::now(); + publisher.put(data).res().unwrap(); + + let _ = sub.recv(); + let ts = write_time.elapsed().as_micros(); + samples.push(ts); + } + + for (i, rtt) in samples.iter().enumerate().take(n) { + println!( + "{} bytes: seq={} rtt={:?}µs lat={:?}µs", + size, + i, + rtt, + rtt / 2 + ); + } +} + +#[derive(Parser)] +struct Args { + #[arg(short, long, default_value = "1")] + /// The number of seconds to warm up (float) + warmup: f64, + #[arg(short = 'n', long, default_value = "100")] + /// The number of round-trips to measure + samples: usize, + /// Sets the size of the payload to publish + payload_size: usize, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> (Config, Duration, usize, usize) { + let args = Args::parse(); + ( + args.common.into(), + Duration::from_secs_f64(args.warmup), + args.payload_size, + args.samples, + ) +} diff --git a/examples/examples/z_pong_prio.rs b/examples/examples/z_pong_prio.rs new file mode 100644 index 0000000000..f40cf2cf9d --- /dev/null +++ b/examples/examples/z_pong_prio.rs @@ -0,0 +1,69 @@ +use std::io::{stdin, Read}; + +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use clap::Parser; +use zenoh::config::Config; +use zenoh::prelude::sync::*; +use zenoh::publication::CongestionControl; +use zenoh_examples::CommonArgs; + +fn main() { + // initiate logging + env_logger::init(); + + let (config, no_stdin) = parse_args(); + + let session = zenoh::open(config).res().unwrap().into_arc(); + + // The key expression to read the data from + let key_expr_ping = keyexpr::new("test/ping").unwrap(); + + // The key expression to echo the data back + let key_expr_pong = keyexpr::new("test/pong").unwrap(); + + let publisher = session + .declare_publisher(key_expr_pong) + .congestion_control(CongestionControl::Block) + .res() + .unwrap(); + + let _sub = session + .declare_subscriber(key_expr_ping) + .callback(move |sample| publisher.put(sample.value).res().unwrap()) + .res() + .unwrap(); + + if no_stdin { + loop { + std::thread::park(); + } + } else { + for _ in stdin().bytes().take_while(|b| !matches!(b, Ok(b'q'))) {} + } +} + +#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct Args { + /// Do not read standard input. + #[arg(long)] + no_stdin: bool, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> (Config, bool) { + let args = Args::parse(); + (args.common.into(), args.no_stdin) +} diff --git a/examples/examples/z_pub_prio.rs b/examples/examples/z_pub_prio.rs new file mode 100644 index 0000000000..aa95830be6 --- /dev/null +++ b/examples/examples/z_pub_prio.rs @@ -0,0 +1,83 @@ +use clap::Parser; +use log::info; +use std::time::Instant; +use zenoh::prelude::r#async::*; +use zenoh_examples::CommonArgs; + +#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct Args { + #[arg(short, long, default_value = "demo/example/zenoh-prio")] + /// The key expression to write to. + key: KeyExpr<'static>, + + /// Print the statistics + #[arg(short = 't', long)] + print: bool, + + /// Number of messages in each throughput measurements + #[arg(short, long, default_value = "100000")] + number: usize, + + #[arg(short = 's', long)] + /// Sets the size of the payload to publish + payload_size: usize, + + priorities: Vec, + + #[command(flatten)] + common: CommonArgs, +} + +#[async_std::main] +async fn main() -> zenoh::Result<()> { + // Initiate logging + env_logger::init(); + + let Args { + key, + priorities, + print, + number, + payload_size, + common, + } = Args::parse(); + let config: Config = common.into(); + + let priorities: Vec = priorities + .into_iter() + .map(|p| { + p.try_into() + .unwrap_or_else(|_| panic!("'{p}' is not a valid priority")) + }) + .collect(); + let data: Value = (0..10) + .cycle() + .take(payload_size) + .collect::>() + .into(); + + info!("Opening session..."); + let session = zenoh::open(config).res().await?; + + info!("Declaring Publisher on '{key}'..."); + let mut publisher = session + .declare_publisher(key) + .congestion_control(CongestionControl::Block) + .res() + .await?; + + let mut start = Instant::now(); + + for (count, &prio) in (1..=number).cycle().zip(priorities.iter().cycle()) { + publisher.put(data.clone()).res().await?; + publisher = publisher.priority(prio); + + if print && count == number { + let thpt = number as f64 / start.elapsed().as_secs_f64(); + println!("{thpt} msg/s"); + start = Instant::now(); + } + } + + Ok(()) +} diff --git a/examples/examples/z_sub_prio.rs b/examples/examples/z_sub_prio.rs new file mode 100644 index 0000000000..508d31b87b --- /dev/null +++ b/examples/examples/z_sub_prio.rs @@ -0,0 +1,130 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use clap::Parser; +use std::io::{stdin, Read}; +use std::time::{Duration, Instant}; +use zenoh::config::Config; +use zenoh::prelude::sync::*; +use zenoh_examples::CommonArgs; + +struct Stats { + round_count: usize, + round_size: usize, + finished_rounds: usize, + round_start: Instant, + global_start: Option, +} +impl Stats { + fn new(round_size: usize) -> Self { + Stats { + round_count: 0, + round_size, + finished_rounds: 0, + round_start: Instant::now(), + global_start: None, + } + } + fn increment(&mut self) { + if self.round_count == 0 { + self.round_start = Instant::now(); + if self.global_start.is_none() { + self.global_start = Some(self.round_start) + } + self.round_count += 1; + } else if self.round_count < self.round_size { + self.round_count += 1; + } else { + self.print_round(); + self.finished_rounds += 1; + self.round_count = 0; + } + } + fn print_round(&self) { + let elapsed = self.round_start.elapsed().as_secs_f64(); + let throughtput = (self.round_size as f64) / elapsed; + println!("{throughtput} msg/s"); + } +} +impl Drop for Stats { + fn drop(&mut self) { + let Some(global_start) = self.global_start else { + return; + }; + let elapsed = global_start.elapsed().as_secs_f64(); + let total = self.round_size * self.finished_rounds + self.round_count; + let throughtput = total as f64 / elapsed; + println!("Received {total} messages over {elapsed:.2}s: {throughtput}msg/s"); + } +} + +fn main() { + // initiate logging + env_logger::init(); + + let (mut config, m, n, no_stdin) = parse_args(); + + // A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm_thr` to operate + // over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the + // subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected. + config.transport.shared_memory.set_enabled(true).unwrap(); + + let session = zenoh::open(config).res().unwrap(); + + let key_expr = "test/thr"; + + let mut stats = Stats::new(n); + let _sub = session + .declare_subscriber(key_expr) + .callback_mut(move |_sample| { + stats.increment(); + if stats.finished_rounds >= m { + std::process::exit(0) + } + }) + .res() + .unwrap(); + + if no_stdin { + loop { + std::thread::park(); + } + } else { + for byte in stdin().bytes() { + match byte { + Ok(b'q') => break, + _ => std::thread::yield_now(), + } + } + } +} + +#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] +struct Args { + #[arg(short, long, default_value = "10")] + /// Number of throughput measurements. + samples: usize, + #[arg(short, long, default_value = "100000")] + /// Number of messages in each throughput measurements. + number: usize, + /// Do not read standard input. + #[arg(long)] + no_stdin: bool, + #[command(flatten)] + common: CommonArgs, +} + +fn parse_args() -> (Config, usize, usize, bool) { + let args = Args::parse(); + (args.common.into(), args.samples, args.number, args.no_stdin) +}