diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index 8d1a5dbc0f..f887c15864 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -211,3 +211,14 @@ impl Default for SharedMemoryConf { Self { enabled: false } } } + +impl Default for DownsamplingItemConf { + fn default() -> Self { + Self { + keyexprs: None, + interfaces: None, + strategy: Some("ratelimit".to_string()), + threshold_ms: None, + } + } +} diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 29a87a43ee..df5f1ff89c 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -70,6 +70,18 @@ impl Zeroize for SecretString { pub type SecretValue = Secret; +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct DownsamplingItemConf { + /// A list of key-expressions to which the downsampling will be applied. + pub keyexprs: Option>, + /// A list of interfaces to which the downsampling will be applied. + pub interfaces: Option>, + /// Downsampling strategy (default: ratelimit). + pub strategy: Option, + /// Minimim timeout between two messages for ratelimit starategy + pub threshold_ms: Option, +} + pub trait ConfigValidator: Send + Sync { fn check_config( &self, @@ -405,6 +417,11 @@ validated_struct::validator! { }, }, + /// Configuration of the downsampling. + pub downsampling: #[derive(Default)] + DownsamplingConf { + items: Vec, + }, /// A list of directories where plugins may be searched for if no `__path__` was specified for them. /// The executable's current directory will be added to the search paths. plugins_search_dirs: Vec, // TODO (low-prio): Switch this String to a PathBuf? (applies to other paths in the config as well) diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs new file mode 100644 index 0000000000..3c9d2230ea --- /dev/null +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -0,0 +1,160 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! ⚠️ WARNING ⚠️ +//! +//! This module is intended for Zenoh's internal use. +//! +//! [Click here for Zenoh's documentation](../zenoh/index.html) + +use crate::net::routing::interceptor::*; +use std::sync::{Arc, Mutex}; +use zenoh_config::DownsamplingItemConf; +use zenoh_protocol::core::key_expr::OwnedKeyExpr; + +const RATELIMIT_STRATEGY: &str = "ratelimit"; + +// EgressMsgDownsamplerRatelimit implements default ratelimit strategy, it has following +// configuration parameters: +// threshold - mandatory, indicates the minimum interval between messages. +// All messages arriving with a smaller interval will be skipped +// keyexprs - optional, list of key expressions, only messages matching this expression will be processed, +// the rest will be passed as is. +// interfaces - optional, list of interfaces messages will be processed on, +// the rest will be passed as is. +// +// TODO: we can also add decimation strategy, with mandatory "factor" parameter + +pub(crate) struct EgressMsgDownsamplerRatelimit { + keyexprs: Option>, + threshold: std::time::Duration, + latest_message_timestamp: Arc>, +} + +impl InterceptorTrait for EgressMsgDownsamplerRatelimit { + fn intercept( + &self, + ctx: RoutingContext, + ) -> Option> { + if let Some(cfg_keyexprs) = self.keyexprs.as_ref() { + let mut matched = false; + if let Some(keyexpr) = ctx.full_key_expr() { + for cfg_keyexpr in cfg_keyexprs { + if cfg_keyexpr.intersects(&keyexpr) { + matched = true; + } + } + } + if !matched { + return Some(ctx); + } + } + + let timestamp = std::time::Instant::now(); + let mut latest_message_timestamp = self.latest_message_timestamp.lock().unwrap(); + + if timestamp - *latest_message_timestamp >= self.threshold { + *latest_message_timestamp = timestamp; + log::debug!("Interceptor: Passed threshold, passing."); + Some(ctx) + } else { + log::debug!("Interceptor: Skipped due to threshold."); + None + } + } +} + +impl EgressMsgDownsamplerRatelimit { + pub fn new(conf: DownsamplingItemConf) -> Self { + if let Some(threshold_ms) = conf.threshold_ms { + let threshold = std::time::Duration::from_millis(threshold_ms); + Self { + keyexprs: conf.keyexprs, + threshold, + // TODO (sashacmc): I need just := 0, but how??? + latest_message_timestamp: Arc::new(Mutex::new( + std::time::Instant::now() - threshold, + )), + } + } else { + // TODO (sashacmc): how correctly process an error? + panic!("Rate limit downsampler shoud have a threshold_ms parameter"); + } + } +} + +pub struct DownsamplerInterceptor { + conf: DownsamplingItemConf, +} + +impl DownsamplerInterceptor { + pub fn new(conf: DownsamplingItemConf) -> Self { + log::debug!("DownsamplerInterceptor enabled: {:?}", conf); + Self { conf } + } +} + +impl InterceptorFactoryTrait for DownsamplerInterceptor { + fn new_transport_unicast( + &self, + transport: &TransportUnicast, + ) -> (Option, Option) { + log::debug!("New downsampler transport unicast {:?}", transport); + if let Some(interfaces) = &self.conf.interfaces { + log::debug!( + "New downsampler transport unicast config interfaces: {:?}", + interfaces + ); + if let Ok(links) = transport.get_links() { + for link in links { + log::debug!( + "New downsampler transport unicast link interfaces: {:?}", + link.interfaces + ); + if !link.interfaces.iter().any(|x| interfaces.contains(x)) { + return (None, None); + } + } + } + }; + + let strategy = self + .conf + .strategy + .as_ref() + .map_or_else(|| RATELIMIT_STRATEGY.to_string(), |s| s.clone()); + + if strategy == RATELIMIT_STRATEGY { + ( + None, + Some(Box::new(EgressMsgDownsamplerRatelimit::new( + self.conf.clone(), + ))), + ) + } else { + panic!("Unsupported downsampling strategy: {}", strategy) + } + } + + fn new_transport_multicast( + &self, + _transport: &TransportMulticast, + ) -> Option { + None + } + + fn new_peer_multicast(&self, _transport: &TransportMulticast) -> Option { + None + } +} diff --git a/zenoh/src/net/routing/interceptor/mod.rs b/zenoh/src/net/routing/interceptor/mod.rs index 7503580405..6491cf1ad8 100644 --- a/zenoh/src/net/routing/interceptor/mod.rs +++ b/zenoh/src/net/routing/interceptor/mod.rs @@ -22,6 +22,9 @@ use zenoh_config::Config; use zenoh_protocol::network::NetworkMessage; use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast}; +pub mod downsampling; +use crate::net::routing::interceptor::downsampling::DownsamplerInterceptor; + pub(crate) trait InterceptorTrait { fn intercept( &self, @@ -44,11 +47,15 @@ pub(crate) trait InterceptorFactoryTrait { pub(crate) type InterceptorFactory = Box; -pub(crate) fn interceptor_factories(_config: &Config) -> Vec { - // Add interceptors here - // @TODO build the list of intercetors with the correct order from the config - // vec![Box::new(LoggerInterceptor {})] - vec![] +pub(crate) fn interceptor_factories(config: &Config) -> Vec { + let mut res: Vec = vec![]; + + for ds in config.downsampling().items() { + res.push(Box::new(DownsamplerInterceptor::new(ds.clone()))) + } + // Uncomment to log the interceptors initialisation + // res.push(Box::new(LoggerInterceptor {})); + res } pub(crate) struct InterceptorsChain { diff --git a/zenoh/src/net/routing/mod.rs b/zenoh/src/net/routing/mod.rs index 0b069c1337..c96a6302a3 100644 --- a/zenoh/src/net/routing/mod.rs +++ b/zenoh/src/net/routing/mod.rs @@ -24,6 +24,7 @@ pub mod router; use std::{cell::OnceCell, sync::Arc}; +use zenoh_protocol::core::key_expr::OwnedKeyExpr; use zenoh_protocol::{core::WireExpr, network::NetworkMessage}; use self::{dispatcher::face::Face, router::Resource}; @@ -168,4 +169,18 @@ impl RoutingContext { } None } + + #[inline] + pub(crate) fn full_key_expr(&self) -> Option { + match self.full_expr() { + Some(full_expr) => { + if let Ok(keyexpr) = OwnedKeyExpr::new(full_expr) { + Some(keyexpr) + } else { + None + } + } + None => None, + } + } } diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs new file mode 100644 index 0000000000..1cd07c2f09 --- /dev/null +++ b/zenoh/tests/interceptors.rs @@ -0,0 +1,216 @@ +use std::sync::{Arc, Mutex}; + +struct IntervalCounter { + first_tick: bool, + last_time: std::time::Instant, + count: u32, + total_time: std::time::Duration, +} + +impl IntervalCounter { + fn new() -> IntervalCounter { + IntervalCounter { + first_tick: true, + last_time: std::time::Instant::now(), + count: 0, + total_time: std::time::Duration::from_secs(0), + } + } + + fn tick(&mut self) { + let curr_time = std::time::Instant::now(); + if self.first_tick { + self.first_tick = false; + } else { + self.total_time += curr_time - self.last_time; + self.count += 1; + } + self.last_time = curr_time; + } + + fn get_middle(&self) -> u32 { + self.total_time.as_millis() as u32 / self.count + } + + fn check_middle(&self, ms: u32) { + let middle = self.get_middle(); + println!("Interval {}, count: {}, middle: {}", ms, self.count, middle); + assert!(middle + 1 >= ms); + } +} + +#[test] +fn downsampling_by_keyexpr() { + let _ = env_logger::builder().is_test(true).try_init(); + + use zenoh::prelude::sync::*; + + // declare subscriber + let zenoh_sub = zenoh::open(Config::default()).res().unwrap(); + + let counter_r100 = Arc::new(Mutex::new(IntervalCounter::new())); + let counter_r100_clone = counter_r100.clone(); + let counter_r50 = Arc::new(Mutex::new(IntervalCounter::new())); + let counter_r50_clone = counter_r50.clone(); + + let total_count = Arc::new(Mutex::new(0)); + let total_count_clone = total_count.clone(); + + let _sub = zenoh_sub + .declare_subscriber("test/downsamples_by_keyexp/*") + .callback(move |sample| { + let mut count = total_count_clone.lock().unwrap(); + *count += 1; + if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r100" { + counter_r100.lock().unwrap().tick(); + } else if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r50" { + counter_r50.lock().unwrap().tick(); + } + }) + .res() + .unwrap(); + + // declare publisher + let mut config = Config::default(); + config + .insert_json5( + "downsampling/items", + r#" + [ + { + keyexprs: ["test/downsamples_by_keyexp/r100"], + threshold_ms: 100, + }, + { + keyexprs: ["test/downsamples_by_keyexp/r50"], + threshold_ms: 50, + }, + ] + "#, + ) + .unwrap(); + let zenoh_pub = zenoh::open(config).res().unwrap(); + let publisher_r100 = zenoh_pub + .declare_publisher("test/downsamples_by_keyexp/r100") + .res() + .unwrap(); + + let publisher_r50 = zenoh_pub + .declare_publisher("test/downsamples_by_keyexp/r50") + .res() + .unwrap(); + + let publisher_all = zenoh_pub + .declare_publisher("test/downsamples_by_keyexp/all") + .res() + .unwrap(); + + let interval = std::time::Duration::from_millis(1); + let messages_count = 1000; + for i in 0..messages_count { + publisher_r100.put(format!("message {}", i)).res().unwrap(); + publisher_r50.put(format!("message {}", i)).res().unwrap(); + publisher_all.put(format!("message {}", i)).res().unwrap(); + + std::thread::sleep(interval); + } + + for _ in 0..100 { + if *(total_count.lock().unwrap()) >= messages_count { + break; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + assert!(*(total_count.lock().unwrap()) >= messages_count); + + counter_r50_clone.lock().unwrap().check_middle(50); + counter_r100_clone.lock().unwrap().check_middle(100); +} + +#[cfg(unix)] +#[test] +fn downsampling_by_interface() { + let _ = env_logger::builder().is_test(true).try_init(); + + use zenoh::prelude::sync::*; + + // declare subscriber + let mut config_sub = Config::default(); + config_sub + .insert_json5("listen/endpoints", r#"["tcp/127.0.0.1:7447"]"#) + .unwrap(); + let zenoh_sub = zenoh::open(config_sub).res().unwrap(); + + let counter_r100 = Arc::new(Mutex::new(IntervalCounter::new())); + let counter_r100_clone = counter_r100.clone(); + + let total_count = Arc::new(Mutex::new(0)); + let total_count_clone = total_count.clone(); + + let _sub = zenoh_sub + .declare_subscriber("test/downsamples_by_interface/*") + .callback(move |sample| { + let mut count = total_count_clone.lock().unwrap(); + *count += 1; + if sample.key_expr.as_str() == "test/downsamples_by_interface/r100" { + counter_r100.lock().unwrap().tick(); + } + }) + .res() + .unwrap(); + + // declare publisher + let mut config_pub = Config::default(); + config_pub + .insert_json5("connect/endpoints", r#"["tcp/127.0.0.1:7447"]"#) + .unwrap(); + config_pub + .insert_json5( + "downsampling/items", + r#" + [ + { + keyexprs: ["test/downsamples_by_interface/r100"], + interfaces: ["lo", "lo0"], + threshold_ms: 100, + }, + { + keyexprs: ["test/downsamples_by_interface/all"], + interfaces: ["some_unknown_interface"], + threshold_ms: 100, + }, + ] + "#, + ) + .unwrap(); + + let zenoh_pub = zenoh::open(config_pub).res().unwrap(); + let publisher_r100 = zenoh_pub + .declare_publisher("test/downsamples_by_interface/r100") + .res() + .unwrap(); + + let publisher_all = zenoh_pub + .declare_publisher("test/downsamples_by_interface/all") + .res() + .unwrap(); + + let interval = std::time::Duration::from_millis(1); + let messages_count = 1000; + for i in 0..messages_count { + publisher_r100.put(format!("message {}", i)).res().unwrap(); + publisher_all.put(format!("message {}", i)).res().unwrap(); + + std::thread::sleep(interval); + } + + for _ in 0..100 { + if *(total_count.lock().unwrap()) >= messages_count { + break; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + assert!(*(total_count.lock().unwrap()) >= messages_count); + + counter_r100_clone.lock().unwrap().check_middle(100); +}