Skip to content

Commit

Permalink
Optimisation and refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Feb 1, 2024
1 parent 060ce09 commit 2d6ac39
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 27 deletions.
4 changes: 3 additions & 1 deletion commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ pub struct DownsamplerConf {
pub keyexprs: Option<Vec<OwnedKeyExpr>>,
/// A list of interfaces to which the downsampling will be applied.
pub interfaces: Option<Vec<String>>,
/// Downsampling strategy.
/// Downsampling strategy (default: ratelimit).
// TODO(sashacmc): how specify default value and generate DEFAULT_CONFIG?
pub strategy: Option<String>,
/// Minimim timeout between two messages for ratelimit starategy
pub threshold_ms: Option<u64>,
}

Expand Down
53 changes: 29 additions & 24 deletions zenoh/src/net/routing/interceptor/downsampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,17 @@ use std::sync::{Arc, Mutex};
use zenoh_config::DownsamplerConf;
use zenoh_protocol::core::key_expr::OwnedKeyExpr;

// TODO(sashacmc): this is ratelimit strategy, we should also add decimation (with "factor" option)
static RATELIMIT_STRATEGY: &'static str = "ratelimit";

pub(crate) struct IngressMsgDownsampler {}
// TODO(sashacmc): this is ratelimit strategy, we can also add decimation (with "factor" option)

impl InterceptorTrait for IngressMsgDownsampler {
fn intercept(
&self,
ctx: RoutingContext<NetworkMessage>,
) -> Option<RoutingContext<NetworkMessage>> {
Some(ctx)
}
}

pub(crate) struct EgressMsgDownsampler {
pub(crate) struct EgressMsgDownsamplerRatelimit {
keyexprs: Option<Vec<OwnedKeyExpr>>,
threshold: std::time::Duration,
latest_message_timestamp: Arc<Mutex<std::time::Instant>>,
}

impl InterceptorTrait for EgressMsgDownsampler {
impl InterceptorTrait for EgressMsgDownsamplerRatelimit {
fn intercept(
&self,
ctx: RoutingContext<NetworkMessage>,
Expand Down Expand Up @@ -75,7 +66,7 @@ impl InterceptorTrait for EgressMsgDownsampler {
}
}

impl EgressMsgDownsampler {
impl EgressMsgDownsamplerRatelimit {
pub fn new(conf: DownsamplerConf) -> Self {
if let Some(threshold_ms) = conf.threshold_ms {
let threshold = std::time::Duration::from_millis(threshold_ms);
Expand Down Expand Up @@ -128,19 +119,33 @@ impl InterceptorFactoryTrait for DownsamplerInterceptor {
}
}
};
(
Some(Box::new(IngressMsgDownsampler {})),
Some(Box::new(EgressMsgDownsampler::new(self.conf.clone()))),
)

let strategy = self
.conf
.strategy
.as_ref()
.map_or_else(|| RATELIMIT_STRATEGY.to_string(), |s| s.clone());

if strategy == RATELIMIT_STRATEGY {
(
None,
Some(Box::new(EgressMsgDownsamplerRatelimit::new(
self.conf.clone(),
))),
)
} else {
panic!("Unsupported downsampling strategy: {}", strategy)
}
}

fn new_transport_multicast(&self, transport: &TransportMulticast) -> Option<EgressInterceptor> {
log::debug!("New transport multicast {:?}", transport);
Some(Box::new(EgressMsgDownsampler::new(self.conf.clone())))
fn new_transport_multicast(
&self,
_transport: &TransportMulticast,
) -> Option<EgressInterceptor> {
None
}

fn new_peer_multicast(&self, transport: &TransportMulticast) -> Option<IngressInterceptor> {
log::debug!("New peer multicast {:?}", transport);
Some(Box::new(IngressMsgDownsampler {}))
fn new_peer_multicast(&self, _transport: &TransportMulticast) -> Option<IngressInterceptor> {
None
}
}
4 changes: 2 additions & 2 deletions zenoh/tests/interceptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ fn downsampling_by_keyexpr() {
let curr_time = std::time::Instant::now();
if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r100" {
let mut last_time = last_time_r100.lock().unwrap();
let interval = (curr_time - *last_time).as_millis() + 1;
let interval = (curr_time - *last_time).as_millis() + 3;
*last_time = curr_time;
println!("interval 100: {}", interval);
assert!(interval >= 100);
} else if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r50" {
let mut last_time = last_time_r50.lock().unwrap();
let interval = (curr_time - *last_time).as_millis() + 1;
let interval = (curr_time - *last_time).as_millis() + 2;
*last_time = curr_time;
println!("interval 50: {}", interval);
assert!(interval >= 50);
Expand Down

0 comments on commit 2d6ac39

Please sign in to comment.