Skip to content

Commit

Permalink
ConsolidationMode rework
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Feb 15, 2024
1 parent efe1135 commit 2e48d81
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 50 deletions.
8 changes: 3 additions & 5 deletions commons/zenoh-codec/src/zenoh/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ where
Consolidation::None => 1,
Consolidation::Monotonic => 2,
Consolidation::Latest => 3,
Consolidation::Unique => 4,
};
self.write(&mut *writer, v)
}
Expand All @@ -58,7 +57,6 @@ where
1 => Consolidation::None,
2 => Consolidation::Monotonic,
3 => Consolidation::Latest,
4 => Consolidation::Unique,
_ => Consolidation::Auto, // Fallback on Auto if Consolidation is unknown
};
Ok(c)
Expand All @@ -83,7 +81,7 @@ where

// Header
let mut header = id::QUERY;
if consolidation != &Consolidation::default() {
if consolidation != &Consolidation::DEFAULT {
header |= flag::C;
}
if !parameters.is_empty() {
Expand All @@ -99,7 +97,7 @@ where
self.write(&mut *writer, header)?;

// Body
if consolidation != &Consolidation::default() {
if consolidation != &Consolidation::DEFAULT {
self.write(&mut *writer, *consolidation)?;
}
if !parameters.is_empty() {
Expand Down Expand Up @@ -153,7 +151,7 @@ where
}

// Body
let mut consolidation = Consolidation::default();
let mut consolidation = Consolidation::DEFAULT;
if imsg::has_flag(self.header, flag::C) {
consolidation = self.codec.read(&mut *reader)?;
}
Expand Down
6 changes: 3 additions & 3 deletions commons/zenoh-codec/src/zenoh/reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ where

// Header
let mut header = id::REPLY;
if consolidation != &Consolidation::default() {
if consolidation != &Consolidation::DEFAULT {
header |= flag::C;
}
let mut n_exts = ext_unknown.len() as u8;
Expand All @@ -51,7 +51,7 @@ where
self.write(&mut *writer, header)?;

// Body
if consolidation != &Consolidation::default() {
if consolidation != &Consolidation::DEFAULT {
self.write(&mut *writer, *consolidation)?;
}

Expand Down Expand Up @@ -93,7 +93,7 @@ where
}

// Body
let mut consolidation = Consolidation::default();
let mut consolidation = Consolidation::DEFAULT;
if imsg::has_flag(self.header, flag::C) {
consolidation = self.codec.read(&mut *reader)?;
}
Expand Down
18 changes: 7 additions & 11 deletions commons/zenoh-protocol/src/zenoh/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,21 @@ pub enum Consolidation {
Monotonic,
/// Holds back samples to only send the set of samples that had the highest timestamp for their key.
Latest,
/// Remove the duplicates of any samples based on the their timestamp.
Unique,
// Remove the duplicates of any samples based on the their timestamp.
// Unique,
}

impl Consolidation {
pub const DEFAULT: Self = Self::Auto;

#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::prelude::SliceRandom;
let mut rng = rand::thread_rng();

*[
Self::None,
Self::Monotonic,
Self::Latest,
Self::Unique,
Self::Auto,
]
.choose(&mut rng)
.unwrap()
*[Self::None, Self::Monotonic, Self::Latest, Self::Auto]
.choose(&mut rng)
.unwrap()
}
}

Expand Down
2 changes: 1 addition & 1 deletion zenoh-ext/src/subscriber_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler>
origin: Locality::default(),
query_selector: None,
query_target: QueryTarget::default(),
query_consolidation: QueryConsolidation::default(),
query_consolidation: QueryConsolidation::DEFAULT,
query_accept_replies: ReplyKeyExpr::MatchingQuery,
query_timeout: Duration::from_secs(10),
handler: self.handler,
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ where
&self.key_expr?.into(),
&Some(KeyExpr::from(*KE_PREFIX_LIVELINESS)),
QueryTarget::default(),
QueryConsolidation::default(),
QueryConsolidation::DEFAULT,
Locality::default(),
self.timeout,
None,
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/net/routing/dispatcher/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,8 +555,8 @@ pub fn route_query(

for (wexpr, payload) in local_replies {
let payload = ResponseBody::Reply(Reply {
consolidation: Consolidation::default(), // @TODO: handle Del case
ext_unknown: vec![], // @TODO: handle unknown extensions
consolidation: Consolidation::DEFAULT, // @TODO: handle Del case
ext_unknown: vec![], // @TODO: handle unknown extensions
payload: ReplyBody::Put(Put {
// @TODO: handle Del case
timestamp: None, // @TODO: handle timestamp
Expand Down
31 changes: 11 additions & 20 deletions zenoh/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,47 +23,38 @@ use std::collections::HashMap;
use std::future::Ready;
use std::time::Duration;
use zenoh_core::{AsyncResolve, Resolvable, SyncResolve};
use zenoh_protocol::zenoh::query::Consolidation;
use zenoh_result::ZResult;

/// The [`Queryable`](crate::queryable::Queryable)s that should be target of a [`get`](Session::get).
pub use zenoh_protocol::core::QueryTarget;

/// The kind of consolidation.
pub use zenoh_protocol::core::ConsolidationMode;

/// The operation: either manual or automatic.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Mode<T> {
Auto,
Manual(T),
}
pub type ConsolidationMode = Consolidation;

/// The replies consolidation strategy to apply on replies to a [`get`](Session::get).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct QueryConsolidation {
pub(crate) mode: Mode<ConsolidationMode>,
pub(crate) mode: ConsolidationMode,
}

impl QueryConsolidation {
/// Automatic query consolidation strategy selection.
pub const AUTO: Self = Self { mode: Mode::Auto };
pub const AUTO: Self = Self {
mode: ConsolidationMode::Auto,
};
pub const DEFAULT: Self = Self::AUTO;

pub(crate) const fn from_mode(mode: ConsolidationMode) -> Self {
Self {
mode: Mode::Manual(mode),
}
Self { mode }
}

/// Returns the requested [`ConsolidationMode`].
pub fn mode(&self) -> Mode<ConsolidationMode> {
pub fn mode(&self) -> ConsolidationMode {
self.mode
}
}
impl From<Mode<ConsolidationMode>> for QueryConsolidation {
fn from(mode: Mode<ConsolidationMode>) -> Self {
Self { mode }
}
}

impl From<ConsolidationMode> for QueryConsolidation {
fn from(mode: ConsolidationMode) -> Self {
Self::from_mode(mode)
Expand All @@ -72,7 +63,7 @@ impl From<ConsolidationMode> for QueryConsolidation {

impl Default for QueryConsolidation {
fn default() -> Self {
QueryConsolidation::AUTO
QueryConsolidation::DEFAULT
}
}

Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ impl SyncResolve for ReplyBuilder<'_> {
mapping: Mapping::Sender,
},
payload: ResponseBody::Reply(zenoh::Reply {
consolidation: zenoh::Consolidation::default(),
consolidation: zenoh::Consolidation::DEFAULT,
ext_unknown: vec![],
payload: match kind {
SampleKind::Put => ReplyBody::Put(Put {
Expand Down
12 changes: 6 additions & 6 deletions zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ impl Session {
selector,
scope: Ok(None),
target: QueryTarget::default(),
consolidation: QueryConsolidation::default(),
consolidation: QueryConsolidation::DEFAULT,
destination: Locality::default(),
timeout,
value: None,
Expand Down Expand Up @@ -1728,14 +1728,14 @@ impl Session {
log::trace!("get({}, {:?}, {:?})", selector, target, consolidation);
let mut state = zwrite!(self.state);
let consolidation = match consolidation.mode {
Mode::Auto => {
ConsolidationMode::Auto => {
if selector.decode().any(|(k, _)| k.as_ref() == TIME_RANGE_KEY) {
ConsolidationMode::None
} else {
ConsolidationMode::Latest
}
}
Mode::Manual(mode) => mode,
mode => mode,
};
let qid = state.qid_counter.fetch_add(1, Ordering::SeqCst);
let nb_final = match destination {
Expand Down Expand Up @@ -1808,7 +1808,7 @@ impl Session {
ext_budget: None,
ext_timeout: Some(timeout),
payload: RequestBody::Query(zenoh_protocol::zenoh::Query {
consolidation: consolidation.into(),
consolidation,
parameters: selector.parameters().to_string(),
ext_sinfo: None,
ext_body: value.as_ref().map(|v| query::ext::QueryBodyType {
Expand All @@ -1829,7 +1829,7 @@ impl Session {
selector.parameters(),
qid,
target,
consolidation.into(),
consolidation,
value.as_ref().map(|v| query::ext::QueryBodyType {
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down Expand Up @@ -2446,7 +2446,7 @@ impl Primitives for Session {
}
}
}
ConsolidationMode::Latest => {
Consolidation::Auto | ConsolidationMode::Latest => {
match query.replies.as_ref().unwrap().get(
new_reply.sample.as_ref().unwrap().key_expr.as_keyexpr(),
) {
Expand Down

0 comments on commit 2e48d81

Please sign in to comment.