Skip to content

Commit

Permalink
Add downsampling interceptor POC
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Jan 24, 2024
1 parent 62049b1 commit 3fd92fb
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 5 deletions.
11 changes: 11 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ impl Zeroize for SecretString {

pub type SecretValue = Secret<SecretString>;

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct DownsamplerConf {
pub keyexpr: OwnedKeyExpr,
pub threshold_ms: u64,
}

pub type ValidationFunction = std::sync::Arc<
dyn Fn(
&str,
Expand Down Expand Up @@ -399,6 +405,11 @@ validated_struct::validator! {
},

},
/// Configuration of the downsampling.
pub downsampling: #[derive(Default)]
DownsamplingConf {
downsamples: Vec<DownsamplerConf>,
},
/// A list of directories where plugins may be searched for if no `__path__` was specified for them.
/// The executable's current directory will be added to the search paths.
plugins_search_dirs: Vec<String>, // TODO (low-prio): Switch this String to a PathBuf? (applies to other paths in the config as well)
Expand Down
16 changes: 16 additions & 0 deletions zenoh/src/net/routing/dispatcher/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,22 @@ impl Tables {
// let queries_default_timeout =
// Duration::from_millis(unwrap_or_default!(config.queries_default_timeout()));
let hat_code = hat::new_hat(whatami, config);

//TODO(sashacmc): add interceptors config reloading there or incapsulate in the interceptors, but it
//will require interface changes
//
//// config reloading sample:
//let cfg_rx = config.subscribe();
//task::spawn({
// async move {
// while let Ok(change) = cfg_rx.recv_async().await {
// let change = change.strip_prefix('/').unwrap_or(&change);
// if !change.starts_with("plugins") {
// continue;
// }
// }
//});

Tables {
zid,
whatami,
Expand Down
126 changes: 126 additions & 0 deletions zenoh/src/net/routing/interceptor/downsampling.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

//! ⚠️ WARNING ⚠️
//!
//! This module is intended for Zenoh's internal use.
//!
//! [Click here for Zenoh's documentation](../zenoh/index.html)
use crate::net::routing::interceptor::*;
use crate::KeyExpr;
use std::sync::{Arc, Mutex};
use zenoh_config::DownsamplerConf;

pub(crate) struct IngressMsgDownsampler {
conf: DownsamplerConf,
latest_message_timestamp: Arc<Mutex<std::time::Instant>>,
}

impl InterceptorTrait for IngressMsgDownsampler {
fn intercept(
&self,
ctx: RoutingContext<NetworkMessage>,
) -> Option<RoutingContext<NetworkMessage>> {
if let Some(full_expr) = ctx.full_expr() {
match KeyExpr::new(full_expr) {
Ok(keyexpr) => {
if !self.conf.keyexpr.intersects(&keyexpr) {
return Some(ctx);
}

let timestamp = std::time::Instant::now();
let mut latest_message_timestamp =
self.latest_message_timestamp.lock().unwrap();

if timestamp - *latest_message_timestamp
>= std::time::Duration::from_millis(self.conf.threshold_ms)
{
*latest_message_timestamp = timestamp;
log::trace!("Interceptor: Passed threshold, passing.");
Some(ctx)
} else {
log::trace!("Interceptor: Skipped due to threshold.");
None
}
}
Err(_) => {
log::warn!("Interceptor: Wrong KeyExpr, passing.");
Some(ctx)
}
}
} else {
// message has no key expr
Some(ctx)
}
}
}

impl IngressMsgDownsampler {
pub fn new(conf: DownsamplerConf) -> Self {
// TODO (sashacmc): I need just := 0, but how???
let zero_ts =
std::time::Instant::now() - std::time::Duration::from_micros(conf.threshold_ms);
Self {
conf,
latest_message_timestamp: Arc::new(Mutex::new(zero_ts)),
}
}
}

pub(crate) struct EgressMsgDownsampler {}

impl InterceptorTrait for EgressMsgDownsampler {
fn intercept(
&self,
ctx: RoutingContext<NetworkMessage>,
) -> Option<RoutingContext<NetworkMessage>> {
// TODO(sashacmc): Do we need Ergress Downsampler?
Some(ctx)
}
}

pub struct DownsamplerInterceptor {
conf: DownsamplerConf,
}

impl DownsamplerInterceptor {
pub fn new(conf: DownsamplerConf) -> Self {
log::debug!("DownsamplerInterceptor enabled: {:?}", conf);
Self { conf }
}
}

impl InterceptorFactoryTrait for DownsamplerInterceptor {
fn new_transport_unicast(
&self,
transport: &TransportUnicast,
) -> (Option<IngressInterceptor>, Option<EgressInterceptor>) {
log::debug!("New transport unicast {:?}", transport);
(
Some(Box::new(IngressMsgDownsampler::new(self.conf.clone()))),
Some(Box::new(EgressMsgDownsampler {})),
)
}

fn new_transport_multicast(&self, transport: &TransportMulticast) -> Option<EgressInterceptor> {
log::debug!("New transport multicast {:?}", transport);
Some(Box::new(EgressMsgDownsampler {}))
}

fn new_peer_multicast(&self, transport: &TransportMulticast) -> Option<IngressInterceptor> {
log::debug!("New peer multicast {:?}", transport);
Some(Box::new(IngressMsgDownsampler::new(self.conf.clone())))
}
}
17 changes: 12 additions & 5 deletions zenoh/src/net/routing/interceptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
//! This module is intended for Zenoh's internal use.
//!
//! [Click here for Zenoh's documentation](../zenoh/index.html)
use super::RoutingContext;
use zenoh_config::Config;
use zenoh_protocol::network::NetworkMessage;
use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast};

pub mod downsampling;
use crate::net::routing::interceptor::downsampling::DownsamplerInterceptor;

pub(crate) trait InterceptorTrait {
fn intercept(
&self,
Expand All @@ -44,11 +48,14 @@ pub(crate) trait InterceptorFactoryTrait {

pub(crate) type InterceptorFactory = Box<dyn InterceptorFactoryTrait + Send + Sync>;

pub(crate) fn interceptor_factories(_config: &Config) -> Vec<InterceptorFactory> {
// Add interceptors here
// TODO build the list of intercetors with the correct order from the config
// vec![Box::new(LoggerInterceptor {})]
vec![]
pub(crate) fn interceptor_factories(config: &Config) -> Vec<InterceptorFactory> {
let mut res: Vec<InterceptorFactory> = vec![];

for ds in config.downsampling().downsamples() {
res.push(Box::new(DownsamplerInterceptor::new(ds.clone())))
}
res.push(Box::new(LoggerInterceptor {}));
res
}

pub(crate) struct InterceptorsChain {
Expand Down
79 changes: 79 additions & 0 deletions zenoh/tests/interceptors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use std::sync::{Arc, Mutex};

#[test]
fn downsampling() {
let _ = env_logger::builder().is_test(true).try_init();

use zenoh::prelude::sync::*;

// declare publisher
let mut config = Config::default();
config
.insert_json5(
"downsampling/downsamples",
r#"
[
{
keyexpr: "test/downsamples/r100",
threshold_ms: 100,
},
{
keyexpr: "test/downsamples/r50",
threshold_ms: 50,
},
]
"#,
)
.unwrap();

// declare subscriber
let zenoh_sub = zenoh::open(config).res().unwrap();

let last_time_r100 = Arc::new(Mutex::new(
std::time::Instant::now() - std::time::Duration::from_millis(100),
));
let last_time_r50 = Arc::new(Mutex::new(
std::time::Instant::now() - std::time::Duration::from_millis(50),
));

let _sub = zenoh_sub
.declare_subscriber("test/downsamples/*")
.callback(move |sample| {
let curr_time = std::time::Instant::now();
if sample.key_expr.as_str() == "test/downsamples/r100" {
let mut last_time = last_time_r100.lock().unwrap();
let interval = (curr_time - *last_time).as_millis() + 1;
*last_time = curr_time;
println!("interval 100: {}", interval);
assert!(interval >= 100);
} else if sample.key_expr.as_str() == "test/downsamples/r50" {
let mut last_time = last_time_r50.lock().unwrap();
let interval = (curr_time - *last_time).as_millis() + 1;
*last_time = curr_time;
println!("interval 50: {}", interval);
assert!(interval >= 50);
}
})
.res()
.unwrap();

let zenoh_pub = zenoh::open(Config::default()).res().unwrap();
let publisher_r100 = zenoh_pub
.declare_publisher("test/downsamples/r100")
.res()
.unwrap();

let publisher_r50 = zenoh_pub
.declare_publisher("test/downsamples/r50")
.res()
.unwrap();

let interval = std::time::Duration::from_millis(1);
for i in 0..1000 {
println!("message {}", i);
publisher_r100.put(format!("message {}", i)).res().unwrap();
publisher_r50.put(format!("message {}", i)).res().unwrap();

std::thread::sleep(interval);
}
}

0 comments on commit 3fd92fb

Please sign in to comment.