Skip to content

Commit

Permalink
Add the downsampling interceptor (#728)
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc authored Feb 15, 2024
1 parent 40efd99 commit 078898d
Show file tree
Hide file tree
Showing 8 changed files with 480 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,20 @@
// ],
// },

// /// The downsampling declaration.
// downsampling: [
// {
// /// A list of network interfaces messages will be processed on, the rest will be passed as is.
// interfaces: [ "wlan0" ],
// /// Data flow messages will be processed on. ("egress" or "ingress")
// flow: "egress",
// /// A list of downsampling rules: key_expression and the rate (maximum frequency in Hertz)
// rules: [
// { key_expr: "demo/example/zenoh-rs-pub", rate: 0.1 },
// ],
// },
// ],

/// Configure internal transport parameters
transport: {
unicast: {
Expand Down
31 changes: 31 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,33 @@ impl Zeroize for SecretString {

pub type SecretValue = Secret<SecretString>;

#[derive(Debug, Deserialize, Serialize, Clone)]
#[serde(rename_all = "lowercase")]
pub enum DownsamplingFlow {
Egress,
Ingress,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct DownsamplingRuleConf {
/// A list of key-expressions to which the downsampling will be applied.
/// Downsampling will be applied for all key extensions if the parameter is None
pub key_expr: OwnedKeyExpr,
/// The maximum frequency in Hertz;
pub rate: f64,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct DownsamplingItemConf {
/// A list of interfaces to which the downsampling will be applied
/// Downsampling will be applied for all interfaces if the parameter is None
pub interfaces: Option<Vec<String>>,
/// A list of interfaces to which the downsampling will be applied.
pub rules: Vec<DownsamplingRuleConf>,
/// Downsampling flow direction: egress, ingress
pub flow: DownsamplingFlow,
}

pub trait ConfigValidator: Send + Sync {
fn check_config(
&self,
Expand Down Expand Up @@ -405,6 +432,10 @@ validated_struct::validator! {
},

},

/// Configuration of the downsampling.
downsampling: Vec<DownsamplingItemConf>,

/// A list of directories where plugins may be searched for if no `__path__` was specified for them.
/// The executable's current directory will be added to the search paths.
plugins_search_dirs: Vec<String>, // TODO (low-prio): Switch this String to a PathBuf? (applies to other paths in the config as well)
Expand Down
1 change: 1 addition & 0 deletions zenoh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ zenoh-collections = { workspace = true, features = ["std"] }
zenoh-config = { workspace = true }
zenoh-core = { workspace = true }
zenoh-crypto = { workspace = true }
zenoh-keyexpr = { workspace = true }
zenoh-link = { workspace = true }
zenoh-macros = { workspace = true }
zenoh-plugin-trait = { workspace = true }
Expand Down
166 changes: 166 additions & 0 deletions zenoh/src/net/routing/interceptor/downsampling.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

//! ⚠️ WARNING ⚠️
//!
//! This module is intended for Zenoh's internal use.
//!
//! [Click here for Zenoh's documentation](../zenoh/index.html)
use crate::net::routing::interceptor::*;
use std::sync::{Arc, Mutex};
use zenoh_config::{DownsamplingFlow, DownsamplingItemConf, DownsamplingRuleConf};
use zenoh_core::zlock;
use zenoh_keyexpr::keyexpr_tree::impls::KeyedSetProvider;
use zenoh_keyexpr::keyexpr_tree::IKeyExprTreeMut;
use zenoh_keyexpr::keyexpr_tree::{support::UnknownWildness, KeBoxTree};
use zenoh_protocol::network::NetworkBody;
use zenoh_result::ZResult;

pub(crate) fn downsampling_interceptor_factories(
config: &Vec<DownsamplingItemConf>,
) -> ZResult<Vec<InterceptorFactory>> {
let mut res: Vec<InterceptorFactory> = vec![];

for ds in config {
res.push(Box::new(DownsamplingInterceptorFactory::new(ds.clone())));
}

Ok(res)
}

pub struct DownsamplingInterceptorFactory {
interfaces: Option<Vec<String>>,
rules: Vec<DownsamplingRuleConf>,
flow: DownsamplingFlow,
}

impl DownsamplingInterceptorFactory {
pub fn new(conf: DownsamplingItemConf) -> Self {
Self {
interfaces: conf.interfaces,
rules: conf.rules,
flow: conf.flow,
}
}
}

impl InterceptorFactoryTrait for DownsamplingInterceptorFactory {
fn new_transport_unicast(
&self,
transport: &TransportUnicast,
) -> (Option<IngressInterceptor>, Option<EgressInterceptor>) {
log::debug!("New downsampler transport unicast {:?}", transport);
if let Some(interfaces) = &self.interfaces {
log::debug!(
"New downsampler transport unicast config interfaces: {:?}",
interfaces
);
if let Ok(links) = transport.get_links() {
for link in links {
log::debug!(
"New downsampler transport unicast link interfaces: {:?}",
link.interfaces
);
if !link.interfaces.iter().any(|x| interfaces.contains(x)) {
return (None, None);
}
}
}
};

match self.flow {
DownsamplingFlow::Ingress => (
Some(Box::new(DownsamplingInterceptor::new(self.rules.clone()))),
None,
),
DownsamplingFlow::Egress => (
None,
Some(Box::new(DownsamplingInterceptor::new(self.rules.clone()))),
),
}
}

fn new_transport_multicast(
&self,
_transport: &TransportMulticast,
) -> Option<EgressInterceptor> {
None
}

fn new_peer_multicast(&self, _transport: &TransportMulticast) -> Option<IngressInterceptor> {
None
}
}

struct Timestate {
pub threshold: std::time::Duration,
pub latest_message_timestamp: std::time::Instant,
}

pub(crate) struct DownsamplingInterceptor {
ke_state: Arc<Mutex<KeBoxTree<Timestate, UnknownWildness, KeyedSetProvider>>>,
}

impl InterceptorTrait for DownsamplingInterceptor {
fn intercept(
&self,
ctx: RoutingContext<NetworkMessage>,
) -> Option<RoutingContext<NetworkMessage>> {
if matches!(ctx.msg.body, NetworkBody::Push(_)) {
if let Some(key_expr) = ctx.full_key_expr() {
let mut ke_state = zlock!(self.ke_state);
if let Some(state) = ke_state.weight_at_mut(&key_expr.clone()) {
let timestamp = std::time::Instant::now();

if timestamp - state.latest_message_timestamp >= state.threshold {
state.latest_message_timestamp = timestamp;
return Some(ctx);
} else {
return None;
}
}
}
}

Some(ctx)
}
}

const NANOS_PER_SEC: f64 = 1_000_000_000.0;

impl DownsamplingInterceptor {
pub fn new(rules: Vec<DownsamplingRuleConf>) -> Self {
let mut ke_state = KeBoxTree::default();
for rule in rules {
let mut threshold = std::time::Duration::MAX;
let mut latest_message_timestamp = std::time::Instant::now();
if rule.rate != 0.0 {
threshold =
std::time::Duration::from_nanos((1. / rule.rate * NANOS_PER_SEC) as u64);
latest_message_timestamp -= threshold;
}
ke_state.insert(
&rule.key_expr,
Timestate {
threshold,
latest_message_timestamp,
},
);
}
Self {
ke_state: Arc::new(Mutex::new(ke_state)),
}
}
}
17 changes: 12 additions & 5 deletions zenoh/src/net/routing/interceptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ use zenoh_protocol::network::NetworkMessage;
use zenoh_result::ZResult;
use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast};

pub mod downsampling;
use crate::net::routing::interceptor::downsampling::downsampling_interceptor_factories;

pub(crate) trait InterceptorTrait {
fn intercept(
&self,
Expand All @@ -45,11 +48,15 @@ pub(crate) trait InterceptorFactoryTrait {

pub(crate) type InterceptorFactory = Box<dyn InterceptorFactoryTrait + Send + Sync>;

pub(crate) fn interceptor_factories(_config: &Config) -> ZResult<Vec<InterceptorFactory>> {
// Add interceptors here
// @TODO build the list of intercetors with the correct order from the config
// vec![Box::new(LoggerInterceptor {})]
Ok(vec![])
pub(crate) fn interceptor_factories(config: &Config) -> ZResult<Vec<InterceptorFactory>> {
let mut res: Vec<InterceptorFactory> = vec![];

// Uncomment to log the interceptors initialisation
// res.push(Box::new(LoggerInterceptor {}));

res.extend(downsampling_interceptor_factories(config.downsampling())?);

Ok(res)
}

pub(crate) struct InterceptorsChain {
Expand Down
7 changes: 7 additions & 0 deletions zenoh/src/net/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod router;

use std::{cell::OnceCell, sync::Arc};

use zenoh_protocol::core::key_expr::OwnedKeyExpr;
use zenoh_protocol::{core::WireExpr, network::NetworkMessage};

use self::{dispatcher::face::Face, router::Resource};
Expand Down Expand Up @@ -168,4 +169,10 @@ impl RoutingContext<NetworkMessage> {
}
None
}

#[inline]
pub(crate) fn full_key_expr(&self) -> Option<OwnedKeyExpr> {
let full_expr = self.full_expr()?;
OwnedKeyExpr::new(full_expr).ok()
}
}
Loading

0 comments on commit 078898d

Please sign in to comment.