Skip to content

Commit

Permalink
Update downsampling interceptor (#788)
Browse files Browse the repository at this point in the history
* Implement caching for downsampling interceptor

* Rename rate to freq

* Improve test stability
  • Loading branch information
sashacmc authored Mar 5, 2024
1 parent c80124a commit f2e99b6
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 33 deletions.
4 changes: 2 additions & 2 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@
// interfaces: [ "wlan0" ],
// /// Data flow messages will be processed on. ("egress" or "ingress")
// flow: "egress",
// /// A list of downsampling rules: key_expression and the rate (maximum frequency in Hertz)
// /// A list of downsampling rules: key_expression and the maximum frequency in Hertz
// rules: [
// { key_expr: "demo/example/zenoh-rs-pub", rate: 0.1 },
// { key_expr: "demo/example/zenoh-rs-pub", freq: 0.1 },
// ],
// },
// ],
Expand Down
2 changes: 1 addition & 1 deletion commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub struct DownsamplingRuleConf {
/// Downsampling will be applied for all key extensions if the parameter is None
pub key_expr: OwnedKeyExpr,
/// The maximum frequency in Hertz;
pub rate: f64,
pub freq: f64,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
Expand Down
66 changes: 44 additions & 22 deletions zenoh/src/net/routing/interceptor/downsampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@
//! [Click here for Zenoh's documentation](../zenoh/index.html)
use crate::net::routing::interceptor::*;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use zenoh_config::{DownsamplingFlow, DownsamplingItemConf, DownsamplingRuleConf};
use zenoh_core::zlock;
use zenoh_keyexpr::keyexpr_tree::impls::KeyedSetProvider;
use zenoh_keyexpr::keyexpr_tree::IKeyExprTreeMut;
use zenoh_keyexpr::keyexpr_tree::{support::UnknownWildness, KeBoxTree};
use zenoh_keyexpr::keyexpr_tree::{IKeyExprTree, IKeyExprTreeMut};
use zenoh_protocol::network::NetworkBody;
use zenoh_result::ZResult;

Expand Down Expand Up @@ -82,12 +83,16 @@ impl InterceptorFactoryTrait for DownsamplingInterceptorFactory {

match self.flow {
DownsamplingFlow::Ingress => (
Some(Box::new(DownsamplingInterceptor::new(self.rules.clone()))),
Some(Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new(
self.rules.clone(),
)))),
None,
),
DownsamplingFlow::Egress => (
None,
Some(Box::new(DownsamplingInterceptor::new(self.rules.clone()))),
Some(Box::new(ComputeOnMiss::new(DownsamplingInterceptor::new(
self.rules.clone(),
)))),
),
}
}
Expand All @@ -110,31 +115,45 @@ struct Timestate {
}

pub(crate) struct DownsamplingInterceptor {
ke_state: Arc<Mutex<KeBoxTree<Timestate, UnknownWildness, KeyedSetProvider>>>,
ke_id: Arc<Mutex<KeBoxTree<usize, UnknownWildness, KeyedSetProvider>>>,
ke_state: Arc<Mutex<HashMap<usize, Timestate>>>,
}

impl InterceptorTrait for DownsamplingInterceptor {
fn compute_keyexpr_cache(&self, _key_expr: &KeyExpr<'_>) -> Option<Box<dyn Any + Send + Sync>> {
None
fn compute_keyexpr_cache(&self, key_expr: &KeyExpr<'_>) -> Option<Box<dyn Any + Send + Sync>> {
let ke_id = zlock!(self.ke_id);
if let Some(id) = ke_id.weight_at(&key_expr.clone()) {
Some(Box::new(Some(*id)))
} else {
Some(Box::new(None::<usize>))
}
}

fn intercept(
&self,
ctx: RoutingContext<NetworkMessage>,
_cache: Option<&Box<dyn Any + Send + Sync>>,
cache: Option<&Box<dyn Any + Send + Sync>>,
) -> Option<RoutingContext<NetworkMessage>> {
if matches!(ctx.msg.body, NetworkBody::Push(_)) {
if let Some(key_expr) = ctx.full_key_expr() {
let mut ke_state = zlock!(self.ke_state);
if let Some(state) = ke_state.weight_at_mut(&key_expr.clone()) {
let timestamp = std::time::Instant::now();

if timestamp - state.latest_message_timestamp >= state.threshold {
state.latest_message_timestamp = timestamp;
return Some(ctx);
} else {
return None;
if let Some(cache) = cache {
if let Some(id) = cache.downcast_ref::<Option<usize>>() {
if let Some(id) = id {
let mut ke_state = zlock!(self.ke_state);
if let Some(state) = ke_state.get_mut(id) {
let timestamp = std::time::Instant::now();

if timestamp - state.latest_message_timestamp >= state.threshold {
state.latest_message_timestamp = timestamp;
return Some(ctx);
} else {
return None;
}
} else {
log::debug!("unxpected cache ID {}", id);
}
}
} else {
log::debug!("unxpected cache type {:?}", ctx.full_expr());
}
}
}
Expand All @@ -147,24 +166,27 @@ const NANOS_PER_SEC: f64 = 1_000_000_000.0;

impl DownsamplingInterceptor {
pub fn new(rules: Vec<DownsamplingRuleConf>) -> Self {
let mut ke_state = KeBoxTree::default();
for rule in rules {
let mut ke_id = KeBoxTree::default();
let mut ke_state = HashMap::default();
for (id, rule) in rules.into_iter().enumerate() {
let mut threshold = std::time::Duration::MAX;
let mut latest_message_timestamp = std::time::Instant::now();
if rule.rate != 0.0 {
if rule.freq != 0.0 {
threshold =
std::time::Duration::from_nanos((1. / rule.rate * NANOS_PER_SEC) as u64);
std::time::Duration::from_nanos((1. / rule.freq * NANOS_PER_SEC) as u64);
latest_message_timestamp -= threshold;
}
ke_id.insert(&rule.key_expr, id);
ke_state.insert(
&rule.key_expr,
id,
Timestate {
threshold,
latest_message_timestamp,
},
);
}
Self {
ke_id: Arc::new(Mutex::new(ke_id)),
ke_state: Arc::new(Mutex::new(ke_state)),
}
}
Expand Down
23 changes: 15 additions & 8 deletions zenoh/tests/interceptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ impl IntervalCounter {
self.total_time.as_millis() as u32 / self.count
}

fn get_count(&self) -> u32 {
self.count
}

fn check_middle(&self, ms: u32) {
let middle = self.get_middle();
println!("Interval {}, count: {}, middle: {}", ms, self.count, middle);
Expand All @@ -51,8 +55,8 @@ fn downsampling_by_keyexpr_impl(egress: bool) {
{{
flow: "{}",
rules: [
{{ key_expr: "test/downsamples_by_keyexp/r100", rate: 10, }},
{{ key_expr: "test/downsamples_by_keyexp/r50", rate: 20, }}
{{ key_expr: "test/downsamples_by_keyexp/r100", freq: 10, }},
{{ key_expr: "test/downsamples_by_keyexp/r50", freq: 20, }}
],
}},
] "#,
Expand Down Expand Up @@ -120,7 +124,10 @@ fn downsampling_by_keyexpr_impl(egress: bool) {
}

for _ in 0..100 {
if *zlock!(total_count) >= messages_count {
if *zlock!(total_count) >= messages_count
&& zlock!(counter_r50_clone).get_count() > 0
&& zlock!(counter_r100_clone).get_count() > 0
{
break;
}
std::thread::sleep(std::time::Duration::from_millis(100));
Expand Down Expand Up @@ -150,14 +157,14 @@ fn downsampling_by_interface_impl(egress: bool) {
interfaces: ["lo", "lo0"],
flow: "{0}",
rules: [
{{ key_expr: "test/downsamples_by_interface/r100", rate: 10, }},
{{ key_expr: "test/downsamples_by_interface/r100", freq: 10, }},
],
}},
{{
interfaces: ["some_unknown_interface"],
flow: "{0}",
rules: [
{{ key_expr: "test/downsamples_by_interface/all", rate: 10, }},
{{ key_expr: "test/downsamples_by_interface/all", freq: 10, }},
],
}},
] "#,
Expand Down Expand Up @@ -220,7 +227,7 @@ fn downsampling_by_interface_impl(egress: bool) {
}

for _ in 0..100 {
if *zlock!(total_count) >= messages_count {
if *zlock!(total_count) >= messages_count && zlock!(counter_r100_clone).get_count() > 0 {
break;
}
std::thread::sleep(std::time::Duration::from_millis(100));
Expand Down Expand Up @@ -253,8 +260,8 @@ fn downsampling_config_error_wrong_strategy() {
{
flow: "down",
rules: [
{ keyexpr: "test/downsamples_by_keyexp/r100", rate: 10, },
{ keyexpr: "test/downsamples_by_keyexp/r50", rate: 20, }
{ keyexpr: "test/downsamples_by_keyexp/r100", freq: 10, },
{ keyexpr: "test/downsamples_by_keyexp/r50", freq: 20, }
],
},
]
Expand Down

0 comments on commit f2e99b6

Please sign in to comment.