Skip to content

Commit

Permalink
Config refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Feb 5, 2024
1 parent 437f1d1 commit ab406d8
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 12 deletions.
11 changes: 11 additions & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,14 @@ impl Default for SharedMemoryConf {
Self { enabled: false }
}
}

impl Default for DownsamplingItemConf {
fn default() -> Self {
Self {
keyexprs: None,
interfaces: None,
strategy: Some("ratelimit".to_string()),
threshold_ms: None,
}
}
}
5 changes: 2 additions & 3 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,12 @@ impl Zeroize for SecretString {
pub type SecretValue = Secret<SecretString>;

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct DownsamplerConf {
pub struct DownsamplingItemConf {
/// A list of key-expressions to which the downsampling will be applied.
pub keyexprs: Option<Vec<OwnedKeyExpr>>,
/// A list of interfaces to which the downsampling will be applied.
pub interfaces: Option<Vec<String>>,
/// Downsampling strategy (default: ratelimit).
// TODO(sashacmc): how specify default value and generate DEFAULT_CONFIG?
pub strategy: Option<String>,
/// Minimim timeout between two messages for ratelimit starategy
pub threshold_ms: Option<u64>,
Expand Down Expand Up @@ -421,7 +420,7 @@ validated_struct::validator! {
/// Configuration of the downsampling.
pub downsampling: #[derive(Default)]
DownsamplingConf {
downsamples: Vec<DownsamplerConf>,
items: Vec<DownsamplingItemConf>,
},
/// A list of directories where plugins may be searched for if no `__path__` was specified for them.
/// The executable's current directory will be added to the search paths.
Expand Down
8 changes: 4 additions & 4 deletions zenoh/src/net/routing/interceptor/downsampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use crate::net::routing::interceptor::*;
use std::sync::{Arc, Mutex};
use zenoh_config::DownsamplerConf;
use zenoh_config::DownsamplingItemConf;
use zenoh_protocol::core::key_expr::OwnedKeyExpr;

const RATELIMIT_STRATEGY: &str = "ratelimit";
Expand Down Expand Up @@ -67,7 +67,7 @@ impl InterceptorTrait for EgressMsgDownsamplerRatelimit {
}

impl EgressMsgDownsamplerRatelimit {
pub fn new(conf: DownsamplerConf) -> Self {
pub fn new(conf: DownsamplingItemConf) -> Self {
if let Some(threshold_ms) = conf.threshold_ms {
let threshold = std::time::Duration::from_millis(threshold_ms);
Self {
Expand All @@ -86,11 +86,11 @@ impl EgressMsgDownsamplerRatelimit {
}

pub struct DownsamplerInterceptor {
conf: DownsamplerConf,
conf: DownsamplingItemConf,
}

impl DownsamplerInterceptor {
pub fn new(conf: DownsamplerConf) -> Self {
pub fn new(conf: DownsamplingItemConf) -> Self {
log::debug!("DownsamplerInterceptor enabled: {:?}", conf);
Self { conf }
}
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/interceptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub(crate) type InterceptorFactory = Box<dyn InterceptorFactoryTrait + Send + Sy
pub(crate) fn interceptor_factories(config: &Config) -> Vec<InterceptorFactory> {
let mut res: Vec<InterceptorFactory> = vec![];

for ds in config.downsampling().downsamples() {
for ds in config.downsampling().items() {
res.push(Box::new(DownsamplerInterceptor::new(ds.clone())))
}
res.push(Box::new(LoggerInterceptor {}));
Expand Down
6 changes: 2 additions & 4 deletions zenoh/tests/interceptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ fn downsampling_by_keyexpr() {
let mut config = Config::default();
config
.insert_json5(
"downsampling/downsamples",
"downsampling/items",
r#"
[
{
Expand Down Expand Up @@ -108,7 +108,6 @@ fn downsampling_by_keyexpr() {
let interval = std::time::Duration::from_millis(1);
let messages_count = 1000;
for i in 0..messages_count {
println!("message {}", i);
publisher_r100.put(format!("message {}", i)).res().unwrap();
publisher_r50.put(format!("message {}", i)).res().unwrap();
publisher_all.put(format!("message {}", i)).res().unwrap();
Expand Down Expand Up @@ -167,7 +166,7 @@ fn downsampling_by_interface() {
.unwrap();
config_pub
.insert_json5(
"downsampling/downsamples",
"downsampling/items",
r#"
[
{
Expand Down Expand Up @@ -199,7 +198,6 @@ fn downsampling_by_interface() {
let interval = std::time::Duration::from_millis(1);
let messages_count = 1000;
for i in 0..messages_count {
println!("message {}", i);
publisher_r100.put(format!("message {}", i)).res().unwrap();
publisher_all.put(format!("message {}", i)).res().unwrap();

Expand Down

0 comments on commit ab406d8

Please sign in to comment.