diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 5eb0aa1af2..753abdb8e6 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -72,8 +72,11 @@ pub type SecretValue = Secret; #[derive(Debug, Deserialize, Serialize, Clone)] pub struct DownsamplerConf { - pub keyexpr: Option, - pub interface: Option, + /// A list of key-expressions to which the downsampling will be applied. + pub keyexprs: Option>, + /// A list of interfaces to which the downsampling will be applied. + pub interfaces: Option>, + /// Downsampling strategy. pub strategy: Option, pub threshold_ms: Option, } diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs index 94b11a3033..31e5544970 100644 --- a/zenoh/src/net/routing/interceptor/downsampling.rs +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -37,7 +37,7 @@ impl InterceptorTrait for IngressMsgDownsampler { } pub(crate) struct EgressMsgDownsampler { - keyexpr: Option, + keyexprs: Option>, threshold: std::time::Duration, latest_message_timestamp: Arc>, } @@ -47,12 +47,16 @@ impl InterceptorTrait for EgressMsgDownsampler { &self, ctx: RoutingContext, ) -> Option> { - if let Some(cfg_keyexpr) = self.keyexpr.as_ref() { + if let Some(cfg_keyexprs) = self.keyexprs.as_ref() { + let mut matched = false; if let Some(keyexpr) = ctx.full_key_expr() { - if !cfg_keyexpr.intersects(&keyexpr) { - return Some(ctx); + for cfg_keyexpr in cfg_keyexprs { + if cfg_keyexpr.intersects(&keyexpr) { + matched = true; + } } - } else { + } + if !matched { return Some(ctx); } } @@ -76,7 +80,7 @@ impl EgressMsgDownsampler { if let Some(threshold_ms) = conf.threshold_ms { let threshold = std::time::Duration::from_millis(threshold_ms); Self { - keyexpr: conf.keyexpr, + keyexprs: conf.keyexprs, threshold, // TODO (sashacmc): I need just := 0, but how??? latest_message_timestamp: Arc::new(Mutex::new( @@ -106,21 +110,23 @@ impl InterceptorFactoryTrait for DownsamplerInterceptor { &self, transport: &TransportUnicast, ) -> (Option, Option) { - log::debug!("New transport unicast {:?}", transport); - if let Some(interface) = self.conf.interface.clone() { - log::debug!("New downsampler transport unicast interface: {}", interface); - print!("New downsampler transport unicast interface: {}", interface); + log::debug!("New downsampler transport unicast {:?}", transport); + if let Some(interfaces) = &self.conf.interfaces { + log::debug!( + "New downsampler transport unicast config interfaces: {:?}", + interfaces + ); if let Ok(links) = transport.get_links() { for link in links { log::debug!( - "New downsampler transport unicast interfaces: {:?}", - link.interfaces - ); - print!( - "New downsampler transport unicast interfaces: {:?}", + "New downsampler transport unicast link interfaces: {:?}", link.interfaces ); - if !link.interfaces.contains(&interface) { + if !link + .interfaces + .iter() + .any(|interface| interfaces.contains(&interface)) + { return (None, None); } } diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 4838b8f587..502940ab41 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -50,11 +50,11 @@ fn downsampling_by_keyexpr() { r#" [ { - keyexpr: "test/downsamples_by_keyexp/r100", + keyexprs: ["test/downsamples_by_keyexp/r100"], threshold_ms: 100, }, { - keyexpr: "test/downsamples_by_keyexp/r50", + keyexprs: ["test/downsamples_by_keyexp/r50"], threshold_ms: 50, }, ] @@ -138,13 +138,13 @@ fn downsampling_by_interface() { r#" [ { - keyexpr: "test/downsamples_by_interface/r100", - interface: "lo", + keyexprs: ["test/downsamples_by_interface/r100"], + interfaces: ["lo", "lo0"], threshold_ms: 100, }, { - keyexpr: "test/downsamples_by_interface/all", - interface: "some_unknown_interface", + keyexprs: ["test/downsamples_by_interface/all"], + interfaces: ["some_unknown_interface"], threshold_ms: 100, }, ]