Skip to content

Commit

Permalink
Create z_{pub,sub,ping,pong}_prio examples for priority test
Browse files Browse the repository at this point in the history
  • Loading branch information
jerry73204 committed Mar 6, 2024
1 parent 545b554 commit e6363cd
Show file tree
Hide file tree
Showing 4 changed files with 383 additions and 0 deletions.
101 changes: 101 additions & 0 deletions examples/examples/z_ping_prio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use std::time::{Duration, Instant};
use zenoh::config::Config;
use zenoh::prelude::sync::*;
use zenoh::publication::CongestionControl;
use zenoh_examples::CommonArgs;

fn main() {
// initiate logging
env_logger::init();

let (config, warmup, size, n) = parse_args();
let session = zenoh::open(config).res().unwrap();

// The key expression to publish data on
let key_expr_ping = keyexpr::new("test/ping").unwrap();

// The key expression to wait the response back
let key_expr_pong = keyexpr::new("test/pong").unwrap();

let sub = session.declare_subscriber(key_expr_pong).res().unwrap();
let publisher = session
.declare_publisher(key_expr_ping)
.congestion_control(CongestionControl::Block)
.res()
.unwrap();

let data: Value = (0usize..size)
.map(|i| (i % 10) as u8)
.collect::<Vec<u8>>()
.into();

let mut samples = Vec::with_capacity(n);

// -- warmup --
println!("Warming up for {warmup:?}...");
let now = Instant::now();
while now.elapsed() < warmup {
let data = data.clone();
publisher.put(data).res().unwrap();

let _ = sub.recv();
}

for _ in 0..n {
let data = data.clone();
let write_time = Instant::now();
publisher.put(data).res().unwrap();

let _ = sub.recv();
let ts = write_time.elapsed().as_micros();
samples.push(ts);
}

for (i, rtt) in samples.iter().enumerate().take(n) {
println!(
"{} bytes: seq={} rtt={:?}µs lat={:?}µs",
size,
i,
rtt,
rtt / 2
);
}
}

#[derive(Parser)]
struct Args {
#[arg(short, long, default_value = "1")]
/// The number of seconds to warm up (float)
warmup: f64,
#[arg(short = 'n', long, default_value = "100")]
/// The number of round-trips to measure
samples: usize,
/// Sets the size of the payload to publish
payload_size: usize,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, Duration, usize, usize) {
let args = Args::parse();
(
args.common.into(),
Duration::from_secs_f64(args.warmup),
args.payload_size,
args.samples,
)
}
69 changes: 69 additions & 0 deletions examples/examples/z_pong_prio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::io::{stdin, Read};

//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use zenoh::config::Config;
use zenoh::prelude::sync::*;
use zenoh::publication::CongestionControl;
use zenoh_examples::CommonArgs;

fn main() {
// initiate logging
env_logger::init();

let (config, no_stdin) = parse_args();

let session = zenoh::open(config).res().unwrap().into_arc();

// The key expression to read the data from
let key_expr_ping = keyexpr::new("test/ping").unwrap();

// The key expression to echo the data back
let key_expr_pong = keyexpr::new("test/pong").unwrap();

let publisher = session
.declare_publisher(key_expr_pong)
.congestion_control(CongestionControl::Block)
.res()
.unwrap();

let _sub = session
.declare_subscriber(key_expr_ping)
.callback(move |sample| publisher.put(sample.value).res().unwrap())
.res()
.unwrap();

if no_stdin {
loop {
std::thread::park();
}
} else {
for _ in stdin().bytes().take_while(|b| !matches!(b, Ok(b'q'))) {}
}
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
/// Do not read standard input.
#[arg(long)]
no_stdin: bool,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, bool) {
let args = Args::parse();
(args.common.into(), args.no_stdin)
}
83 changes: 83 additions & 0 deletions examples/examples/z_pub_prio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use clap::Parser;
use log::info;
use std::time::Instant;
use zenoh::prelude::r#async::*;
use zenoh_examples::CommonArgs;

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
#[arg(short, long, default_value = "demo/example/zenoh-prio")]
/// The key expression to write to.
key: KeyExpr<'static>,

/// Print the statistics
#[arg(short = 't', long)]
print: bool,

/// Number of messages in each throughput measurements
#[arg(short, long, default_value = "100000")]
number: usize,

#[arg(short = 's', long)]
/// Sets the size of the payload to publish
payload_size: usize,

priorities: Vec<u8>,

#[command(flatten)]
common: CommonArgs,
}

#[async_std::main]
async fn main() -> zenoh::Result<()> {
// Initiate logging
env_logger::init();

let Args {
key,
priorities,
print,
number,
payload_size,
common,
} = Args::parse();
let config: Config = common.into();

let priorities: Vec<Priority> = priorities
.into_iter()
.map(|p| {
p.try_into()
.unwrap_or_else(|_| panic!("'{p}' is not a valid priority"))
})
.collect();
let data: Value = (0..10)
.cycle()
.take(payload_size)
.collect::<Vec<u8>>()
.into();

info!("Opening session...");
let session = zenoh::open(config).res().await?;

info!("Declaring Publisher on '{key}'...");
let mut publisher = session
.declare_publisher(key)
.congestion_control(CongestionControl::Block)
.res()
.await?;

let mut start = Instant::now();

for (count, &prio) in (1..=number).cycle().zip(priorities.iter().cycle()) {
publisher.put(data.clone()).res().await?;
publisher = publisher.priority(prio);

if print && count == number {
let thpt = number as f64 / start.elapsed().as_secs_f64();
println!("{thpt} msg/s");
start = Instant::now();
}
}

Ok(())
}
130 changes: 130 additions & 0 deletions examples/examples/z_sub_prio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use clap::Parser;
use std::io::{stdin, Read};
use std::time::{Duration, Instant};
use zenoh::config::Config;
use zenoh::prelude::sync::*;
use zenoh_examples::CommonArgs;

struct Stats {
round_count: usize,
round_size: usize,
finished_rounds: usize,
round_start: Instant,
global_start: Option<Instant>,
}
impl Stats {
fn new(round_size: usize) -> Self {
Stats {
round_count: 0,
round_size,
finished_rounds: 0,
round_start: Instant::now(),
global_start: None,
}
}
fn increment(&mut self) {
if self.round_count == 0 {
self.round_start = Instant::now();
if self.global_start.is_none() {
self.global_start = Some(self.round_start)
}
self.round_count += 1;
} else if self.round_count < self.round_size {
self.round_count += 1;
} else {
self.print_round();
self.finished_rounds += 1;
self.round_count = 0;
}
}
fn print_round(&self) {
let elapsed = self.round_start.elapsed().as_secs_f64();
let throughtput = (self.round_size as f64) / elapsed;
println!("{throughtput} msg/s");
}
}
impl Drop for Stats {
fn drop(&mut self) {
let Some(global_start) = self.global_start else {
return;
};
let elapsed = global_start.elapsed().as_secs_f64();
let total = self.round_size * self.finished_rounds + self.round_count;
let throughtput = total as f64 / elapsed;
println!("Received {total} messages over {elapsed:.2}s: {throughtput}msg/s");
}
}

fn main() {
// initiate logging
env_logger::init();

let (mut config, m, n, no_stdin) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm_thr` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();

let session = zenoh::open(config).res().unwrap();

let key_expr = "test/thr";

let mut stats = Stats::new(n);
let _sub = session
.declare_subscriber(key_expr)
.callback_mut(move |_sample| {
stats.increment();
if stats.finished_rounds >= m {
std::process::exit(0)
}
})
.res()
.unwrap();

if no_stdin {
loop {
std::thread::park();
}
} else {
for byte in stdin().bytes() {
match byte {
Ok(b'q') => break,
_ => std::thread::yield_now(),
}
}
}
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
#[arg(short, long, default_value = "10")]
/// Number of throughput measurements.
samples: usize,
#[arg(short, long, default_value = "100000")]
/// Number of messages in each throughput measurements.
number: usize,
/// Do not read standard input.
#[arg(long)]
no_stdin: bool,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, usize, usize, bool) {
let args = Args::parse();
(args.common.into(), args.samples, args.number, args.no_stdin)
}

0 comments on commit e6363cd

Please sign in to comment.