Skip to content

Commit

Permalink
Add express support (#829)
Browse files Browse the repository at this point in the history
* Improve docs

* Add express to examples

* Fix doc in sample.rs
  • Loading branch information
Mallets authored Mar 15, 2024
1 parent 4d8ec6c commit 622b230
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 8 deletions.
9 changes: 7 additions & 2 deletions examples/examples/z_ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fn main() {
// initiate logging
env_logger::init();

let (config, warmup, size, n) = parse_args();
let (config, warmup, size, n, express) = parse_args();
let session = zenoh::open(config).res().unwrap();

// The key expression to publish data on
Expand All @@ -35,6 +35,7 @@ fn main() {
let publisher = session
.declare_publisher(key_expr_ping)
.congestion_control(CongestionControl::Block)
.express(express)
.res()
.unwrap();

Expand Down Expand Up @@ -78,6 +79,9 @@ fn main() {

#[derive(Parser)]
struct Args {
/// express for sending data
#[arg(long, default_value = "false")]
no_express: bool,
#[arg(short, long, default_value = "1")]
/// The number of seconds to warm up (float)
warmup: f64,
Expand All @@ -90,12 +94,13 @@ struct Args {
common: CommonArgs,
}

fn parse_args() -> (Config, Duration, usize, usize) {
fn parse_args() -> (Config, Duration, usize, usize, bool) {
let args = Args::parse();
(
args.common.into(),
Duration::from_secs_f64(args.warmup),
args.payload_size,
args.samples,
!args.no_express,
)
}
10 changes: 7 additions & 3 deletions examples/examples/z_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn main() {
// initiate logging
env_logger::init();

let config = parse_args();
let (config, express) = parse_args();

let session = zenoh::open(config).res().unwrap().into_arc();

Expand All @@ -34,6 +34,7 @@ fn main() {
let publisher = session
.declare_publisher(key_expr_pong)
.congestion_control(CongestionControl::Block)
.express(express)
.res()
.unwrap();

Expand All @@ -47,11 +48,14 @@ fn main() {

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
/// express for sending data
#[arg(long, default_value = "false")]
no_express: bool,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> Config {
fn parse_args() -> (Config, bool) {
let args = Args::parse();
args.common.into()
(args.common.into(), !args.no_express)
}
4 changes: 4 additions & 0 deletions examples/examples/z_pub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ fn main() {
.declare_publisher("test/thr")
.congestion_control(CongestionControl::Block)
.priority(prio)
.express(args.express)
.res()
.unwrap();

Expand All @@ -65,6 +66,9 @@ fn main() {

#[derive(Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
/// express for sending data
#[arg(long, default_value = "false")]
express: bool,
/// Priority for sending data
#[arg(short, long)]
priority: Option<u8>,
Expand Down
8 changes: 6 additions & 2 deletions zenoh/src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ impl PutBuilder<'_, '_> {
self
}

/// Change the `congestion_control` to apply when routing the data.
/// Change the `express` policy to apply when routing the data.
/// When express is set to `true`, then the message will not be batched.
/// This usually has a positive impact on latency but negative impact on throughput.
#[inline]
pub fn express(mut self, is_express: bool) -> Self {
self.publisher = self.publisher.express(is_express);
Expand Down Expand Up @@ -783,7 +785,9 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
self
}

/// Change the `congestion_control` to apply when routing the data.
/// Change the `express` policy to apply when routing the data.
/// When express is set to `true`, then the message will not be batched.
/// This usually has a positive impact on latency but negative impact on throughput.
#[inline]
pub fn express(mut self, is_express: bool) -> Self {
self.is_express = is_express;
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ impl QoS {
self.inner.get_congestion_control()
}

/// Gets express flag value. If true, the message is not batched during transmission, in order to reduce latency.
/// Gets express flag value. If `true`, the message is not batched during transmission, in order to reduce latency.
pub fn express(&self) -> bool {
self.inner.is_express()
}
Expand Down

0 comments on commit 622b230

Please sign in to comment.