Skip to content

Commit

Permalink
Add multiply keyexprs/interfaces support
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Jan 31, 2024
1 parent 42cf252 commit 9784647
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 24 deletions.
7 changes: 5 additions & 2 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ pub type SecretValue = Secret<SecretString>;

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct DownsamplerConf {
pub keyexpr: Option<OwnedKeyExpr>,
pub interface: Option<String>,
/// A list of key-expressions to which the downsampling will be applied.
pub keyexprs: Option<Vec<OwnedKeyExpr>>,
/// A list of interfaces to which the downsampling will be applied.
pub interfaces: Option<Vec<String>>,
/// Downsampling strategy.
pub strategy: Option<String>,
pub threshold_ms: Option<u64>,
}
Expand Down
38 changes: 22 additions & 16 deletions zenoh/src/net/routing/interceptor/downsampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl InterceptorTrait for IngressMsgDownsampler {
}

pub(crate) struct EgressMsgDownsampler {
keyexpr: Option<OwnedKeyExpr>,
keyexprs: Option<Vec<OwnedKeyExpr>>,
threshold: std::time::Duration,
latest_message_timestamp: Arc<Mutex<std::time::Instant>>,
}
Expand All @@ -47,12 +47,16 @@ impl InterceptorTrait for EgressMsgDownsampler {
&self,
ctx: RoutingContext<NetworkMessage>,
) -> Option<RoutingContext<NetworkMessage>> {
if let Some(cfg_keyexpr) = self.keyexpr.as_ref() {
if let Some(cfg_keyexprs) = self.keyexprs.as_ref() {
let mut matched = false;
if let Some(keyexpr) = ctx.full_key_expr() {
if !cfg_keyexpr.intersects(&keyexpr) {
return Some(ctx);
for cfg_keyexpr in cfg_keyexprs {
if cfg_keyexpr.intersects(&keyexpr) {
matched = true;
}
}
} else {
}
if !matched {
return Some(ctx);
}
}
Expand All @@ -76,7 +80,7 @@ impl EgressMsgDownsampler {
if let Some(threshold_ms) = conf.threshold_ms {
let threshold = std::time::Duration::from_millis(threshold_ms);
Self {
keyexpr: conf.keyexpr,
keyexprs: conf.keyexprs,
threshold,
// TODO (sashacmc): I need just := 0, but how???
latest_message_timestamp: Arc::new(Mutex::new(
Expand Down Expand Up @@ -106,21 +110,23 @@ impl InterceptorFactoryTrait for DownsamplerInterceptor {
&self,
transport: &TransportUnicast,
) -> (Option<IngressInterceptor>, Option<EgressInterceptor>) {
log::debug!("New transport unicast {:?}", transport);
if let Some(interface) = self.conf.interface.clone() {
log::debug!("New downsampler transport unicast interface: {}", interface);
print!("New downsampler transport unicast interface: {}", interface);
log::debug!("New downsampler transport unicast {:?}", transport);
if let Some(interfaces) = &self.conf.interfaces {
log::debug!(
"New downsampler transport unicast config interfaces: {:?}",
interfaces
);
if let Ok(links) = transport.get_links() {
for link in links {
log::debug!(
"New downsampler transport unicast interfaces: {:?}",
link.interfaces
);
print!(
"New downsampler transport unicast interfaces: {:?}",
"New downsampler transport unicast link interfaces: {:?}",
link.interfaces
);
if !link.interfaces.contains(&interface) {
if !link
.interfaces
.iter()
.any(|interface| interfaces.contains(&interface))
{
return (None, None);
}
}
Expand Down
12 changes: 6 additions & 6 deletions zenoh/tests/interceptors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ fn downsampling_by_keyexpr() {
r#"
[
{
keyexpr: "test/downsamples_by_keyexp/r100",
keyexprs: ["test/downsamples_by_keyexp/r100"],
threshold_ms: 100,
},
{
keyexpr: "test/downsamples_by_keyexp/r50",
keyexprs: ["test/downsamples_by_keyexp/r50"],
threshold_ms: 50,
},
]
Expand Down Expand Up @@ -138,13 +138,13 @@ fn downsampling_by_interface() {
r#"
[
{
keyexpr: "test/downsamples_by_interface/r100",
interface: "lo",
keyexprs: ["test/downsamples_by_interface/r100"],
interfaces: ["lo", "lo0"],
threshold_ms: 100,
},
{
keyexpr: "test/downsamples_by_interface/all",
interface: "some_unknown_interface",
keyexprs: ["test/downsamples_by_interface/all"],
interfaces: ["some_unknown_interface"],
threshold_ms: 100,
},
]
Expand Down

0 comments on commit 9784647

Please sign in to comment.