diff --git a/Cargo.lock b/Cargo.lock index e1db85da87..2dc8ae14b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4597,6 +4597,7 @@ dependencies = [ "zenoh-config", "zenoh-core", "zenoh-crypto", + "zenoh-keyexpr", "zenoh-link", "zenoh-macros", "zenoh-plugin-trait", diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 68dbaebdeb..9de5dd24a5 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -114,6 +114,20 @@ // ], // }, + // /// The downsampling declaration. + // downsampling: [ + // { + // /// A list of network interfaces messages will be processed on, the rest will be passed as is. + // interfaces: [ "wlan0" ], + // /// Data flow messages will be processed on. ("egress" or "ingress") + // flow: "egress", + // /// A list of downsampling rules: key_expression and the rate (maximum frequency in Hertz) + // rules: [ + // { key_expr: "demo/example/zenoh-rs-pub", rate: 0.1 }, + // ], + // }, + // ], + /// Configure internal transport parameters transport: { unicast: { diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 29a87a43ee..eba3a4aa55 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -70,6 +70,33 @@ impl Zeroize for SecretString { pub type SecretValue = Secret; +#[derive(Debug, Deserialize, Serialize, Clone)] +#[serde(rename_all = "lowercase")] +pub enum DownsamplingFlow { + Egress, + Ingress, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct DownsamplingRuleConf { + /// A list of key-expressions to which the downsampling will be applied. + /// Downsampling will be applied for all key extensions if the parameter is None + pub key_expr: OwnedKeyExpr, + /// The maximum frequency in Hertz; + pub rate: f64, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct DownsamplingItemConf { + /// A list of interfaces to which the downsampling will be applied + /// Downsampling will be applied for all interfaces if the parameter is None + pub interfaces: Option>, + /// A list of interfaces to which the downsampling will be applied. + pub rules: Vec, + /// Downsampling flow direction: egress, ingress + pub flow: DownsamplingFlow, +} + pub trait ConfigValidator: Send + Sync { fn check_config( &self, @@ -405,6 +432,10 @@ validated_struct::validator! { }, }, + + /// Configuration of the downsampling. + downsampling: Vec, + /// A list of directories where plugins may be searched for if no `__path__` was specified for them. /// The executable's current directory will be added to the search paths. plugins_search_dirs: Vec, // TODO (low-prio): Switch this String to a PathBuf? (applies to other paths in the config as well) diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 3896c5d57f..fb929e9893 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -95,6 +95,7 @@ zenoh-collections = { workspace = true, features = ["std"] } zenoh-config = { workspace = true } zenoh-core = { workspace = true } zenoh-crypto = { workspace = true } +zenoh-keyexpr = { workspace = true } zenoh-link = { workspace = true } zenoh-macros = { workspace = true } zenoh-plugin-trait = { workspace = true } diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs new file mode 100644 index 0000000000..765dab8925 --- /dev/null +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -0,0 +1,166 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! ⚠️ WARNING ⚠️ +//! +//! This module is intended for Zenoh's internal use. +//! +//! [Click here for Zenoh's documentation](../zenoh/index.html) + +use crate::net::routing::interceptor::*; +use std::sync::{Arc, Mutex}; +use zenoh_config::{DownsamplingFlow, DownsamplingItemConf, DownsamplingRuleConf}; +use zenoh_core::zlock; +use zenoh_keyexpr::keyexpr_tree::impls::KeyedSetProvider; +use zenoh_keyexpr::keyexpr_tree::IKeyExprTreeMut; +use zenoh_keyexpr::keyexpr_tree::{support::UnknownWildness, KeBoxTree}; +use zenoh_protocol::network::NetworkBody; +use zenoh_result::ZResult; + +pub(crate) fn downsampling_interceptor_factories( + config: &Vec, +) -> ZResult> { + let mut res: Vec = vec![]; + + for ds in config { + res.push(Box::new(DownsamplingInterceptorFactory::new(ds.clone()))); + } + + Ok(res) +} + +pub struct DownsamplingInterceptorFactory { + interfaces: Option>, + rules: Vec, + flow: DownsamplingFlow, +} + +impl DownsamplingInterceptorFactory { + pub fn new(conf: DownsamplingItemConf) -> Self { + Self { + interfaces: conf.interfaces, + rules: conf.rules, + flow: conf.flow, + } + } +} + +impl InterceptorFactoryTrait for DownsamplingInterceptorFactory { + fn new_transport_unicast( + &self, + transport: &TransportUnicast, + ) -> (Option, Option) { + log::debug!("New downsampler transport unicast {:?}", transport); + if let Some(interfaces) = &self.interfaces { + log::debug!( + "New downsampler transport unicast config interfaces: {:?}", + interfaces + ); + if let Ok(links) = transport.get_links() { + for link in links { + log::debug!( + "New downsampler transport unicast link interfaces: {:?}", + link.interfaces + ); + if !link.interfaces.iter().any(|x| interfaces.contains(x)) { + return (None, None); + } + } + } + }; + + match self.flow { + DownsamplingFlow::Ingress => ( + Some(Box::new(DownsamplingInterceptor::new(self.rules.clone()))), + None, + ), + DownsamplingFlow::Egress => ( + None, + Some(Box::new(DownsamplingInterceptor::new(self.rules.clone()))), + ), + } + } + + fn new_transport_multicast( + &self, + _transport: &TransportMulticast, + ) -> Option { + None + } + + fn new_peer_multicast(&self, _transport: &TransportMulticast) -> Option { + None + } +} + +struct Timestate { + pub threshold: std::time::Duration, + pub latest_message_timestamp: std::time::Instant, +} + +pub(crate) struct DownsamplingInterceptor { + ke_state: Arc>>, +} + +impl InterceptorTrait for DownsamplingInterceptor { + fn intercept( + &self, + ctx: RoutingContext, + ) -> Option> { + if matches!(ctx.msg.body, NetworkBody::Push(_)) { + if let Some(key_expr) = ctx.full_key_expr() { + let mut ke_state = zlock!(self.ke_state); + if let Some(state) = ke_state.weight_at_mut(&key_expr.clone()) { + let timestamp = std::time::Instant::now(); + + if timestamp - state.latest_message_timestamp >= state.threshold { + state.latest_message_timestamp = timestamp; + return Some(ctx); + } else { + return None; + } + } + } + } + + Some(ctx) + } +} + +const NANOS_PER_SEC: f64 = 1_000_000_000.0; + +impl DownsamplingInterceptor { + pub fn new(rules: Vec) -> Self { + let mut ke_state = KeBoxTree::default(); + for rule in rules { + let mut threshold = std::time::Duration::MAX; + let mut latest_message_timestamp = std::time::Instant::now(); + if rule.rate != 0.0 { + threshold = + std::time::Duration::from_nanos((1. / rule.rate * NANOS_PER_SEC) as u64); + latest_message_timestamp -= threshold; + } + ke_state.insert( + &rule.key_expr, + Timestate { + threshold, + latest_message_timestamp, + }, + ); + } + Self { + ke_state: Arc::new(Mutex::new(ke_state)), + } + } +} diff --git a/zenoh/src/net/routing/interceptor/mod.rs b/zenoh/src/net/routing/interceptor/mod.rs index 0efe472fd3..81ff6d15da 100644 --- a/zenoh/src/net/routing/interceptor/mod.rs +++ b/zenoh/src/net/routing/interceptor/mod.rs @@ -23,6 +23,9 @@ use zenoh_protocol::network::NetworkMessage; use zenoh_result::ZResult; use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast}; +pub mod downsampling; +use crate::net::routing::interceptor::downsampling::downsampling_interceptor_factories; + pub(crate) trait InterceptorTrait { fn intercept( &self, @@ -45,11 +48,15 @@ pub(crate) trait InterceptorFactoryTrait { pub(crate) type InterceptorFactory = Box; -pub(crate) fn interceptor_factories(_config: &Config) -> ZResult> { - // Add interceptors here - // @TODO build the list of intercetors with the correct order from the config - // vec![Box::new(LoggerInterceptor {})] - Ok(vec![]) +pub(crate) fn interceptor_factories(config: &Config) -> ZResult> { + let mut res: Vec = vec![]; + + // Uncomment to log the interceptors initialisation + // res.push(Box::new(LoggerInterceptor {})); + + res.extend(downsampling_interceptor_factories(config.downsampling())?); + + Ok(res) } pub(crate) struct InterceptorsChain { diff --git a/zenoh/src/net/routing/mod.rs b/zenoh/src/net/routing/mod.rs index 0b069c1337..373f7d8273 100644 --- a/zenoh/src/net/routing/mod.rs +++ b/zenoh/src/net/routing/mod.rs @@ -24,6 +24,7 @@ pub mod router; use std::{cell::OnceCell, sync::Arc}; +use zenoh_protocol::core::key_expr::OwnedKeyExpr; use zenoh_protocol::{core::WireExpr, network::NetworkMessage}; use self::{dispatcher::face::Face, router::Resource}; @@ -168,4 +169,10 @@ impl RoutingContext { } None } + + #[inline] + pub(crate) fn full_key_expr(&self) -> Option { + let full_expr = self.full_expr()?; + OwnedKeyExpr::new(full_expr).ok() + } } diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs new file mode 100644 index 0000000000..5ae5de1426 --- /dev/null +++ b/zenoh/tests/interceptors.rs @@ -0,0 +1,248 @@ +use std::sync::{Arc, Mutex}; +use zenoh_core::zlock; + +struct IntervalCounter { + first_tick: bool, + last_time: std::time::Instant, + count: u32, + total_time: std::time::Duration, +} + +impl IntervalCounter { + fn new() -> IntervalCounter { + IntervalCounter { + first_tick: true, + last_time: std::time::Instant::now(), + count: 0, + total_time: std::time::Duration::from_secs(0), + } + } + + fn tick(&mut self) { + let curr_time = std::time::Instant::now(); + if self.first_tick { + self.first_tick = false; + } else { + self.total_time += curr_time - self.last_time; + self.count += 1; + } + self.last_time = curr_time; + } + + fn get_middle(&self) -> u32 { + self.total_time.as_millis() as u32 / self.count + } + + fn check_middle(&self, ms: u32) { + let middle = self.get_middle(); + println!("Interval {}, count: {}, middle: {}", ms, self.count, middle); + assert!(middle + 1 >= ms); + } +} + +#[test] +fn downsampling_by_keyexpr() { + let _ = env_logger::builder().is_test(true).try_init(); + + use zenoh::prelude::sync::*; + + // declare subscriber + let zenoh_sub = zenoh::open(Config::default()).res().unwrap(); + + let counter_r100 = Arc::new(Mutex::new(IntervalCounter::new())); + let counter_r100_clone = counter_r100.clone(); + let counter_r50 = Arc::new(Mutex::new(IntervalCounter::new())); + let counter_r50_clone = counter_r50.clone(); + + let total_count = Arc::new(Mutex::new(0)); + let total_count_clone = total_count.clone(); + + let _sub = zenoh_sub + .declare_subscriber("test/downsamples_by_keyexp/*") + .callback(move |sample| { + let mut count = zlock!(total_count_clone); + *count += 1; + if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r100" { + zlock!(counter_r100).tick(); + } else if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r50" { + zlock!(counter_r50).tick(); + } + }) + .res() + .unwrap(); + + // declare publisher + let mut config = Config::default(); + config + .insert_json5( + "downsampling", + r#" + [ + { + flow: "egress", + rules: [ + { key_expr: "test/downsamples_by_keyexp/r100", rate: 10, }, + { key_expr: "test/downsamples_by_keyexp/r50", rate: 20, } + ], + }, + ] + "#, + ) + .unwrap(); + let zenoh_pub = zenoh::open(config).res().unwrap(); + let publisher_r100 = zenoh_pub + .declare_publisher("test/downsamples_by_keyexp/r100") + .res() + .unwrap(); + + let publisher_r50 = zenoh_pub + .declare_publisher("test/downsamples_by_keyexp/r50") + .res() + .unwrap(); + + let publisher_all = zenoh_pub + .declare_publisher("test/downsamples_by_keyexp/all") + .res() + .unwrap(); + + let interval = std::time::Duration::from_millis(1); + let messages_count = 1000; + for i in 0..messages_count { + publisher_r100.put(format!("message {}", i)).res().unwrap(); + publisher_r50.put(format!("message {}", i)).res().unwrap(); + publisher_all.put(format!("message {}", i)).res().unwrap(); + + std::thread::sleep(interval); + } + + for _ in 0..100 { + if *zlock!(total_count) >= messages_count { + break; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + assert!(*zlock!(total_count) >= messages_count); + + zlock!(counter_r50_clone).check_middle(50); + zlock!(counter_r100_clone).check_middle(100); +} + +#[cfg(unix)] +#[test] +fn downsampling_by_interface() { + let _ = env_logger::builder().is_test(true).try_init(); + + use zenoh::prelude::sync::*; + + // declare subscriber + let mut config_sub = Config::default(); + config_sub + .insert_json5("listen/endpoints", r#"["tcp/127.0.0.1:7447"]"#) + .unwrap(); + let zenoh_sub = zenoh::open(config_sub).res().unwrap(); + + let counter_r100 = Arc::new(Mutex::new(IntervalCounter::new())); + let counter_r100_clone = counter_r100.clone(); + + let total_count = Arc::new(Mutex::new(0)); + let total_count_clone = total_count.clone(); + + let _sub = zenoh_sub + .declare_subscriber("test/downsamples_by_interface/*") + .callback(move |sample| { + let mut count = zlock!(total_count_clone); + *count += 1; + if sample.key_expr.as_str() == "test/downsamples_by_interface/r100" { + zlock!(counter_r100).tick(); + } + }) + .res() + .unwrap(); + + // declare publisher + let mut config_pub = Config::default(); + config_pub + .insert_json5("connect/endpoints", r#"["tcp/127.0.0.1:7447"]"#) + .unwrap(); + config_pub + .insert_json5( + "downsampling", + r#" + [ + { + interfaces: ["lo", "lo0"], + flow: "egress", + rules: [ + { key_expr: "test/downsamples_by_interface/r100", rate: 10, }, + ], + }, + { + interfaces: ["some_unknown_interface"], + flow: "egress", + rules: [ + { key_expr: "test/downsamples_by_interface/all", rate: 10, }, + ], + }, + ] + "#, + ) + .unwrap(); + + let zenoh_pub = zenoh::open(config_pub).res().unwrap(); + let publisher_r100 = zenoh_pub + .declare_publisher("test/downsamples_by_interface/r100") + .res() + .unwrap(); + + let publisher_all = zenoh_pub + .declare_publisher("test/downsamples_by_interface/all") + .res() + .unwrap(); + + let interval = std::time::Duration::from_millis(1); + let messages_count = 1000; + for i in 0..messages_count { + publisher_r100.put(format!("message {}", i)).res().unwrap(); + publisher_all.put(format!("message {}", i)).res().unwrap(); + + std::thread::sleep(interval); + } + + for _ in 0..100 { + if *zlock!(total_count) >= messages_count { + break; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + assert!(*zlock!(total_count) >= messages_count); + + zlock!(counter_r100_clone).check_middle(100); +} + +#[test] +#[should_panic(expected = "unknown variant `down`")] +fn downsampling_config_error_wrong_strategy() { + let _ = env_logger::builder().is_test(true).try_init(); + + use zenoh::prelude::sync::*; + + let mut config = Config::default(); + config + .insert_json5( + "downsampling", + r#" + [ + { + flow: "down", + rules: [ + { keyexpr: "test/downsamples_by_keyexp/r100", rate: 10, }, + { keyexpr: "test/downsamples_by_keyexp/r50", rate: 20, } + ], + }, + ] + "#, + ) + .unwrap(); + + zenoh::open(config).res().unwrap(); +}