diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index b8f9164cfd..436078becf 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -11,273 +11,227 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::sync::{Arc, Mutex}; -use zenoh_core::zlock; - +use std::collections::HashMap; +use std::sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, +}; +use zenoh::prelude::sync::*; +use zenoh::prelude::Config; +use zenoh_config::{DownsamplingItemConf, DownsamplingRuleConf, InterceptorFlow}; + +// Tokio's time granularity on different platforms #[cfg(target_os = "windows")] static MINIMAL_SLEEP_INTERVAL_MS: u64 = 17; #[cfg(not(target_os = "windows"))] static MINIMAL_SLEEP_INTERVAL_MS: u64 = 2; -struct IntervalCounter { - first_tick: bool, - last_time: std::time::Instant, - count: u32, - total_time: std::time::Duration, -} +static REPEAT: usize = 3; +static WARMUP_MS: u64 = 500; -impl IntervalCounter { - fn new() -> IntervalCounter { - IntervalCounter { - first_tick: true, - last_time: std::time::Instant::now(), - count: 0, - total_time: std::time::Duration::from_secs(0), - } - } +fn build_config( + locator: &str, + ds_config: Vec, + flow: InterceptorFlow, +) -> (Config, Config) { + let mut pub_config = Config::default(); + pub_config + .scouting + .multicast + .set_enabled(Some(false)) + .unwrap(); - fn tick(&mut self) { - let curr_time = std::time::Instant::now(); - if self.first_tick { - self.first_tick = false; - } else { - self.total_time += curr_time - self.last_time; - self.count += 1; - } - self.last_time = curr_time; - } + let mut sub_config = Config::default(); + sub_config + .scouting + .multicast + .set_enabled(Some(false)) + .unwrap(); - fn get_middle(&self) -> u32 { - assert!(self.count > 0); - self.total_time.as_millis() as u32 / self.count - } + sub_config.listen.endpoints = vec![locator.parse().unwrap()]; + pub_config.connect.endpoints = vec![locator.parse().unwrap()]; - fn get_count(&self) -> u32 { - self.count - } + match flow { + InterceptorFlow::Egress => pub_config.set_downsampling(ds_config).unwrap(), + InterceptorFlow::Ingress => sub_config.set_downsampling(ds_config).unwrap(), + }; - fn check_middle(&self, ms: u32) { - let middle = self.get_middle(); - println!("Interval {}, count: {}, middle: {}", ms, self.count, middle); - assert!(middle + 1 >= ms); - } + (pub_config, sub_config) } -fn downsampling_by_keyexpr_impl(egress: bool) { - zenoh_util::try_init_log_from_env(); - - use zenoh::prelude::sync::*; - - let ds_cfg = format!( - r#" - [ - {{ - flow: "{}", - rules: [ - {{ key_expr: "test/downsamples_by_keyexp/r100", freq: 10, }}, - {{ key_expr: "test/downsamples_by_keyexp/r50", freq: 20, }} - ], - }}, - ] "#, - (if egress { "egress" } else { "ingress" }) +fn downsampling_test( + pub_config: Config, + sub_config: Config, + ke_prefix: &str, + ke_of_rates: Vec>, + rate_check: F, +) where + F: Fn(KeyExpr<'_>, usize) -> bool + Send + 'static, +{ + type Counters<'a> = Arc, AtomicUsize>>; + let counters: Counters = Arc::new( + ke_of_rates + .clone() + .into_iter() + .map(|ke| (ke, AtomicUsize::new(0))) + .collect(), ); - // declare subscriber - let mut config_sub = Config::default(); - if !egress { - config_sub.insert_json5("downsampling", &ds_cfg).unwrap(); - } - config_sub - .insert_json5("listen/endpoints", r#"["tcp/127.0.0.1:38446"]"#) - .unwrap(); - config_sub - .scouting - .multicast - .set_enabled(Some(false)) - .unwrap(); - let zenoh_sub = zenoh::open(config_sub).res().unwrap(); - - let counter_r100 = Arc::new(Mutex::new(IntervalCounter::new())); - let counter_r100_clone = counter_r100.clone(); - let counter_r50 = Arc::new(Mutex::new(IntervalCounter::new())); - let counter_r50_clone = counter_r50.clone(); - - let total_count = Arc::new(Mutex::new(0)); - let total_count_clone = total_count.clone(); - - let _sub = zenoh_sub - .declare_subscriber("test/downsamples_by_keyexp/*") - .callback(move |sample| { - let mut count = zlock!(total_count_clone); - *count += 1; - if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r100" { - zlock!(counter_r100).tick(); - } else if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r50" { - zlock!(counter_r50).tick(); + let sub_session = zenoh::open(sub_config).res().unwrap(); + let _sub = sub_session + .declare_subscriber(format!("{ke_prefix}/*")) + .callback({ + let counters = counters.clone(); + move |sample| { + counters + .get(&sample.key_expr) + .map(|ctr| ctr.fetch_add(1, Ordering::SeqCst)); } }) .res() .unwrap(); - // declare publisher - let mut config_pub = Config::default(); - if egress { - config_pub.insert_json5("downsampling", &ds_cfg).unwrap(); + let is_terminated = Arc::new(AtomicBool::new(false)); + let c_is_terminated = is_terminated.clone(); + let handle = std::thread::spawn(move || { + let pub_session = zenoh::open(pub_config).res().unwrap(); + let publishers: Vec<_> = ke_of_rates + .into_iter() + .map(|ke| pub_session.declare_publisher(ke).res().unwrap()) + .collect(); + let interval = std::time::Duration::from_millis(MINIMAL_SLEEP_INTERVAL_MS); + while !c_is_terminated.load(Ordering::SeqCst) { + publishers.iter().for_each(|publ| { + publ.put("message").res().unwrap(); + }); + std::thread::sleep(interval); + } + }); + + std::thread::sleep(std::time::Duration::from_millis(WARMUP_MS)); + counters.iter().for_each(|(_, ctr)| { + ctr.swap(0, Ordering::SeqCst); + }); + + for _ in 0..REPEAT { + std::thread::sleep(std::time::Duration::from_secs(1)); + counters.iter().for_each(|(ke, ctr)| { + let rate = ctr.swap(0, Ordering::SeqCst); + if !rate_check(ke.into(), rate) { + panic!("The test failed on the {ke:?} at the rate of {rate:?}"); + } + }); } - config_pub - .insert_json5("connect/endpoints", r#"["tcp/127.0.0.1:38446"]"#) - .unwrap(); - config_pub - .scouting - .multicast - .set_enabled(Some(false)) - .unwrap(); - let zenoh_pub = zenoh::open(config_pub).res().unwrap(); - let publisher_r100 = zenoh_pub - .declare_publisher("test/downsamples_by_keyexp/r100") - .res() - .unwrap(); - - let publisher_r50 = zenoh_pub - .declare_publisher("test/downsamples_by_keyexp/r50") - .res() - .unwrap(); - let publisher_all = zenoh_pub - .declare_publisher("test/downsamples_by_keyexp/all") - .res() - .unwrap(); - - // WARN(yuyuan): 2 ms is the limit of tokio - let interval = std::time::Duration::from_millis(MINIMAL_SLEEP_INTERVAL_MS); - let messages_count = 1000; - for i in 0..messages_count { - publisher_r100.put(format!("message {}", i)).res().unwrap(); - publisher_r50.put(format!("message {}", i)).res().unwrap(); - publisher_all.put(format!("message {}", i)).res().unwrap(); - std::thread::sleep(interval); + let _ = is_terminated.swap(true, Ordering::SeqCst); + if let Err(err) = handle.join() { + panic!("Failed to join the handle due to {err:?}"); } +} + +fn downsampling_by_keyexpr_impl(flow: InterceptorFlow) { + let ke_prefix = "test/downsamples_by_keyexp"; + let locator = "tcp/127.0.0.1:38446"; + + let ke_10hz: KeyExpr = format!("{ke_prefix}/10hz").try_into().unwrap(); + let ke_20hz: KeyExpr = format!("{ke_prefix}/20hz").try_into().unwrap(); + + let ds_config = DownsamplingItemConf { + flow, + interfaces: None, + rules: vec![ + DownsamplingRuleConf { + key_expr: ke_10hz.clone().into(), + freq: 10.0, + }, + DownsamplingRuleConf { + key_expr: ke_20hz.clone().into(), + freq: 20.0, + }, + ], + }; - for _ in 0..100 { - if *zlock!(total_count) >= messages_count - && zlock!(counter_r50_clone).get_count() > 0 - && zlock!(counter_r100_clone).get_count() > 0 - { - break; + let ke_of_rates: Vec> = ds_config + .rules + .iter() + .map(|x| x.key_expr.clone().into()) + .collect(); + + let rate_check = move |ke: KeyExpr, rate: usize| -> bool { + tracing::info!("keyexpr: {ke}, rate: {rate}"); + if ke == ke_10hz { + rate > 0 && rate <= 10 + } else if ke == ke_20hz { + rate > 0 && rate <= 20 + } else { + tracing::error!("Shouldn't reach this case. Invalid keyexpr {ke} detected."); + false } - std::thread::sleep(std::time::Duration::from_millis(100)); - } - assert!(*zlock!(total_count) >= messages_count); + }; + + let (pub_config, sub_config) = build_config(locator, vec![ds_config], flow); - zlock!(counter_r50_clone).check_middle(50); - zlock!(counter_r100_clone).check_middle(100); + downsampling_test(pub_config, sub_config, ke_prefix, ke_of_rates, rate_check); } #[test] fn downsampling_by_keyexpr() { - downsampling_by_keyexpr_impl(true); - downsampling_by_keyexpr_impl(false); + zenoh_util::try_init_log_from_env(); + downsampling_by_keyexpr_impl(InterceptorFlow::Ingress); + downsampling_by_keyexpr_impl(InterceptorFlow::Egress); } #[cfg(unix)] -fn downsampling_by_interface_impl(egress: bool) { - zenoh_util::try_init_log_from_env(); - - use zenoh::prelude::sync::*; - - let ds_cfg = format!( - r#" - [ - {{ - interfaces: ["lo", "lo0"], - flow: "{0}", - rules: [ - {{ key_expr: "test/downsamples_by_interface/r100", freq: 10, }}, - ], - }}, - {{ - interfaces: ["some_unknown_interface"], - flow: "{0}", - rules: [ - {{ key_expr: "test/downsamples_by_interface/all", freq: 10, }}, - ], - }}, - ] "#, - (if egress { "egress" } else { "ingress" }) - ); - // declare subscriber - let mut config_sub = Config::default(); - config_sub - .insert_json5("listen/endpoints", r#"["tcp/127.0.0.1:38447"]"#) - .unwrap(); - if !egress { - config_sub.insert_json5("downsampling", &ds_cfg).unwrap(); +fn downsampling_by_interface_impl(flow: InterceptorFlow) { + let ke_prefix = "test/downsamples_by_interface"; + let locator = "tcp/127.0.0.1:38447"; + + let ke_10hz: KeyExpr = format!("{ke_prefix}/10hz").try_into().unwrap(); + let ke_no_effect: KeyExpr = format!("{ke_prefix}/no_effect").try_into().unwrap(); + let ke_of_rates: Vec> = vec![ke_10hz.clone(), ke_no_effect.clone()]; + + let ds_config = vec![ + DownsamplingItemConf { + flow, + interfaces: Some(vec!["lo".to_string(), "lo0".to_string()]), + rules: vec![DownsamplingRuleConf { + key_expr: ke_10hz.clone().into(), + freq: 10.0, + }], + }, + DownsamplingItemConf { + flow, + interfaces: Some(vec!["some_unknown_interface".to_string()]), + rules: vec![DownsamplingRuleConf { + key_expr: ke_no_effect.clone().into(), + freq: 10.0, + }], + }, + ]; + + let rate_check = move |ke: KeyExpr, rate: usize| -> bool { + tracing::info!("keyexpr: {ke}, rate: {rate}"); + if ke == ke_10hz { + rate > 0 && rate <= 10 + } else if ke == ke_no_effect { + rate > 10 + } else { + tracing::error!("Shouldn't reach this case. Invalid keyexpr {ke} detected."); + false + } }; - let zenoh_sub = zenoh::open(config_sub).res().unwrap(); - - let counter_r100 = Arc::new(Mutex::new(IntervalCounter::new())); - let counter_r100_clone = counter_r100.clone(); - let total_count = Arc::new(Mutex::new(0)); - let total_count_clone = total_count.clone(); + let (pub_config, sub_config) = build_config(locator, ds_config, flow); - let _sub = zenoh_sub - .declare_subscriber("test/downsamples_by_interface/*") - .callback(move |sample| { - let mut count = zlock!(total_count_clone); - *count += 1; - if sample.key_expr.as_str() == "test/downsamples_by_interface/r100" { - zlock!(counter_r100).tick(); - } - }) - .res() - .unwrap(); - - // declare publisher - let mut config_pub = Config::default(); - config_pub - .insert_json5("connect/endpoints", r#"["tcp/127.0.0.1:38447"]"#) - .unwrap(); - if egress { - config_pub.insert_json5("downsampling", &ds_cfg).unwrap(); - } - let zenoh_pub = zenoh::open(config_pub).res().unwrap(); - let publisher_r100 = zenoh_pub - .declare_publisher("test/downsamples_by_interface/r100") - .res() - .unwrap(); - - let publisher_all = zenoh_pub - .declare_publisher("test/downsamples_by_interface/all") - .res() - .unwrap(); - - // WARN(yuyuan): 2 ms is the limit of tokio - let interval = std::time::Duration::from_millis(MINIMAL_SLEEP_INTERVAL_MS); - let messages_count = 1000; - for i in 0..messages_count { - publisher_r100.put(format!("message {}", i)).res().unwrap(); - publisher_all.put(format!("message {}", i)).res().unwrap(); - - std::thread::sleep(interval); - } - - for _ in 0..100 { - if *zlock!(total_count) >= messages_count && zlock!(counter_r100_clone).get_count() > 0 { - break; - } - std::thread::sleep(std::time::Duration::from_millis(100)); - } - assert!(*zlock!(total_count) >= messages_count); - - zlock!(counter_r100_clone).check_middle(100); + downsampling_test(pub_config, sub_config, ke_prefix, ke_of_rates, rate_check); } #[cfg(unix)] #[test] fn downsampling_by_interface() { - downsampling_by_interface_impl(true); - downsampling_by_interface_impl(false); + zenoh_util::try_init_log_from_env(); + downsampling_by_interface_impl(InterceptorFlow::Ingress); + downsampling_by_interface_impl(InterceptorFlow::Egress); } #[test]