Skip to content

Commit

Permalink
Rework interceptor configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Feb 9, 2024
1 parent e39082b commit 25621ea
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 92 deletions.
2 changes: 1 addition & 1 deletion commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl Default for DownsamplingItemConf {
Self {
keyexprs: None,
interfaces: None,
strategy: Some("ratelimit".to_string()),
strategy: Some(DownsamplingStrategy::Ratelimit),
threshold_ms: None,
}
}
Expand Down
7 changes: 6 additions & 1 deletion commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,19 @@ impl Zeroize for SecretString {

pub type SecretValue = Secret<SecretString>;

#[derive(Debug, Deserialize, Serialize, Clone)]
pub enum DownsamplingStrategy {
Ratelimit,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct DownsamplingItemConf {
/// A list of key-expressions to which the downsampling will be applied.
pub keyexprs: Option<Vec<OwnedKeyExpr>>,
/// A list of interfaces to which the downsampling will be applied.
pub interfaces: Option<Vec<String>>,
/// Downsampling strategy (default: ratelimit).
pub strategy: Option<String>,
pub strategy: Option<DownsamplingStrategy>,
/// Minimim timeout between two messages for ratelimit starategy
pub threshold_ms: Option<u64>,
}
Expand Down
14 changes: 10 additions & 4 deletions zenoh/src/net/routing/dispatcher/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use zenoh_config::unwrap_or_default;
use zenoh_config::Config;
use zenoh_protocol::core::{ExprId, WhatAmI, ZenohId};
use zenoh_protocol::network::Mapping;
use zenoh_result::ZResult;
// use zenoh_collections::Timer;
use zenoh_sync::get_mut_unchecked;

Expand Down Expand Up @@ -76,15 +77,20 @@ pub struct Tables {
}

impl Tables {
pub fn new(zid: ZenohId, whatami: WhatAmI, hlc: Option<Arc<HLC>>, config: &Config) -> Self {
pub fn new(
zid: ZenohId,
whatami: WhatAmI,
hlc: Option<Arc<HLC>>,
config: &Config,
) -> ZResult<Self> {
let drop_future_timestamp =
unwrap_or_default!(config.timestamping().drop_future_timestamp());
let router_peers_failover_brokering =
unwrap_or_default!(config.routing().router().peers_failover_brokering());
// let queries_default_timeout =
// Duration::from_millis(unwrap_or_default!(config.queries_default_timeout()));
let hat_code = hat::new_hat(whatami, config);
Tables {
Ok(Tables {
zid,
whatami,
face_counter: 0,
Expand All @@ -96,11 +102,11 @@ impl Tables {
faces: HashMap::new(),
mcast_groups: vec![],
mcast_faces: vec![],
interceptors: interceptor_factories(config),
interceptors: interceptor_factories(config)?,
pull_caches_lock: Mutex::new(()),
hat: hat_code.new_tables(router_peers_failover_brokering),
hat_code: hat_code.into(),
}
})
}

#[doc(hidden)]
Expand Down
165 changes: 90 additions & 75 deletions zenoh/src/net/routing/interceptor/downsampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,91 +21,71 @@
use crate::net::routing::interceptor::*;
use async_liveliness_monitor::support::AtomicInstant;
use std::sync::atomic::Ordering;
use zenoh_config::DownsamplingItemConf;
use zenoh_config::{DownsamplingConf, DownsamplingItemConf, DownsamplingStrategy};
use zenoh_protocol::core::key_expr::OwnedKeyExpr;
use zenoh_result::{zerror, ZResult};

const RATELIMIT_STRATEGY: &str = "ratelimit";
pub(crate) fn downsampling_interceptor_factories(
config: &DownsamplingConf,
) -> ZResult<Vec<InterceptorFactory>> {
let mut res: Vec<InterceptorFactory> = vec![];

// EgressMsgDownsamplerRatelimit implements default ratelimit strategy, it has following
for ds in config.items() {
let strategy = ds
.strategy
.clone()
.unwrap_or(DownsamplingStrategy::Ratelimit);

match strategy {
DownsamplingStrategy::Ratelimit => {
res.push(Box::new(DownsamplerInterceptorRateLimit::new(ds.clone())?));
}
}
}

Ok(res)
}

// DownsamplerInterceptorRateLimit implements default ratelimit strategy, it has following
// configuration parameters:
// threshold - mandatory, indicates the minimum interval between messages.
// All messages arriving with a smaller interval will be skipped
// keyexprs - optional, list of key expressions, only messages matching this expression will be processed,
// the rest will be passed as is.
// interfaces - optional, list of interfaces messages will be processed on,
// the rest will be passed as is.
// keyexprs - optional, list of key expressions, only messages matching this expression will be processed,
// the rest will be passed as is.
// threshold - mandatory, indicates the minimum interval between messages.
// All messages arriving with a smaller interval will be skipped
//
// TODO: we can also add decimation strategy, with mandatory "factor" parameter

pub(crate) struct EgressMsgDownsamplerRatelimit {
pub struct DownsamplerInterceptorRateLimit {
interfaces: Option<Vec<String>>,
keyexprs: Option<Vec<OwnedKeyExpr>>,
threshold: std::time::Duration,
latest_message_timestamp: AtomicInstant,
}

impl InterceptorTrait for EgressMsgDownsamplerRatelimit {
fn intercept(
&self,
ctx: RoutingContext<NetworkMessage>,
) -> Option<RoutingContext<NetworkMessage>> {
if let Some(cfg_keyexprs) = self.keyexprs.as_ref() {
let matched = ctx.full_key_expr().map_or(false, |keyexpr| {
cfg_keyexprs
.iter()
.any(|cfg_keyexpr| cfg_keyexpr.intersects(&keyexpr))
});

if !matched {
return Some(ctx);
}
}

let timestamp = std::time::Instant::now();

if timestamp - self.latest_message_timestamp.load(Ordering::Relaxed) >= self.threshold {
self.latest_message_timestamp
.store(timestamp, Ordering::Relaxed);
Some(ctx)
} else {
None
}
}
}

impl EgressMsgDownsamplerRatelimit {
pub fn new(conf: DownsamplingItemConf) -> Self {
impl DownsamplerInterceptorRateLimit {
pub fn new(conf: DownsamplingItemConf) -> ZResult<Self> {
if let Some(threshold_ms) = conf.threshold_ms {
log::debug!("DownsamplerInterceptor enabled: {:?}", conf);
let threshold = std::time::Duration::from_millis(threshold_ms);
Self {
Ok(Self {
interfaces: conf.interfaces,
keyexprs: conf.keyexprs,
threshold,
latest_message_timestamp: AtomicInstant::new(std::time::Instant::now() - threshold),
}
})
} else {
// TODO (sashacmc): how correctly process an error?
panic!("Rate limit downsampler shoud have a threshold_ms parameter");
Err(zerror!("Rate limit downsampler shoud have a threshold_ms parameter").into())
}
}
}

pub struct DownsamplerInterceptor {
conf: DownsamplingItemConf,
}

impl DownsamplerInterceptor {
pub fn new(conf: DownsamplingItemConf) -> Self {
log::debug!("DownsamplerInterceptor enabled: {:?}", conf);
Self { conf }
}
}

impl InterceptorFactoryTrait for DownsamplerInterceptor {
impl InterceptorFactoryTrait for DownsamplerInterceptorRateLimit {
fn new_transport_unicast(
&self,
transport: &TransportUnicast,
) -> (Option<IngressInterceptor>, Option<EgressInterceptor>) {
log::debug!("New downsampler transport unicast {:?}", transport);
if let Some(interfaces) = &self.conf.interfaces {
if let Some(interfaces) = &self.interfaces {
log::debug!(
"New downsampler transport unicast config interfaces: {:?}",
interfaces
Expand All @@ -123,23 +103,13 @@ impl InterceptorFactoryTrait for DownsamplerInterceptor {
}
};

let strategy = self
.conf
.strategy
.as_ref()
.map_or_else(|| RATELIMIT_STRATEGY.to_string(), |s| s.clone());

if strategy == RATELIMIT_STRATEGY {
(
None,
Some(Box::new(EgressMsgDownsamplerRatelimit::new(
self.conf.clone(),
))),
)
} else {
log::error!("Unsupported downsampling strategy: {}", strategy);
(None, None)
}
(
None,
Some(Box::new(EgressMsgDownsamplerRatelimit::new(
self.keyexprs.clone(),
self.threshold.clone(),
))),
)
}

fn new_transport_multicast(
Expand All @@ -153,3 +123,48 @@ impl InterceptorFactoryTrait for DownsamplerInterceptor {
None
}
}

pub(crate) struct EgressMsgDownsamplerRatelimit {
keyexprs: Option<Vec<OwnedKeyExpr>>,
threshold: std::time::Duration,
latest_message_timestamp: AtomicInstant,
}

impl InterceptorTrait for EgressMsgDownsamplerRatelimit {
fn intercept(
&self,
ctx: RoutingContext<NetworkMessage>,
) -> Option<RoutingContext<NetworkMessage>> {
if let Some(cfg_keyexprs) = self.keyexprs.as_ref() {
let matched = ctx.full_key_expr().map_or(false, |keyexpr| {
cfg_keyexprs
.iter()
.any(|cfg_keyexpr| cfg_keyexpr.intersects(&keyexpr))
});

if !matched {
return Some(ctx);
}
}

let timestamp = std::time::Instant::now();

if timestamp - self.latest_message_timestamp.load(Ordering::Relaxed) >= self.threshold {
self.latest_message_timestamp
.store(timestamp, Ordering::Relaxed);
Some(ctx)
} else {
None
}
}
}

impl EgressMsgDownsamplerRatelimit {
pub fn new(keyexprs: Option<Vec<OwnedKeyExpr>>, threshold: std::time::Duration) -> Self {
Self {
keyexprs,
threshold,
latest_message_timestamp: AtomicInstant::new(std::time::Instant::now() - threshold),
}
}
}
12 changes: 6 additions & 6 deletions zenoh/src/net/routing/interceptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
use super::RoutingContext;
use zenoh_config::Config;
use zenoh_protocol::network::NetworkMessage;
use zenoh_result::ZResult;
use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast};

pub mod downsampling;
use crate::net::routing::interceptor::downsampling::DownsamplerInterceptor;
use crate::net::routing::interceptor::downsampling::downsampling_interceptor_factories;

pub(crate) trait InterceptorTrait {
fn intercept(
Expand All @@ -47,15 +48,14 @@ pub(crate) trait InterceptorFactoryTrait {

pub(crate) type InterceptorFactory = Box<dyn InterceptorFactoryTrait + Send + Sync>;

pub(crate) fn interceptor_factories(config: &Config) -> Vec<InterceptorFactory> {
pub(crate) fn interceptor_factories(config: &Config) -> ZResult<Vec<InterceptorFactory>> {
let mut res: Vec<InterceptorFactory> = vec![];

for ds in config.downsampling().items() {
res.push(Box::new(DownsamplerInterceptor::new(ds.clone())))
}
res.extend(downsampling_interceptor_factories(config.downsampling())?);

// Uncomment to log the interceptors initialisation
// res.push(Box::new(LoggerInterceptor {}));
res
Ok(res)
}

pub(crate) struct InterceptorsChain {
Expand Down
13 changes: 9 additions & 4 deletions zenoh/src/net/routing/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,20 @@ pub struct Router {
}

impl Router {
pub fn new(zid: ZenohId, whatami: WhatAmI, hlc: Option<Arc<HLC>>, config: &Config) -> Self {
Router {
pub fn new(
zid: ZenohId,
whatami: WhatAmI,
hlc: Option<Arc<HLC>>,
config: &Config,
) -> ZResult<Self> {
Ok(Router {
// whatami,
tables: Arc::new(TablesLock {
tables: RwLock::new(Tables::new(zid, whatami, hlc, config)),
tables: RwLock::new(Tables::new(zid, whatami, hlc, config)?),
ctrl_lock: Mutex::new(hat::new_hat(whatami, config)),
queries_lock: RwLock::new(()),
}),
}
})
}

#[allow(clippy::too_many_arguments)]
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl Runtime {
let hlc = (*unwrap_or_default!(config.timestamping().enabled().get(whatami)))
.then(|| Arc::new(HLCBuilder::new().with_id(uhlc::ID::from(&zid)).build()));

let router = Arc::new(Router::new(zid, whatami, hlc.clone(), &config));
let router = Arc::new(Router::new(zid, whatami, hlc.clone(), &config)?);

let handler = Arc::new(RuntimeTransportEventHandler {
runtime: std::sync::RwLock::new(None),
Expand Down
Loading

0 comments on commit 25621ea

Please sign in to comment.