diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index f887c15864..c8a7c5512c 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -217,7 +217,7 @@ impl Default for DownsamplingItemConf { Self { keyexprs: None, interfaces: None, - strategy: Some("ratelimit".to_string()), + strategy: Some(DownsamplingStrategy::Ratelimit), threshold_ms: None, } } diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index df5f1ff89c..eaa4b8a12e 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -70,6 +70,11 @@ impl Zeroize for SecretString { pub type SecretValue = Secret; +#[derive(Debug, Deserialize, Serialize, Clone)] +pub enum DownsamplingStrategy { + Ratelimit, +} + #[derive(Debug, Deserialize, Serialize, Clone)] pub struct DownsamplingItemConf { /// A list of key-expressions to which the downsampling will be applied. @@ -77,7 +82,7 @@ pub struct DownsamplingItemConf { /// A list of interfaces to which the downsampling will be applied. pub interfaces: Option>, /// Downsampling strategy (default: ratelimit). - pub strategy: Option, + pub strategy: Option, /// Minimim timeout between two messages for ratelimit starategy pub threshold_ms: Option, } diff --git a/zenoh/src/net/routing/dispatcher/tables.rs b/zenoh/src/net/routing/dispatcher/tables.rs index 274b600024..e239a316a1 100644 --- a/zenoh/src/net/routing/dispatcher/tables.rs +++ b/zenoh/src/net/routing/dispatcher/tables.rs @@ -28,6 +28,7 @@ use zenoh_config::unwrap_or_default; use zenoh_config::Config; use zenoh_protocol::core::{ExprId, WhatAmI, ZenohId}; use zenoh_protocol::network::Mapping; +use zenoh_result::ZResult; // use zenoh_collections::Timer; use zenoh_sync::get_mut_unchecked; @@ -76,7 +77,12 @@ pub struct Tables { } impl Tables { - pub fn new(zid: ZenohId, whatami: WhatAmI, hlc: Option>, config: &Config) -> Self { + pub fn new( + zid: ZenohId, + whatami: WhatAmI, + hlc: Option>, + config: &Config, + ) -> ZResult { let drop_future_timestamp = unwrap_or_default!(config.timestamping().drop_future_timestamp()); let router_peers_failover_brokering = @@ -84,7 +90,7 @@ impl Tables { // let queries_default_timeout = // Duration::from_millis(unwrap_or_default!(config.queries_default_timeout())); let hat_code = hat::new_hat(whatami, config); - Tables { + Ok(Tables { zid, whatami, face_counter: 0, @@ -96,11 +102,11 @@ impl Tables { faces: HashMap::new(), mcast_groups: vec![], mcast_faces: vec![], - interceptors: interceptor_factories(config), + interceptors: interceptor_factories(config)?, pull_caches_lock: Mutex::new(()), hat: hat_code.new_tables(router_peers_failover_brokering), hat_code: hat_code.into(), - } + }) } #[doc(hidden)] diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs index 4799a62ffb..92fdcbe51c 100644 --- a/zenoh/src/net/routing/interceptor/downsampling.rs +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -21,91 +21,71 @@ use crate::net::routing::interceptor::*; use async_liveliness_monitor::support::AtomicInstant; use std::sync::atomic::Ordering; -use zenoh_config::DownsamplingItemConf; +use zenoh_config::{DownsamplingConf, DownsamplingItemConf, DownsamplingStrategy}; use zenoh_protocol::core::key_expr::OwnedKeyExpr; +use zenoh_result::{zerror, ZResult}; -const RATELIMIT_STRATEGY: &str = "ratelimit"; +pub(crate) fn downsampling_interceptor_factories( + config: &DownsamplingConf, +) -> ZResult> { + let mut res: Vec = vec![]; -// EgressMsgDownsamplerRatelimit implements default ratelimit strategy, it has following + for ds in config.items() { + let strategy = ds + .strategy + .clone() + .unwrap_or(DownsamplingStrategy::Ratelimit); + + match strategy { + DownsamplingStrategy::Ratelimit => { + res.push(Box::new(DownsamplerInterceptorRateLimit::new(ds.clone())?)); + } + } + } + + Ok(res) +} + +// DownsamplerInterceptorRateLimit implements default ratelimit strategy, it has following // configuration parameters: -// threshold - mandatory, indicates the minimum interval between messages. -// All messages arriving with a smaller interval will be skipped -// keyexprs - optional, list of key expressions, only messages matching this expression will be processed, -// the rest will be passed as is. // interfaces - optional, list of interfaces messages will be processed on, // the rest will be passed as is. +// keyexprs - optional, list of key expressions, only messages matching this expression will be processed, +// the rest will be passed as is. +// threshold - mandatory, indicates the minimum interval between messages. +// All messages arriving with a smaller interval will be skipped // // TODO: we can also add decimation strategy, with mandatory "factor" parameter -pub(crate) struct EgressMsgDownsamplerRatelimit { +pub struct DownsamplerInterceptorRateLimit { + interfaces: Option>, keyexprs: Option>, threshold: std::time::Duration, - latest_message_timestamp: AtomicInstant, } -impl InterceptorTrait for EgressMsgDownsamplerRatelimit { - fn intercept( - &self, - ctx: RoutingContext, - ) -> Option> { - if let Some(cfg_keyexprs) = self.keyexprs.as_ref() { - let matched = ctx.full_key_expr().map_or(false, |keyexpr| { - cfg_keyexprs - .iter() - .any(|cfg_keyexpr| cfg_keyexpr.intersects(&keyexpr)) - }); - - if !matched { - return Some(ctx); - } - } - - let timestamp = std::time::Instant::now(); - - if timestamp - self.latest_message_timestamp.load(Ordering::Relaxed) >= self.threshold { - self.latest_message_timestamp - .store(timestamp, Ordering::Relaxed); - Some(ctx) - } else { - None - } - } -} - -impl EgressMsgDownsamplerRatelimit { - pub fn new(conf: DownsamplingItemConf) -> Self { +impl DownsamplerInterceptorRateLimit { + pub fn new(conf: DownsamplingItemConf) -> ZResult { if let Some(threshold_ms) = conf.threshold_ms { + log::debug!("DownsamplerInterceptor enabled: {:?}", conf); let threshold = std::time::Duration::from_millis(threshold_ms); - Self { + Ok(Self { + interfaces: conf.interfaces, keyexprs: conf.keyexprs, threshold, - latest_message_timestamp: AtomicInstant::new(std::time::Instant::now() - threshold), - } + }) } else { - // TODO (sashacmc): how correctly process an error? - panic!("Rate limit downsampler shoud have a threshold_ms parameter"); + Err(zerror!("Rate limit downsampler shoud have a threshold_ms parameter").into()) } } } -pub struct DownsamplerInterceptor { - conf: DownsamplingItemConf, -} - -impl DownsamplerInterceptor { - pub fn new(conf: DownsamplingItemConf) -> Self { - log::debug!("DownsamplerInterceptor enabled: {:?}", conf); - Self { conf } - } -} - -impl InterceptorFactoryTrait for DownsamplerInterceptor { +impl InterceptorFactoryTrait for DownsamplerInterceptorRateLimit { fn new_transport_unicast( &self, transport: &TransportUnicast, ) -> (Option, Option) { log::debug!("New downsampler transport unicast {:?}", transport); - if let Some(interfaces) = &self.conf.interfaces { + if let Some(interfaces) = &self.interfaces { log::debug!( "New downsampler transport unicast config interfaces: {:?}", interfaces @@ -123,23 +103,13 @@ impl InterceptorFactoryTrait for DownsamplerInterceptor { } }; - let strategy = self - .conf - .strategy - .as_ref() - .map_or_else(|| RATELIMIT_STRATEGY.to_string(), |s| s.clone()); - - if strategy == RATELIMIT_STRATEGY { - ( - None, - Some(Box::new(EgressMsgDownsamplerRatelimit::new( - self.conf.clone(), - ))), - ) - } else { - log::error!("Unsupported downsampling strategy: {}", strategy); - (None, None) - } + ( + None, + Some(Box::new(EgressMsgDownsamplerRatelimit::new( + self.keyexprs.clone(), + self.threshold.clone(), + ))), + ) } fn new_transport_multicast( @@ -153,3 +123,48 @@ impl InterceptorFactoryTrait for DownsamplerInterceptor { None } } + +pub(crate) struct EgressMsgDownsamplerRatelimit { + keyexprs: Option>, + threshold: std::time::Duration, + latest_message_timestamp: AtomicInstant, +} + +impl InterceptorTrait for EgressMsgDownsamplerRatelimit { + fn intercept( + &self, + ctx: RoutingContext, + ) -> Option> { + if let Some(cfg_keyexprs) = self.keyexprs.as_ref() { + let matched = ctx.full_key_expr().map_or(false, |keyexpr| { + cfg_keyexprs + .iter() + .any(|cfg_keyexpr| cfg_keyexpr.intersects(&keyexpr)) + }); + + if !matched { + return Some(ctx); + } + } + + let timestamp = std::time::Instant::now(); + + if timestamp - self.latest_message_timestamp.load(Ordering::Relaxed) >= self.threshold { + self.latest_message_timestamp + .store(timestamp, Ordering::Relaxed); + Some(ctx) + } else { + None + } + } +} + +impl EgressMsgDownsamplerRatelimit { + pub fn new(keyexprs: Option>, threshold: std::time::Duration) -> Self { + Self { + keyexprs, + threshold, + latest_message_timestamp: AtomicInstant::new(std::time::Instant::now() - threshold), + } + } +} diff --git a/zenoh/src/net/routing/interceptor/mod.rs b/zenoh/src/net/routing/interceptor/mod.rs index 6491cf1ad8..f98759264e 100644 --- a/zenoh/src/net/routing/interceptor/mod.rs +++ b/zenoh/src/net/routing/interceptor/mod.rs @@ -20,10 +20,11 @@ use super::RoutingContext; use zenoh_config::Config; use zenoh_protocol::network::NetworkMessage; +use zenoh_result::ZResult; use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast}; pub mod downsampling; -use crate::net::routing::interceptor::downsampling::DownsamplerInterceptor; +use crate::net::routing::interceptor::downsampling::downsampling_interceptor_factories; pub(crate) trait InterceptorTrait { fn intercept( @@ -47,15 +48,14 @@ pub(crate) trait InterceptorFactoryTrait { pub(crate) type InterceptorFactory = Box; -pub(crate) fn interceptor_factories(config: &Config) -> Vec { +pub(crate) fn interceptor_factories(config: &Config) -> ZResult> { let mut res: Vec = vec![]; - for ds in config.downsampling().items() { - res.push(Box::new(DownsamplerInterceptor::new(ds.clone()))) - } + res.extend(downsampling_interceptor_factories(config.downsampling())?); + // Uncomment to log the interceptors initialisation // res.push(Box::new(LoggerInterceptor {})); - res + Ok(res) } pub(crate) struct InterceptorsChain { diff --git a/zenoh/src/net/routing/router.rs b/zenoh/src/net/routing/router.rs index 26c9d36185..ba0249af1b 100644 --- a/zenoh/src/net/routing/router.rs +++ b/zenoh/src/net/routing/router.rs @@ -45,15 +45,20 @@ pub struct Router { } impl Router { - pub fn new(zid: ZenohId, whatami: WhatAmI, hlc: Option>, config: &Config) -> Self { - Router { + pub fn new( + zid: ZenohId, + whatami: WhatAmI, + hlc: Option>, + config: &Config, + ) -> ZResult { + Ok(Router { // whatami, tables: Arc::new(TablesLock { - tables: RwLock::new(Tables::new(zid, whatami, hlc, config)), + tables: RwLock::new(Tables::new(zid, whatami, hlc, config)?), ctrl_lock: Mutex::new(hat::new_hat(whatami, config)), queries_lock: RwLock::new(()), }), - } + }) } #[allow(clippy::too_many_arguments)] diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index ac125421f6..7061b38622 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -95,7 +95,7 @@ impl Runtime { let hlc = (*unwrap_or_default!(config.timestamping().enabled().get(whatami))) .then(|| Arc::new(HLCBuilder::new().with_id(uhlc::ID::from(&zid)).build())); - let router = Arc::new(Router::new(zid, whatami, hlc.clone(), &config)); + let router = Arc::new(Router::new(zid, whatami, hlc.clone(), &config)?); let handler = Arc::new(RuntimeTransportEventHandler { runtime: std::sync::RwLock::new(None), diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 6c646d31de..34c164afe9 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -215,3 +215,52 @@ fn downsampling_by_interface() { zlock!(counter_r100_clone).check_middle(100); } + +#[test] +#[should_panic(expected = "unknown variant `some_uknown_strategy`")] +fn downsampling_config_error_wrong_strategy() { + let _ = env_logger::builder().is_test(true).try_init(); + + use zenoh::prelude::sync::*; + + let mut config = Config::default(); + config + .insert_json5( + "downsampling/items", + r#" + [ + { + strategy: "some_uknown_strategy" + }, + ] + "#, + ) + .unwrap(); + + zenoh::open(config).res().unwrap(); +} + +#[test] +#[should_panic(expected = "Rate limit downsampler shoud have a threshold_ms parameter")] +fn downsampling_config_error_missed_threshold() { + let _ = env_logger::builder().is_test(true).try_init(); + + use zenoh::prelude::sync::*; + + let mut config = Config::default(); + config + .insert_json5( + "downsampling/items", + r#" + [ + { + keyexprs: ["test/downsamples_by_interface/all"], + interfaces: ["some_unknown_interface"], + }, + ] + "#, + ) + .unwrap(); + + zenoh::open(config).res().unwrap(); +}