Skip to content

Commit

Permalink
ConsolidationMode moved into the API
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Feb 9, 2024
1 parent 89555f1 commit d3da646
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 16 deletions.
14 changes: 3 additions & 11 deletions commons/zenoh-protocol/src/zenoh/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::{common::ZExtUnknown, core::ConsolidationMode};
use crate::common::ZExtUnknown;
use alloc::{string::String, vec::Vec};

/// The kind of consolidation.
Expand All @@ -38,6 +38,8 @@ pub enum Consolidation {
}

impl Consolidation {
pub const DEFAULT: Self = Self::Auto;

#[cfg(feature = "test")]
pub fn rand() -> Self {
use rand::prelude::SliceRandom;
Expand All @@ -55,16 +57,6 @@ impl Consolidation {
}
}

impl From<ConsolidationMode> for Consolidation {
fn from(val: ConsolidationMode) -> Self {
match val {
ConsolidationMode::None => Consolidation::None,
ConsolidationMode::Monotonic => Consolidation::Monotonic,
ConsolidationMode::Latest => Consolidation::Latest,
}
}
}

/// # Query message
///
/// ```text
Expand Down
2 changes: 1 addition & 1 deletion zenoh-ext/src/subscriber_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ impl<'a, 'b, Handler> SubscriberBuilderExt<'a, 'b, Handler>
origin: Locality::default(),
query_selector: None,
query_target: QueryTarget::default(),
query_consolidation: QueryConsolidation::default(),
query_consolidation: QueryConsolidation::DEFAULT,
query_accept_replies: ReplyKeyExpr::MatchingQuery,
query_timeout: Duration::from_secs(10),
handler: self.handler,
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ where
&self.key_expr?.into(),
&Some(KeyExpr::from(*KE_PREFIX_LIVELINESS)),
QueryTarget::default(),
QueryConsolidation::default(),
QueryConsolidation::DEFAULT,
Locality::default(),
self.timeout,
None,
Expand Down
30 changes: 28 additions & 2 deletions zenoh/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,38 @@ use std::collections::HashMap;
use std::future::Ready;
use std::time::Duration;
use zenoh_core::{AsyncResolve, Resolvable, SyncResolve};
use zenoh_protocol::zenoh::query::Consolidation;
use zenoh_result::ZResult;

/// The [`Queryable`](crate::queryable::Queryable)s that should be target of a [`get`](Session::get).
pub use zenoh_protocol::core::QueryTarget;

/// The kind of consolidation.
pub use zenoh_protocol::core::ConsolidationMode;
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum ConsolidationMode {
/// No consolidation applied: multiple samples may be received for the same key-timestamp.
None,
/// Monotonic consolidation immediately forwards samples, except if one with an equal or more recent timestamp
/// has already been sent with the same key.
///
/// This optimizes latency while potentially reducing bandwidth.
///
/// Note that this doesn't cause re-ordering, but drops the samples for which a more recent timestamp has already
/// been observed with the same key.
Monotonic,
/// Holds back samples to only send the set of samples that had the highest timestamp for their key.
Latest,
}

impl From<ConsolidationMode> for Consolidation {
fn from(val: ConsolidationMode) -> Self {
match val {
ConsolidationMode::None => Consolidation::None,
ConsolidationMode::Monotonic => Consolidation::Monotonic,
ConsolidationMode::Latest => Consolidation::Latest,
}
}
}

/// The operation: either manual or automatic.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
Expand All @@ -45,6 +70,7 @@ pub struct QueryConsolidation {
}

impl QueryConsolidation {
pub const DEFAULT: Self = Self::AUTO;
/// Automatic query consolidation strategy selection.
pub const AUTO: Self = Self { mode: Mode::Auto };

Expand Down Expand Up @@ -72,7 +98,7 @@ impl From<ConsolidationMode> for QueryConsolidation {

impl Default for QueryConsolidation {
fn default() -> Self {
QueryConsolidation::AUTO
Self::DEFAULT
}
}

Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ impl Session {
selector,
scope: Ok(None),
target: QueryTarget::default(),
consolidation: QueryConsolidation::default(),
consolidation: QueryConsolidation::DEFAULT,
destination: Locality::default(),
timeout,
value: None,
Expand Down

0 comments on commit d3da646

Please sign in to comment.