From 2e48d819990966723247fb7de678ff9d4fef7d12 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Thu, 15 Feb 2024 09:23:31 +0100 Subject: [PATCH] ConsolidationMode rework --- commons/zenoh-codec/src/zenoh/query.rs | 8 ++---- commons/zenoh-codec/src/zenoh/reply.rs | 6 ++-- commons/zenoh-protocol/src/zenoh/query.rs | 18 +++++------- zenoh-ext/src/subscriber_ext.rs | 2 +- zenoh/src/liveliness.rs | 2 +- zenoh/src/net/routing/dispatcher/queries.rs | 4 +-- zenoh/src/query.rs | 31 ++++++++------------- zenoh/src/queryable.rs | 2 +- zenoh/src/session.rs | 12 ++++---- 9 files changed, 35 insertions(+), 50 deletions(-) diff --git a/commons/zenoh-codec/src/zenoh/query.rs b/commons/zenoh-codec/src/zenoh/query.rs index cb0506e474..efac7b5671 100644 --- a/commons/zenoh-codec/src/zenoh/query.rs +++ b/commons/zenoh-codec/src/zenoh/query.rs @@ -39,7 +39,6 @@ where Consolidation::None => 1, Consolidation::Monotonic => 2, Consolidation::Latest => 3, - Consolidation::Unique => 4, }; self.write(&mut *writer, v) } @@ -58,7 +57,6 @@ where 1 => Consolidation::None, 2 => Consolidation::Monotonic, 3 => Consolidation::Latest, - 4 => Consolidation::Unique, _ => Consolidation::Auto, // Fallback on Auto if Consolidation is unknown }; Ok(c) @@ -83,7 +81,7 @@ where // Header let mut header = id::QUERY; - if consolidation != &Consolidation::default() { + if consolidation != &Consolidation::DEFAULT { header |= flag::C; } if !parameters.is_empty() { @@ -99,7 +97,7 @@ where self.write(&mut *writer, header)?; // Body - if consolidation != &Consolidation::default() { + if consolidation != &Consolidation::DEFAULT { self.write(&mut *writer, *consolidation)?; } if !parameters.is_empty() { @@ -153,7 +151,7 @@ where } // Body - let mut consolidation = Consolidation::default(); + let mut consolidation = Consolidation::DEFAULT; if imsg::has_flag(self.header, flag::C) { consolidation = self.codec.read(&mut *reader)?; } diff --git a/commons/zenoh-codec/src/zenoh/reply.rs b/commons/zenoh-codec/src/zenoh/reply.rs index d54e98cc5e..308004a1c2 100644 --- a/commons/zenoh-codec/src/zenoh/reply.rs +++ b/commons/zenoh-codec/src/zenoh/reply.rs @@ -41,7 +41,7 @@ where // Header let mut header = id::REPLY; - if consolidation != &Consolidation::default() { + if consolidation != &Consolidation::DEFAULT { header |= flag::C; } let mut n_exts = ext_unknown.len() as u8; @@ -51,7 +51,7 @@ where self.write(&mut *writer, header)?; // Body - if consolidation != &Consolidation::default() { + if consolidation != &Consolidation::DEFAULT { self.write(&mut *writer, *consolidation)?; } @@ -93,7 +93,7 @@ where } // Body - let mut consolidation = Consolidation::default(); + let mut consolidation = Consolidation::DEFAULT; if imsg::has_flag(self.header, flag::C) { consolidation = self.codec.read(&mut *reader)?; } diff --git a/commons/zenoh-protocol/src/zenoh/query.rs b/commons/zenoh-protocol/src/zenoh/query.rs index 17dfa23df8..9beaecc1df 100644 --- a/commons/zenoh-protocol/src/zenoh/query.rs +++ b/commons/zenoh-protocol/src/zenoh/query.rs @@ -33,25 +33,21 @@ pub enum Consolidation { Monotonic, /// Holds back samples to only send the set of samples that had the highest timestamp for their key. Latest, - /// Remove the duplicates of any samples based on the their timestamp. - Unique, + // Remove the duplicates of any samples based on the their timestamp. + // Unique, } impl Consolidation { + pub const DEFAULT: Self = Self::Auto; + #[cfg(feature = "test")] pub fn rand() -> Self { use rand::prelude::SliceRandom; let mut rng = rand::thread_rng(); - *[ - Self::None, - Self::Monotonic, - Self::Latest, - Self::Unique, - Self::Auto, - ] - .choose(&mut rng) - .unwrap() + *[Self::None, Self::Monotonic, Self::Latest, Self::Auto] + .choose(&mut rng) + .unwrap() } } diff --git a/zenoh-ext/src/subscriber_ext.rs b/zenoh-ext/src/subscriber_ext.rs index a2987f8833..15d7122745 100644 --- a/zenoh-ext/src/subscriber_ext.rs +++ b/zenoh-ext/src/subscriber_ext.rs @@ -338,7 +338,7 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler> origin: Locality::default(), query_selector: None, query_target: QueryTarget::default(), - query_consolidation: QueryConsolidation::default(), + query_consolidation: QueryConsolidation::DEFAULT, query_accept_replies: ReplyKeyExpr::MatchingQuery, query_timeout: Duration::from_secs(10), handler: self.handler, diff --git a/zenoh/src/liveliness.rs b/zenoh/src/liveliness.rs index 0883041bb7..a41ef4ebc7 100644 --- a/zenoh/src/liveliness.rs +++ b/zenoh/src/liveliness.rs @@ -748,7 +748,7 @@ where &self.key_expr?.into(), &Some(KeyExpr::from(*KE_PREFIX_LIVELINESS)), QueryTarget::default(), - QueryConsolidation::default(), + QueryConsolidation::DEFAULT, Locality::default(), self.timeout, None, diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index a6748650ab..a78ea67cbc 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -555,8 +555,8 @@ pub fn route_query( for (wexpr, payload) in local_replies { let payload = ResponseBody::Reply(Reply { - consolidation: Consolidation::default(), // @TODO: handle Del case - ext_unknown: vec![], // @TODO: handle unknown extensions + consolidation: Consolidation::DEFAULT, // @TODO: handle Del case + ext_unknown: vec![], // @TODO: handle unknown extensions payload: ReplyBody::Put(Put { // @TODO: handle Del case timestamp: None, // @TODO: handle timestamp diff --git a/zenoh/src/query.rs b/zenoh/src/query.rs index c4f3fb35e9..3c71afd1f3 100644 --- a/zenoh/src/query.rs +++ b/zenoh/src/query.rs @@ -23,47 +23,38 @@ use std::collections::HashMap; use std::future::Ready; use std::time::Duration; use zenoh_core::{AsyncResolve, Resolvable, SyncResolve}; +use zenoh_protocol::zenoh::query::Consolidation; use zenoh_result::ZResult; /// The [`Queryable`](crate::queryable::Queryable)s that should be target of a [`get`](Session::get). pub use zenoh_protocol::core::QueryTarget; /// The kind of consolidation. -pub use zenoh_protocol::core::ConsolidationMode; - -/// The operation: either manual or automatic. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum Mode { - Auto, - Manual(T), -} +pub type ConsolidationMode = Consolidation; /// The replies consolidation strategy to apply on replies to a [`get`](Session::get). #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct QueryConsolidation { - pub(crate) mode: Mode, + pub(crate) mode: ConsolidationMode, } impl QueryConsolidation { /// Automatic query consolidation strategy selection. - pub const AUTO: Self = Self { mode: Mode::Auto }; + pub const AUTO: Self = Self { + mode: ConsolidationMode::Auto, + }; + pub const DEFAULT: Self = Self::AUTO; pub(crate) const fn from_mode(mode: ConsolidationMode) -> Self { - Self { - mode: Mode::Manual(mode), - } + Self { mode } } /// Returns the requested [`ConsolidationMode`]. - pub fn mode(&self) -> Mode { + pub fn mode(&self) -> ConsolidationMode { self.mode } } -impl From> for QueryConsolidation { - fn from(mode: Mode) -> Self { - Self { mode } - } -} + impl From for QueryConsolidation { fn from(mode: ConsolidationMode) -> Self { Self::from_mode(mode) @@ -72,7 +63,7 @@ impl From for QueryConsolidation { impl Default for QueryConsolidation { fn default() -> Self { - QueryConsolidation::AUTO + QueryConsolidation::DEFAULT } } diff --git a/zenoh/src/queryable.rs b/zenoh/src/queryable.rs index 4e9f4914dd..dec83c5b29 100644 --- a/zenoh/src/queryable.rs +++ b/zenoh/src/queryable.rs @@ -241,7 +241,7 @@ impl SyncResolve for ReplyBuilder<'_> { mapping: Mapping::Sender, }, payload: ResponseBody::Reply(zenoh::Reply { - consolidation: zenoh::Consolidation::default(), + consolidation: zenoh::Consolidation::DEFAULT, ext_unknown: vec![], payload: match kind { SampleKind::Put => ReplyBody::Put(Put { diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 46cfd5e499..2416ded7cd 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -776,7 +776,7 @@ impl Session { selector, scope: Ok(None), target: QueryTarget::default(), - consolidation: QueryConsolidation::default(), + consolidation: QueryConsolidation::DEFAULT, destination: Locality::default(), timeout, value: None, @@ -1728,14 +1728,14 @@ impl Session { log::trace!("get({}, {:?}, {:?})", selector, target, consolidation); let mut state = zwrite!(self.state); let consolidation = match consolidation.mode { - Mode::Auto => { + ConsolidationMode::Auto => { if selector.decode().any(|(k, _)| k.as_ref() == TIME_RANGE_KEY) { ConsolidationMode::None } else { ConsolidationMode::Latest } } - Mode::Manual(mode) => mode, + mode => mode, }; let qid = state.qid_counter.fetch_add(1, Ordering::SeqCst); let nb_final = match destination { @@ -1808,7 +1808,7 @@ impl Session { ext_budget: None, ext_timeout: Some(timeout), payload: RequestBody::Query(zenoh_protocol::zenoh::Query { - consolidation: consolidation.into(), + consolidation, parameters: selector.parameters().to_string(), ext_sinfo: None, ext_body: value.as_ref().map(|v| query::ext::QueryBodyType { @@ -1829,7 +1829,7 @@ impl Session { selector.parameters(), qid, target, - consolidation.into(), + consolidation, value.as_ref().map(|v| query::ext::QueryBodyType { #[cfg(feature = "shared-memory")] ext_shm: None, @@ -2446,7 +2446,7 @@ impl Primitives for Session { } } } - ConsolidationMode::Latest => { + Consolidation::Auto | ConsolidationMode::Latest => { match query.replies.as_ref().unwrap().get( new_reply.sample.as_ref().unwrap().key_expr.as_keyexpr(), ) {