Skip to content

Commit

Permalink
Syntax improving based on the code review.
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Feb 9, 2024
1 parent 6c33d3f commit e39082b
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 42 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ aes = "0.8.2"
ahash = "0.8.7"
anyhow = { version = "1.0.69", default-features = false } # Default features are disabled due to usage in no_std crates
async-executor = "1.5.0"
async-liveliness-monitor = "0.1.1"
async-global-executor = "2.3.1"
async-io = "=1.13.0"
async-rustls = "0.4.0"
Expand Down
1 change: 1 addition & 0 deletions zenoh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ default = [

[dependencies]
async-global-executor = { workspace = true }
async-liveliness-monitor = { workspace = true }
async-std = { workspace = true, features = ["attributes"] }
async-trait = { workspace = true }
base64 = { workspace = true }
Expand Down
35 changes: 15 additions & 20 deletions zenoh/src/net/routing/interceptor/downsampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
//! [Click here for Zenoh's documentation](../zenoh/index.html)
use crate::net::routing::interceptor::*;
use std::sync::{Arc, Mutex};
use async_liveliness_monitor::support::AtomicInstant;
use std::sync::atomic::Ordering;
use zenoh_config::DownsamplingItemConf;
use zenoh_protocol::core::key_expr::OwnedKeyExpr;

Expand All @@ -39,7 +40,7 @@ const RATELIMIT_STRATEGY: &str = "ratelimit";
pub(crate) struct EgressMsgDownsamplerRatelimit {
keyexprs: Option<Vec<OwnedKeyExpr>>,
threshold: std::time::Duration,
latest_message_timestamp: Arc<Mutex<std::time::Instant>>,
latest_message_timestamp: AtomicInstant,
}

impl InterceptorTrait for EgressMsgDownsamplerRatelimit {
Expand All @@ -48,28 +49,24 @@ impl InterceptorTrait for EgressMsgDownsamplerRatelimit {
ctx: RoutingContext<NetworkMessage>,
) -> Option<RoutingContext<NetworkMessage>> {
if let Some(cfg_keyexprs) = self.keyexprs.as_ref() {
let mut matched = false;
if let Some(keyexpr) = ctx.full_key_expr() {
for cfg_keyexpr in cfg_keyexprs {
if cfg_keyexpr.intersects(&keyexpr) {
matched = true;
}
}
}
let matched = ctx.full_key_expr().map_or(false, |keyexpr| {
cfg_keyexprs
.iter()
.any(|cfg_keyexpr| cfg_keyexpr.intersects(&keyexpr))
});

if !matched {
return Some(ctx);
}
}

let timestamp = std::time::Instant::now();
let mut latest_message_timestamp = self.latest_message_timestamp.lock().unwrap();

if timestamp - *latest_message_timestamp >= self.threshold {
*latest_message_timestamp = timestamp;
log::debug!("Interceptor: Passed threshold, passing.");
if timestamp - self.latest_message_timestamp.load(Ordering::Relaxed) >= self.threshold {
self.latest_message_timestamp
.store(timestamp, Ordering::Relaxed);
Some(ctx)
} else {
log::debug!("Interceptor: Skipped due to threshold.");
None
}
}
Expand All @@ -82,10 +79,7 @@ impl EgressMsgDownsamplerRatelimit {
Self {
keyexprs: conf.keyexprs,
threshold,
// TODO (sashacmc): I need just := 0, but how???
latest_message_timestamp: Arc::new(Mutex::new(
std::time::Instant::now() - threshold,
)),
latest_message_timestamp: AtomicInstant::new(std::time::Instant::now() - threshold),
}
} else {
// TODO (sashacmc): how correctly process an error?
Expand Down Expand Up @@ -143,7 +137,8 @@ impl InterceptorFactoryTrait for DownsamplerInterceptor {
))),
)
} else {
panic!("Unsupported downsampling strategy: {}", strategy)
log::error!("Unsupported downsampling strategy: {}", strategy);
(None, None)
}
}

Expand Down
12 changes: 2 additions & 10 deletions zenoh/src/net/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,7 @@ impl RoutingContext<NetworkMessage> {

#[inline]
pub(crate) fn full_key_expr(&self) -> Option<OwnedKeyExpr> {
match self.full_expr() {
Some(full_expr) => {
if let Ok(keyexpr) = OwnedKeyExpr::new(full_expr) {
Some(keyexpr)
} else {
None
}
}
None => None,
}
let full_expr = self.full_expr()?;
OwnedKeyExpr::new(full_expr).ok()
}
}
25 changes: 13 additions & 12 deletions zenoh/tests/interceptors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::{Arc, Mutex};
use zenoh_core::zlock;

struct IntervalCounter {
first_tick: bool,
Expand Down Expand Up @@ -59,12 +60,12 @@ fn downsampling_by_keyexpr() {
let _sub = zenoh_sub
.declare_subscriber("test/downsamples_by_keyexp/*")
.callback(move |sample| {
let mut count = total_count_clone.lock().unwrap();
let mut count = zlock!(total_count_clone);
*count += 1;
if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r100" {
counter_r100.lock().unwrap().tick();
zlock!(counter_r100).tick();
} else if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r50" {
counter_r50.lock().unwrap().tick();
zlock!(counter_r50).tick();
}
})
.res()
Expand Down Expand Up @@ -116,15 +117,15 @@ fn downsampling_by_keyexpr() {
}

for _ in 0..100 {
if *(total_count.lock().unwrap()) >= messages_count {
if *zlock!(total_count) >= messages_count {
break;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
assert!(*(total_count.lock().unwrap()) >= messages_count);
assert!(*zlock!(total_count) >= messages_count);

counter_r50_clone.lock().unwrap().check_middle(50);
counter_r100_clone.lock().unwrap().check_middle(100);
zlock!(counter_r50_clone).check_middle(50);
zlock!(counter_r100_clone).check_middle(100);
}

#[cfg(unix)]
Expand All @@ -150,10 +151,10 @@ fn downsampling_by_interface() {
let _sub = zenoh_sub
.declare_subscriber("test/downsamples_by_interface/*")
.callback(move |sample| {
let mut count = total_count_clone.lock().unwrap();
let mut count = zlock!(total_count_clone);
*count += 1;
if sample.key_expr.as_str() == "test/downsamples_by_interface/r100" {
counter_r100.lock().unwrap().tick();
zlock!(counter_r100).tick();
}
})
.res()
Expand Down Expand Up @@ -205,12 +206,12 @@ fn downsampling_by_interface() {
}

for _ in 0..100 {
if *(total_count.lock().unwrap()) >= messages_count {
if *zlock!(total_count) >= messages_count {
break;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
assert!(*(total_count.lock().unwrap()) >= messages_count);
assert!(*zlock!(total_count) >= messages_count);

counter_r100_clone.lock().unwrap().check_middle(100);
zlock!(counter_r100_clone).check_middle(100);
}

0 comments on commit e39082b

Please sign in to comment.