Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConsolidationMode can be Auto #738

Merged
merged 3 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions commons/zenoh-codec/src/zenoh/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ where
Consolidation::None => 1,
Consolidation::Monotonic => 2,
Consolidation::Latest => 3,
Consolidation::Unique => 4,
};
self.write(&mut *writer, v)
}
Expand All @@ -58,7 +57,6 @@ where
1 => Consolidation::None,
2 => Consolidation::Monotonic,
3 => Consolidation::Latest,
4 => Consolidation::Unique,
_ => Consolidation::Auto, // Fallback on Auto if Consolidation is unknown
};
Ok(c)
Expand Down
16 changes: 5 additions & 11 deletions commons/zenoh-protocol/src/zenoh/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ pub enum Consolidation {
Monotonic,
/// Holds back samples to only send the set of samples that had the highest timestamp for their key.
Latest,
/// Remove the duplicates of any samples based on the their timestamp.
Unique,
// Remove the duplicates of any samples based on the their timestamp.
// Unique,
}

impl Consolidation {
Expand All @@ -45,15 +45,9 @@ impl Consolidation {
use rand::prelude::SliceRandom;
let mut rng = rand::thread_rng();

*[
Self::None,
Self::Monotonic,
Self::Latest,
Self::Unique,
Self::Auto,
]
.choose(&mut rng)
.unwrap()
*[Self::None, Self::Monotonic, Self::Latest, Self::Auto]
.choose(&mut rng)
.unwrap()
}
}

Expand Down
45 changes: 8 additions & 37 deletions zenoh/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,13 @@ use std::collections::HashMap;
use std::future::Ready;
use std::time::Duration;
use zenoh_core::{AsyncResolve, Resolvable, SyncResolve};
use zenoh_protocol::zenoh::query::Consolidation;
use zenoh_result::ZResult;

/// The [`Queryable`](crate::queryable::Queryable)s that should be target of a [`get`](Session::get).
pub type QueryTarget = zenoh_protocol::network::request::ext::TargetType;

/// The kind of consolidation.
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum ConsolidationMode {
/// No consolidation applied: multiple samples may be received for the same key-timestamp.
None,
/// Monotonic consolidation immediately forwards samples, except if one with an equal or more recent timestamp
/// has already been sent with the same key.
///
/// This optimizes latency while potentially reducing bandwidth.
///
/// Note that this doesn't cause re-ordering, but drops the samples for which a more recent timestamp has already
/// been observed with the same key.
Monotonic,
/// Holds back samples to only send the set of samples that had the highest timestamp for their key.
Latest,
}

impl From<ConsolidationMode> for Consolidation {
fn from(val: ConsolidationMode) -> Self {
match val {
ConsolidationMode::None => Consolidation::None,
ConsolidationMode::Monotonic => Consolidation::Monotonic,
ConsolidationMode::Latest => Consolidation::Latest,
}
}
}
pub type ConsolidationMode = zenoh_protocol::zenoh::query::Consolidation;

/// The operation: either manual or automatic.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
Expand All @@ -65,30 +40,26 @@ pub enum Mode<T> {
/// The replies consolidation strategy to apply on replies to a [`get`](Session::get).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct QueryConsolidation {
pub(crate) mode: Mode<ConsolidationMode>,
pub(crate) mode: ConsolidationMode,
}

impl QueryConsolidation {
pub const DEFAULT: Self = Self::AUTO;
/// Automatic query consolidation strategy selection.
pub const AUTO: Self = Self { mode: Mode::Auto };
pub const AUTO: Self = Self {
mode: ConsolidationMode::Auto,
};

pub(crate) const fn from_mode(mode: ConsolidationMode) -> Self {
Self {
mode: Mode::Manual(mode),
}
Self { mode }
}

/// Returns the requested [`ConsolidationMode`].
pub fn mode(&self) -> Mode<ConsolidationMode> {
pub fn mode(&self) -> ConsolidationMode {
self.mode
}
}
impl From<Mode<ConsolidationMode>> for QueryConsolidation {
fn from(mode: Mode<ConsolidationMode>) -> Self {
Self { mode }
}
}

impl From<ConsolidationMode> for QueryConsolidation {
fn from(mode: ConsolidationMode) -> Self {
Self::from_mode(mode)
Expand Down
10 changes: 5 additions & 5 deletions zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1728,14 +1728,14 @@ impl Session {
log::trace!("get({}, {:?}, {:?})", selector, target, consolidation);
let mut state = zwrite!(self.state);
let consolidation = match consolidation.mode {
Mode::Auto => {
ConsolidationMode::Auto => {
if selector.decode().any(|(k, _)| k.as_ref() == TIME_RANGE_KEY) {
ConsolidationMode::None
} else {
ConsolidationMode::Latest
}
}
Mode::Manual(mode) => mode,
mode => mode,
};
let qid = state.qid_counter.fetch_add(1, Ordering::SeqCst);
let nb_final = match destination {
Expand Down Expand Up @@ -1808,7 +1808,7 @@ impl Session {
ext_budget: None,
ext_timeout: Some(timeout),
payload: RequestBody::Query(zenoh_protocol::zenoh::Query {
consolidation: consolidation.into(),
consolidation,
parameters: selector.parameters().to_string(),
ext_sinfo: None,
ext_body: value.as_ref().map(|v| query::ext::QueryBodyType {
Expand All @@ -1829,7 +1829,7 @@ impl Session {
selector.parameters(),
qid,
target,
consolidation.into(),
consolidation,
value.as_ref().map(|v| query::ext::QueryBodyType {
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand Down Expand Up @@ -2441,7 +2441,7 @@ impl Primitives for Session {
}
}
}
ConsolidationMode::Latest => {
Consolidation::Auto | ConsolidationMode::Latest => {
match query.replies.as_ref().unwrap().get(
new_reply.sample.as_ref().unwrap().key_expr.as_keyexpr(),
) {
Expand Down