Skip to content

Commit

Permalink
resolvable removed from simple builders
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Mar 28, 2024
1 parent 5bbef9c commit c427ac7
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 94 deletions.
5 changes: 2 additions & 3 deletions plugins/zenoh-plugin-storage-manager/src/replica/aligner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use zenoh::prelude::r#async::*;
use zenoh::sample_builder::{PutSampleBuilder, TimestampBuilderTrait, ValueBuilderTrait};
use zenoh::time::Timestamp;
use zenoh::Session;
use zenoh_core::{AsyncResolve, SyncResolve};

pub struct Aligner {
session: Arc<Session>,
Expand Down Expand Up @@ -113,7 +112,7 @@ impl Aligner {
let sample = PutSampleBuilder::new(key, payload)
.encoding(encoding)
.timestamp(ts)
.res_sync();
.into();
log::debug!("[ALIGNER] Adding {:?} to storage", sample);
self.tx_sample.send_async(sample).await.unwrap_or_else(|e| {
log::error!("[ALIGNER] Error adding sample to storage: {}", e)
Expand Down Expand Up @@ -331,7 +330,7 @@ impl Aligner {
.get(&selector)
.consolidation(zenoh::query::ConsolidationMode::None)
.accept_replies(zenoh::query::ReplyKeyExpr::Any)
.res_async()
.res()
.await
{
Ok(replies) => {
Expand Down
29 changes: 12 additions & 17 deletions plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use zenoh::buffers::buffer::SplitBuffer;
use zenoh::buffers::ZBuf;
use zenoh::key_expr::KeyExpr;
use zenoh::prelude::r#async::*;
use zenoh::query::{ConsolidationMode, QueryTarget};
use zenoh::sample::{Sample, SampleKind};
use zenoh::sample_builder::{
Expand All @@ -34,7 +35,6 @@ use zenoh::value::Value;
use zenoh::{Result as ZResult, Session, SessionDeclarations};
use zenoh_backend_traits::config::{GarbageCollectionConfig, StorageConfig};
use zenoh_backend_traits::{Capability, History, Persistence, StorageInsertionResult, StoredData};
use zenoh_core::{AsyncResolve, SyncResolve};
use zenoh_keyexpr::key_expr::OwnedKeyExpr;
use zenoh_keyexpr::keyexpr_tree::impls::KeyedSetProvider;
use zenoh_keyexpr::keyexpr_tree::{support::NonWild, support::UnknownWildness, KeBoxTree};
Expand Down Expand Up @@ -144,12 +144,7 @@ impl StorageService {
t.add_async(gc).await;

// subscribe on key_expr
let storage_sub = match self
.session
.declare_subscriber(&self.key_expr)
.res_async()
.await
{
let storage_sub = match self.session.declare_subscriber(&self.key_expr).res().await {
Ok(storage_sub) => storage_sub,
Err(e) => {
log::error!("Error starting storage '{}': {}", self.name, e);
Expand All @@ -162,7 +157,7 @@ impl StorageService {
.session
.declare_queryable(&self.key_expr)
.complete(self.complete)
.res_async()
.res()
.await
{
Ok(storage_queryable) => storage_queryable,
Expand Down Expand Up @@ -239,7 +234,7 @@ impl StorageService {
}
};
let timestamp = sample.timestamp().cloned().unwrap_or(new_reception_timestamp());
let sample = SampleBuilder::from(sample).timestamp(timestamp).res_sync();
let sample = SampleBuilder::from(sample).timestamp(timestamp).into();
self.process_sample(sample).await;
},
// on query on key_expr
Expand Down Expand Up @@ -303,7 +298,7 @@ impl StorageService {
);
// there might be the case that the actual update was outdated due to a wild card update, but not stored yet in the storage.
// get the relevant wild card entry and use that value and timestamp to update the storage
let sample_to_store = match self
let sample_to_store: Sample = match self
.ovderriding_wild_update(&k, sample.timestamp().unwrap())
.await
{
Expand All @@ -317,17 +312,17 @@ impl StorageService {
PutSampleBuilder::new(KeyExpr::from(k.clone()), payload)
.encoding(encoding)
.timestamp(data.timestamp)
.res_sync()
.into()
}
Some(Update {
kind: SampleKind::Delete,
data,
}) => DeleteSampleBuilder::new(KeyExpr::from(k.clone()))
.timestamp(data.timestamp)
.res_sync(),
.into(),
None => SampleBuilder::from(sample.clone())
.keyexpr(k.clone())
.res_sync(),
.into(),
};

let stripped_key = match self.strip_prefix(sample_to_store.key_expr()) {
Expand Down Expand Up @@ -534,7 +529,7 @@ impl StorageService {
.reply(key.clone(), payload)
.encoding(encoding)
.timestamp(entry.timestamp)
.res_async()
.res()
.await
{
log::warn!(
Expand Down Expand Up @@ -569,7 +564,7 @@ impl StorageService {
.reply(q.key_expr().clone(), payload)
.encoding(encoding)
.timestamp(entry.timestamp)
.res_async()
.res()
.await
{
log::warn!(
Expand All @@ -584,7 +579,7 @@ impl StorageService {
let err_message =
format!("Storage '{}' raised an error on query: {}", self.name, e);
log::warn!("{}", err_message);
if let Err(e) = q.reply_err(err_message).res_async().await {
if let Err(e) = q.reply_err(err_message).res().await {
log::warn!(
"Storage '{}' raised an error replying a query: {}",
self.name,
Expand Down Expand Up @@ -666,7 +661,7 @@ impl StorageService {
.get(KeyExpr::from(&self.key_expr).with_parameters("_time=[..]"))
.target(QueryTarget::All)
.consolidation(ConsolidationMode::None)
.res_async()
.res()
.await
{
Ok(replies) => replies,
Expand Down
5 changes: 3 additions & 2 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,8 +665,9 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> {
// ensure the sample has a timestamp, thus it will always be sorted into the MergeQueue
// after any timestamped Sample possibly coming from a fetch reply.
let timestamp = s.timestamp().cloned().unwrap_or(new_reception_timestamp());
let s = SampleBuilder::from(s).timestamp(timestamp).res_sync();
state.merge_queue.push(s);
state
.merge_queue
.push(SampleBuilder::from(s).timestamp(timestamp).into());
}
}
};
Expand Down
9 changes: 3 additions & 6 deletions zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,7 @@ impl Resolvable for ReplySampleBuilder<'_> {

impl SyncResolve for ReplySampleBuilder<'_> {
fn res_sync(self) -> <Self as Resolvable>::To {
let sample = self.sample_builder.res_sync();
self.query._reply_sample(sample)
self.query._reply_sample(self.sample_builder.into())
}
}

Expand Down Expand Up @@ -453,8 +452,7 @@ impl<'a> Resolvable for ReplyBuilder<'a> {

impl SyncResolve for ReplyBuilder<'_> {
fn res_sync(self) -> <Self as Resolvable>::To {
let sample = self.sample_builder.res_sync();
self.query._reply_sample(sample)
self.query._reply_sample(self.sample_builder.into())
}
}

Expand All @@ -464,8 +462,7 @@ impl<'a> Resolvable for ReplyDelBuilder<'a> {

impl SyncResolve for ReplyDelBuilder<'_> {
fn res_sync(self) -> <Self as Resolvable>::To {
let sample = self.sample_builder.res_sync();
self.query._reply_sample(sample)
self.query._reply_sample(self.sample_builder.into())
}
}

Expand Down
18 changes: 0 additions & 18 deletions zenoh/src/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use crate::Priority;
#[zenoh_macros::unstable]
use serde::Serialize;
use std::{convert::TryFrom, fmt};
use zenoh_core::{AsyncResolve, Resolvable, SyncResolve};
use zenoh_protocol::core::EntityGlobalId;
use zenoh_protocol::network::declare::ext::QoSType;
use zenoh_protocol::{core::CongestionControl, zenoh};
Expand Down Expand Up @@ -595,23 +594,6 @@ impl From<QoSBuilder> for QoS {
}
}

impl Resolvable for QoSBuilder {
type To = QoS;
}

impl SyncResolve for QoSBuilder {
fn res_sync(self) -> <Self as Resolvable>::To {
self.0
}
}

impl AsyncResolve for QoSBuilder {
type Future = futures::future::Ready<Self::To>;
fn res_async(self) -> Self::Future {
futures::future::ready(self.0)
}
}

impl QoSBuilderTrait for QoSBuilder {
fn congestion_control(self, congestion_control: CongestionControl) -> Self {
let mut inner = self.0.inner;
Expand Down
60 changes: 12 additions & 48 deletions zenoh/src/sample_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ use crate::Sample;
use crate::SampleKind;
use uhlc::Timestamp;
use zenoh_core::zresult;
use zenoh_core::AsyncResolve;
use zenoh_core::Resolvable;
use zenoh_core::SyncResolve;
use zenoh_protocol::core::CongestionControl;

pub trait QoSBuilderTrait {
Expand Down Expand Up @@ -124,17 +121,17 @@ impl SampleBuilderTrait for SampleBuilder {
impl QoSBuilderTrait for SampleBuilder {
fn congestion_control(self, congestion_control: CongestionControl) -> Self {
let qos: QoSBuilder = self.0.qos.into();
let qos = qos.congestion_control(congestion_control).res_sync();
let qos = qos.congestion_control(congestion_control).into();
Self(Sample { qos, ..self.0 })
}
fn priority(self, priority: Priority) -> Self {
let qos: QoSBuilder = self.0.qos.into();
let qos = qos.priority(priority).res_sync();
let qos = qos.priority(priority).into();
Self(Sample { qos, ..self.0 })
}
fn express(self, is_express: bool) -> Self {
let qos: QoSBuilder = self.0.qos.into();
let qos = qos.express(is_express).res_sync();
let qos = qos.express(is_express).into();
Self(Sample { qos, ..self.0 })
}
}
Expand Down Expand Up @@ -325,53 +322,20 @@ impl TryFrom<Sample> for DeleteSampleBuilder {
}
}

impl Resolvable for SampleBuilder {
type To = Sample;
}

impl Resolvable for PutSampleBuilder {
type To = Sample;
}

impl Resolvable for DeleteSampleBuilder {
type To = Sample;
}

impl SyncResolve for SampleBuilder {
fn res_sync(self) -> Self::To {
self.0
}
}

impl SyncResolve for PutSampleBuilder {
fn res_sync(self) -> Self::To {
self.0.res_sync()
}
}

impl SyncResolve for DeleteSampleBuilder {
fn res_sync(self) -> Self::To {
self.0.res_sync()
}
}

impl AsyncResolve for SampleBuilder {
type Future = futures::future::Ready<Self::To>;
fn res_async(self) -> Self::Future {
futures::future::ready(self.0)
impl From<SampleBuilder> for Sample {
fn from(sample_builder: SampleBuilder) -> Self {
sample_builder.0
}
}

impl AsyncResolve for PutSampleBuilder {
type Future = futures::future::Ready<Self::To>;
fn res_async(self) -> Self::Future {
self.0.res_async()
impl From<PutSampleBuilder> for Sample {
fn from(put_sample_builder: PutSampleBuilder) -> Self {
put_sample_builder.0 .0
}
}

impl AsyncResolve for DeleteSampleBuilder {
type Future = futures::future::Ready<Self::To>;
fn res_async(self) -> Self::Future {
self.0.res_async()
impl From<DeleteSampleBuilder> for Sample {
fn from(delete_sample_builder: DeleteSampleBuilder) -> Self {
delete_sample_builder.0 .0
}
}

0 comments on commit c427ac7

Please sign in to comment.