Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the downsampling interceptor #728

Merged
merged 1 commit into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,20 @@
// ],
// },

// /// The downsampling declaration.
// downsampling: [
// {
// /// A list of network interfaces messages will be processed on, the rest will be passed as is.
// interfaces: [ "wlan0" ],
// /// Data flow messages will be processed on. ("egress" or "ingress")
// flow: "egress",
// /// A list of downsampling rules: key_expression and the rate (maximum frequency in Hertz)
// rules: [
// { key_expr: "demo/example/zenoh-rs-pub", rate: 0.1 },
// ],
// },
// ],

/// Configure internal transport parameters
transport: {
unicast: {
Expand Down
31 changes: 31 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,33 @@ impl Zeroize for SecretString {

pub type SecretValue = Secret<SecretString>;

#[derive(Debug, Deserialize, Serialize, Clone)]
sashacmc marked this conversation as resolved.
Show resolved Hide resolved
#[serde(rename_all = "lowercase")]
pub enum DownsamplingFlow {
Egress,
Ingress,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct DownsamplingRuleConf {
/// A list of key-expressions to which the downsampling will be applied.
/// Downsampling will be applied for all key extensions if the parameter is None
pub key_expr: OwnedKeyExpr,
/// The maximum frequency in Hertz;
pub rate: f64,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct DownsamplingItemConf {
/// A list of interfaces to which the downsampling will be applied
/// Downsampling will be applied for all interfaces if the parameter is None
pub interfaces: Option<Vec<String>>,
/// A list of interfaces to which the downsampling will be applied.
pub rules: Vec<DownsamplingRuleConf>,
/// Downsampling flow direction: egress, ingress
pub flow: DownsamplingFlow,
}

pub trait ConfigValidator: Send + Sync {
fn check_config(
&self,
Expand Down Expand Up @@ -405,6 +432,10 @@ validated_struct::validator! {
},

},

/// Configuration of the downsampling.
downsampling: Vec<DownsamplingItemConf>,

/// A list of directories where plugins may be searched for if no `__path__` was specified for them.
/// The executable's current directory will be added to the search paths.
plugins_search_dirs: Vec<String>, // TODO (low-prio): Switch this String to a PathBuf? (applies to other paths in the config as well)
Expand Down
1 change: 1 addition & 0 deletions zenoh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ zenoh-collections = { workspace = true, features = ["std"] }
zenoh-config = { workspace = true }
zenoh-core = { workspace = true }
zenoh-crypto = { workspace = true }
zenoh-keyexpr = { workspace = true }
zenoh-link = { workspace = true }
zenoh-macros = { workspace = true }
zenoh-plugin-trait = { workspace = true }
Expand Down
166 changes: 166 additions & 0 deletions zenoh/src/net/routing/interceptor/downsampling.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

//! ⚠️ WARNING ⚠️
//!
//! This module is intended for Zenoh's internal use.
//!
//! [Click here for Zenoh's documentation](../zenoh/index.html)

use crate::net::routing::interceptor::*;
use std::sync::{Arc, Mutex};
use zenoh_config::{DownsamplingFlow, DownsamplingItemConf, DownsamplingRuleConf};
use zenoh_core::zlock;
use zenoh_keyexpr::keyexpr_tree::impls::KeyedSetProvider;
use zenoh_keyexpr::keyexpr_tree::IKeyExprTreeMut;
use zenoh_keyexpr::keyexpr_tree::{support::UnknownWildness, KeBoxTree};
use zenoh_protocol::network::NetworkBody;
use zenoh_result::ZResult;

pub(crate) fn downsampling_interceptor_factories(
config: &Vec<DownsamplingItemConf>,
) -> ZResult<Vec<InterceptorFactory>> {
let mut res: Vec<InterceptorFactory> = vec![];

for ds in config {
res.push(Box::new(DownsamplingInterceptorFactory::new(ds.clone())));
}

Ok(res)
}

pub struct DownsamplingInterceptorFactory {
interfaces: Option<Vec<String>>,
rules: Vec<DownsamplingRuleConf>,
flow: DownsamplingFlow,
}

impl DownsamplingInterceptorFactory {
pub fn new(conf: DownsamplingItemConf) -> Self {
Self {
interfaces: conf.interfaces,
rules: conf.rules,
flow: conf.flow,
}
}
}

impl InterceptorFactoryTrait for DownsamplingInterceptorFactory {
fn new_transport_unicast(
&self,
transport: &TransportUnicast,
) -> (Option<IngressInterceptor>, Option<EgressInterceptor>) {
log::debug!("New downsampler transport unicast {:?}", transport);
if let Some(interfaces) = &self.interfaces {
log::debug!(
"New downsampler transport unicast config interfaces: {:?}",
interfaces
);
if let Ok(links) = transport.get_links() {
for link in links {
log::debug!(
"New downsampler transport unicast link interfaces: {:?}",
link.interfaces
);
if !link.interfaces.iter().any(|x| interfaces.contains(x)) {
return (None, None);
}
}
}
};

match self.flow {
DownsamplingFlow::Ingress => (
Some(Box::new(DownsamplingInterceptor::new(self.rules.clone()))),
None,
),
DownsamplingFlow::Egress => (
None,
Some(Box::new(DownsamplingInterceptor::new(self.rules.clone()))),
),
}
}

fn new_transport_multicast(
&self,
_transport: &TransportMulticast,
) -> Option<EgressInterceptor> {
None
}

fn new_peer_multicast(&self, _transport: &TransportMulticast) -> Option<IngressInterceptor> {
None
}
}

struct Timestate {
pub threshold: std::time::Duration,
pub latest_message_timestamp: std::time::Instant,
}

pub(crate) struct DownsamplingInterceptor {
ke_state: Arc<Mutex<KeBoxTree<Timestate, UnknownWildness, KeyedSetProvider>>>,
}

impl InterceptorTrait for DownsamplingInterceptor {
fn intercept(
&self,
ctx: RoutingContext<NetworkMessage>,
) -> Option<RoutingContext<NetworkMessage>> {
if matches!(ctx.msg.body, NetworkBody::Push(_)) {
if let Some(key_expr) = ctx.full_key_expr() {
let mut ke_state = zlock!(self.ke_state);
if let Some(state) = ke_state.weight_at_mut(&key_expr.clone()) {
let timestamp = std::time::Instant::now();

if timestamp - state.latest_message_timestamp >= state.threshold {
state.latest_message_timestamp = timestamp;
return Some(ctx);
} else {
return None;
}
}
}
}

Some(ctx)
}
}

const NANOS_PER_SEC: f64 = 1_000_000_000.0;

impl DownsamplingInterceptor {
pub fn new(rules: Vec<DownsamplingRuleConf>) -> Self {
let mut ke_state = KeBoxTree::default();
for rule in rules {
let mut threshold = std::time::Duration::MAX;
let mut latest_message_timestamp = std::time::Instant::now();
if rule.rate != 0.0 {
threshold =
std::time::Duration::from_nanos((1. / rule.rate * NANOS_PER_SEC) as u64);
latest_message_timestamp -= threshold;
}
ke_state.insert(
&rule.key_expr,
Timestate {
threshold,
latest_message_timestamp,
},
);
}
Self {
ke_state: Arc::new(Mutex::new(ke_state)),
}
}
}
17 changes: 12 additions & 5 deletions zenoh/src/net/routing/interceptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ use zenoh_protocol::network::NetworkMessage;
use zenoh_result::ZResult;
use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast};

pub mod downsampling;
use crate::net::routing::interceptor::downsampling::downsampling_interceptor_factories;

pub(crate) trait InterceptorTrait {
fn intercept(
&self,
Expand All @@ -45,11 +48,15 @@ pub(crate) trait InterceptorFactoryTrait {

pub(crate) type InterceptorFactory = Box<dyn InterceptorFactoryTrait + Send + Sync>;

pub(crate) fn interceptor_factories(_config: &Config) -> ZResult<Vec<InterceptorFactory>> {
// Add interceptors here
// @TODO build the list of intercetors with the correct order from the config
// vec![Box::new(LoggerInterceptor {})]
Ok(vec![])
pub(crate) fn interceptor_factories(config: &Config) -> ZResult<Vec<InterceptorFactory>> {
let mut res: Vec<InterceptorFactory> = vec![];

// Uncomment to log the interceptors initialisation
// res.push(Box::new(LoggerInterceptor {}));

res.extend(downsampling_interceptor_factories(config.downsampling())?);

Ok(res)
}

pub(crate) struct InterceptorsChain {
Expand Down
7 changes: 7 additions & 0 deletions zenoh/src/net/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod router;

use std::{cell::OnceCell, sync::Arc};

use zenoh_protocol::core::key_expr::OwnedKeyExpr;
use zenoh_protocol::{core::WireExpr, network::NetworkMessage};

use self::{dispatcher::face::Face, router::Resource};
Expand Down Expand Up @@ -168,4 +169,10 @@ impl RoutingContext<NetworkMessage> {
}
None
}

#[inline]
pub(crate) fn full_key_expr(&self) -> Option<OwnedKeyExpr> {
sashacmc marked this conversation as resolved.
Show resolved Hide resolved
let full_expr = self.full_expr()?;
OwnedKeyExpr::new(full_expr).ok()
}
}
Loading