From 2d6ac394c0c61d9470d2834b37f5e21d35c587df Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 1 Feb 2024 15:41:10 +0100 Subject: [PATCH] Optimisation and refactoring --- commons/zenoh-config/src/lib.rs | 4 +- .../net/routing/interceptor/downsampling.rs | 53 ++++++++++--------- zenoh/tests/interceptors.rs | 4 +- 3 files changed, 34 insertions(+), 27 deletions(-) diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 753abdb8e6..8645317e92 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -76,8 +76,10 @@ pub struct DownsamplerConf { pub keyexprs: Option>, /// A list of interfaces to which the downsampling will be applied. pub interfaces: Option>, - /// Downsampling strategy. + /// Downsampling strategy (default: ratelimit). + // TODO(sashacmc): how specify default value and generate DEFAULT_CONFIG? pub strategy: Option, + /// Minimim timeout between two messages for ratelimit starategy pub threshold_ms: Option, } diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs index 4492f1338a..d648e3da0b 100644 --- a/zenoh/src/net/routing/interceptor/downsampling.rs +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -23,26 +23,17 @@ use std::sync::{Arc, Mutex}; use zenoh_config::DownsamplerConf; use zenoh_protocol::core::key_expr::OwnedKeyExpr; -// TODO(sashacmc): this is ratelimit strategy, we should also add decimation (with "factor" option) +static RATELIMIT_STRATEGY: &'static str = "ratelimit"; -pub(crate) struct IngressMsgDownsampler {} +// TODO(sashacmc): this is ratelimit strategy, we can also add decimation (with "factor" option) -impl InterceptorTrait for IngressMsgDownsampler { - fn intercept( - &self, - ctx: RoutingContext, - ) -> Option> { - Some(ctx) - } -} - -pub(crate) struct EgressMsgDownsampler { +pub(crate) struct EgressMsgDownsamplerRatelimit { keyexprs: Option>, threshold: std::time::Duration, latest_message_timestamp: Arc>, } -impl InterceptorTrait for EgressMsgDownsampler { +impl InterceptorTrait for EgressMsgDownsamplerRatelimit { fn intercept( &self, ctx: RoutingContext, @@ -75,7 +66,7 @@ impl InterceptorTrait for EgressMsgDownsampler { } } -impl EgressMsgDownsampler { +impl EgressMsgDownsamplerRatelimit { pub fn new(conf: DownsamplerConf) -> Self { if let Some(threshold_ms) = conf.threshold_ms { let threshold = std::time::Duration::from_millis(threshold_ms); @@ -128,19 +119,33 @@ impl InterceptorFactoryTrait for DownsamplerInterceptor { } } }; - ( - Some(Box::new(IngressMsgDownsampler {})), - Some(Box::new(EgressMsgDownsampler::new(self.conf.clone()))), - ) + + let strategy = self + .conf + .strategy + .as_ref() + .map_or_else(|| RATELIMIT_STRATEGY.to_string(), |s| s.clone()); + + if strategy == RATELIMIT_STRATEGY { + ( + None, + Some(Box::new(EgressMsgDownsamplerRatelimit::new( + self.conf.clone(), + ))), + ) + } else { + panic!("Unsupported downsampling strategy: {}", strategy) + } } - fn new_transport_multicast(&self, transport: &TransportMulticast) -> Option { - log::debug!("New transport multicast {:?}", transport); - Some(Box::new(EgressMsgDownsampler::new(self.conf.clone()))) + fn new_transport_multicast( + &self, + _transport: &TransportMulticast, + ) -> Option { + None } - fn new_peer_multicast(&self, transport: &TransportMulticast) -> Option { - log::debug!("New peer multicast {:?}", transport); - Some(Box::new(IngressMsgDownsampler {})) + fn new_peer_multicast(&self, _transport: &TransportMulticast) -> Option { + None } } diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 502940ab41..0c1beb455a 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -27,13 +27,13 @@ fn downsampling_by_keyexpr() { let curr_time = std::time::Instant::now(); if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r100" { let mut last_time = last_time_r100.lock().unwrap(); - let interval = (curr_time - *last_time).as_millis() + 1; + let interval = (curr_time - *last_time).as_millis() + 3; *last_time = curr_time; println!("interval 100: {}", interval); assert!(interval >= 100); } else if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r50" { let mut last_time = last_time_r50.lock().unwrap(); - let interval = (curr_time - *last_time).as_millis() + 1; + let interval = (curr_time - *last_time).as_millis() + 2; *last_time = curr_time; println!("interval 50: {}", interval); assert!(interval >= 50);