From 9515c7d63bec0744d9a1bf2e86b7242ee9121480 Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Mon, 25 Mar 2024 17:36:36 +0100 Subject: [PATCH] put, delete builder --- .../zenoh-plugin-rest/examples/z_serve_sse.rs | 1 + plugins/zenoh-plugin-rest/src/lib.rs | 1 + zenoh-ext/src/group.rs | 1 + zenoh/src/publication.rs | 284 +++++++++++++----- zenoh/src/sample_builder.rs | 6 + zenoh/src/session.rs | 12 +- zenoh/tests/qos.rs | 1 + zenoh/tests/routing.rs | 1 + zenoh/tests/session.rs | 1 + zenoh/tests/unicity.rs | 1 + 10 files changed, 233 insertions(+), 76 deletions(-) diff --git a/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs b/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs index bb76005d6e..48f152e488 100644 --- a/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs +++ b/plugins/zenoh-plugin-rest/examples/z_serve_sse.rs @@ -15,6 +15,7 @@ use clap::{arg, Command}; use std::time::Duration; use zenoh::prelude::r#async::*; use zenoh::publication::CongestionControl; +use zenoh::sample_builder::QoSBuilderTrait; use zenoh::{config::Config, key_expr::keyexpr}; const HTML: &str = r#" diff --git a/plugins/zenoh-plugin-rest/src/lib.rs b/plugins/zenoh-plugin-rest/src/lib.rs index e2718f6579..cc97590636 100644 --- a/plugins/zenoh-plugin-rest/src/lib.rs +++ b/plugins/zenoh-plugin-rest/src/lib.rs @@ -34,6 +34,7 @@ use zenoh::plugins::{RunningPluginTrait, ZenohPlugin}; use zenoh::prelude::r#async::*; use zenoh::query::{QueryConsolidation, Reply}; use zenoh::runtime::Runtime; +use zenoh::sample_builder::PutSampleBuilderTrait; use zenoh::selector::TIME_RANGE_KEY; use zenoh::Session; use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl}; diff --git a/zenoh-ext/src/group.rs b/zenoh-ext/src/group.rs index 41007d8b87..973baf271b 100644 --- a/zenoh-ext/src/group.rs +++ b/zenoh-ext/src/group.rs @@ -28,6 +28,7 @@ use std::time::{Duration, Instant}; use zenoh::prelude::r#async::*; use zenoh::publication::Publisher; use zenoh::query::ConsolidationMode; +use zenoh::sample_builder::QoSBuilderTrait; use zenoh::Error as ZError; use zenoh::Result as ZResult; use zenoh::Session; diff --git a/zenoh/src/publication.rs b/zenoh/src/publication.rs index 392c0bf8c1..97f485f1e3 100644 --- a/zenoh/src/publication.rs +++ b/zenoh/src/publication.rs @@ -18,6 +18,9 @@ use crate::prelude::*; #[zenoh_macros::unstable] use crate::sample::Attachment; use crate::sample::{DataInfo, QoS, Sample, SampleKind}; +use crate::sample_builder::{ + DeleteSampleBuilderTrait, PutSampleBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, +}; use crate::SessionRef; use crate::Undeclarable; #[cfg(feature = "unstable")] @@ -56,7 +59,14 @@ pub use zenoh_protocol::core::CongestionControl; /// .unwrap(); /// # }) /// ``` -pub type DeleteBuilder<'a, 'b> = PutBuilder<'a, 'b>; +pub struct DeleteBuilder<'a, 'b> { + pub(crate) publisher: PublisherBuilder<'a, 'b>, + pub(crate) timestamp: Option, + #[cfg(feature = "unstable")] + pub(crate) source_info: Option, + #[cfg(feature = "unstable")] + pub(crate) attachment: Option, +} /// A builder for initializing a [`put`](crate::Session::put) operation. /// @@ -81,36 +91,141 @@ pub type DeleteBuilder<'a, 'b> = PutBuilder<'a, 'b>; pub struct PutBuilder<'a, 'b> { pub(crate) publisher: PublisherBuilder<'a, 'b>, pub(crate) payload: Payload, - pub(crate) kind: SampleKind, pub(crate) encoding: Encoding, + pub(crate) timestamp: Option, + #[cfg(feature = "unstable")] + pub(crate) source_info: Option, #[cfg(feature = "unstable")] pub(crate) attachment: Option, } -impl PutBuilder<'_, '_> { - /// Change the `congestion_control` to apply when routing the data. +impl QoSBuilderTrait for PutBuilder<'_, '_> { #[inline] - pub fn congestion_control(mut self, congestion_control: CongestionControl) -> Self { - self.publisher = self.publisher.congestion_control(congestion_control); - self + fn congestion_control(self, congestion_control: CongestionControl) -> Self { + Self { + publisher: self.publisher.congestion_control(congestion_control), + ..self + } } - - /// Change the priority of the written data. #[inline] - pub fn priority(mut self, priority: Priority) -> Self { - self.publisher = self.publisher.priority(priority); - self + fn priority(self, priority: Priority) -> Self { + Self { + publisher: self.publisher.priority(priority), + ..self + } } + #[inline] + fn express(self, is_express: bool) -> Self { + Self { + publisher: self.publisher.express(is_express), + ..self + } + } +} - /// Change the `express` policy to apply when routing the data. - /// When express is set to `true`, then the message will not be batched. - /// This usually has a positive impact on latency but negative impact on throughput. +impl QoSBuilderTrait for DeleteBuilder<'_, '_> { #[inline] - pub fn express(mut self, is_express: bool) -> Self { - self.publisher = self.publisher.express(is_express); - self + fn congestion_control(self, congestion_control: CongestionControl) -> Self { + Self { + publisher: self.publisher.congestion_control(congestion_control), + ..self + } + } + #[inline] + fn priority(self, priority: Priority) -> Self { + Self { + publisher: self.publisher.priority(priority), + ..self + } + } + #[inline] + fn express(self, is_express: bool) -> Self { + Self { + publisher: self.publisher.express(is_express), + ..self + } + } +} + +impl SampleBuilderTrait for PutBuilder<'_, '_> { + fn with_timestamp_opt(self, timestamp: Option) -> Self { + Self { timestamp, ..self } + } + fn with_timestamp(self, timestamp: uhlc::Timestamp) -> Self { + Self { + timestamp: Some(timestamp), + ..self + } + } + #[cfg(feature = "unstable")] + fn with_source_info(self, source_info: SourceInfo) -> Self { + Self { + source_info: Some(source_info), + ..self + } + } + #[cfg(feature = "unstable")] + fn with_attachment_opt(self, attachment: Option) -> Self { + Self { attachment, ..self } + } + #[cfg(feature = "unstable")] + fn with_attachment(self, attachment: Attachment) -> Self { + Self { + attachment: Some(attachment), + ..self + } + } +} + +impl SampleBuilderTrait for DeleteBuilder<'_, '_> { + fn with_timestamp_opt(self, timestamp: Option) -> Self { + Self { timestamp, ..self } + } + fn with_timestamp(self, timestamp: uhlc::Timestamp) -> Self { + Self { + timestamp: Some(timestamp), + ..self + } + } + #[cfg(feature = "unstable")] + fn with_source_info(self, source_info: SourceInfo) -> Self { + Self { + source_info: Some(source_info), + ..self + } + } + #[cfg(feature = "unstable")] + fn with_attachment_opt(self, attachment: Option) -> Self { + Self { attachment, ..self } + } + #[cfg(feature = "unstable")] + fn with_attachment(self, attachment: Attachment) -> Self { + Self { + attachment: Some(attachment), + ..self + } + } +} + +impl PutSampleBuilderTrait for PutBuilder<'_, '_> { + fn with_encoding(self, encoding: Encoding) -> Self { + Self { encoding, ..self } + } + + fn with_payload(self, payload: IntoPayload) -> Self + where + IntoPayload: Into, + { + Self { + payload: payload.into(), + ..self + } } +} +impl DeleteSampleBuilderTrait for DeleteBuilder<'_, '_> {} + +impl PutBuilder<'_, '_> { /// Restrict the matching subscribers that will receive the published data /// to the ones that have the given [`Locality`](crate::prelude::Locality). #[zenoh_macros::unstable] @@ -119,21 +234,15 @@ impl PutBuilder<'_, '_> { self.publisher = self.publisher.allowed_destination(destination); self } +} - /// Set the [`Encoding`] of the written data. - #[inline] - pub fn with_encoding(mut self, encoding: IntoEncoding) -> Self - where - IntoEncoding: Into, - { - self.encoding = encoding.into(); - self - } - +impl DeleteBuilder<'_, '_> { + /// Restrict the matching subscribers that will receive the published data + /// to the ones that have the given [`Locality`](crate::prelude::Locality). #[zenoh_macros::unstable] - /// Attach user-provided data to the written data. - pub fn with_attachment(mut self, attachment: Attachment) -> Self { - self.attachment = Some(attachment); + #[inline] + pub fn allowed_destination(mut self, destination: Locality) -> Self { + self.publisher = self.publisher.allowed_destination(destination); self } } @@ -142,36 +251,40 @@ impl Resolvable for PutBuilder<'_, '_> { type To = ZResult<()>; } +impl Resolvable for DeleteBuilder<'_, '_> { + type To = ZResult<()>; +} + impl SyncResolve for PutBuilder<'_, '_> { #[inline] fn res_sync(self) -> ::To { - let PublisherBuilder { - session, - key_expr, - congestion_control, - priority, - is_express, - destination, - } = self.publisher; - - let publisher = Publisher { - session, - #[cfg(feature = "unstable")] - eid: 0, // This is a one shot Publisher - key_expr: key_expr?, - congestion_control, - priority, - is_express, - destination, - }; - + let publisher = self.publisher.one_time_res_sync()?; resolve_put( &publisher, self.payload, - self.kind, + SampleKind::Put, self.encoding, + self.timestamp, + #[cfg(feature = "unstable")] + self.source_info, + #[cfg(feature = "unstable")] + self.attachment, + ) + } +} + +impl SyncResolve for DeleteBuilder<'_, '_> { + #[inline] + fn res_sync(self) -> ::To { + let publisher = self.publisher.one_time_res_sync()?; + resolve_put( + &publisher, + Payload::empty(), + SampleKind::Delete, + Encoding::ZENOH_BYTES, + self.timestamp, #[cfg(feature = "unstable")] - None, + self.source_info, #[cfg(feature = "unstable")] self.attachment, ) @@ -186,6 +299,14 @@ impl AsyncResolve for PutBuilder<'_, '_> { } } +impl AsyncResolve for DeleteBuilder<'_, '_> { + type Future = Ready; + + fn res_async(self) -> Self::Future { + std::future::ready(self.res_sync()) + } +} + use futures::Sink; use std::convert::TryFrom; use std::convert::TryInto; @@ -293,25 +414,22 @@ impl<'a> Publisher<'a> { /// Change the `congestion_control` to apply when routing the data. #[inline] - pub fn congestion_control(mut self, congestion_control: CongestionControl) -> Self { + pub fn set_congestion_control(&mut self, congestion_control: CongestionControl) { self.congestion_control = congestion_control; - self } /// Change the priority of the written data. #[inline] - pub fn priority(mut self, priority: Priority) -> Self { + pub fn set_priority(&mut self, priority: Priority) { self.priority = priority; - self } /// Restrict the matching subscribers that will receive the published data /// to the ones that have the given [`Locality`](crate::prelude::Locality). #[zenoh_macros::unstable] #[inline] - pub fn allowed_destination(mut self, destination: Locality) -> Self { + pub fn set_allowed_destination(&mut self, destination: Locality) { self.destination = destination; - self } /// Consumes the given `Publisher`, returning a thread-safe reference-counting @@ -355,6 +473,7 @@ impl<'a> Publisher<'a> { payload, kind, encoding: Encoding::ZENOH_BYTES, + timestamp: None, #[cfg(feature = "unstable")] source_info: None, #[cfg(feature = "unstable")] @@ -625,6 +744,7 @@ pub struct Publication<'a> { payload: Payload, kind: SampleKind, encoding: Encoding, + timestamp: Option, #[cfg(feature = "unstable")] pub(crate) source_info: Option, #[cfg(feature = "unstable")] @@ -676,6 +796,7 @@ impl SyncResolve for Publication<'_> { self.payload, self.kind, self.encoding, + self.timestamp, #[cfg(feature = "unstable")] self.source_info, #[cfg(feature = "unstable")] @@ -707,6 +828,7 @@ impl<'a> Sink for Publisher<'a> { payload: item.payload, kind: item.kind, encoding: item.encoding, + timestamp: None, #[cfg(feature = "unstable")] source_info: None, #[cfg(feature = "unstable")] @@ -770,30 +892,32 @@ impl<'a, 'b> Clone for PublisherBuilder<'a, 'b> { } } -impl<'a, 'b> PublisherBuilder<'a, 'b> { +impl QoSBuilderTrait for PublisherBuilder<'_, '_> { /// Change the `congestion_control` to apply when routing the data. #[inline] - pub fn congestion_control(mut self, congestion_control: CongestionControl) -> Self { - self.congestion_control = congestion_control; - self + fn congestion_control(self, congestion_control: CongestionControl) -> Self { + Self { + congestion_control, + ..self + } } /// Change the priority of the written data. #[inline] - pub fn priority(mut self, priority: Priority) -> Self { - self.priority = priority; - self + fn priority(self, priority: Priority) -> Self { + Self { priority, ..self } } /// Change the `express` policy to apply when routing the data. /// When express is set to `true`, then the message will not be batched. /// This usually has a positive impact on latency but negative impact on throughput. #[inline] - pub fn express(mut self, is_express: bool) -> Self { - self.is_express = is_express; - self + fn express(self, is_express: bool) -> Self { + Self { is_express, ..self } } +} +impl<'a, 'b> PublisherBuilder<'a, 'b> { /// Restrict the matching subscribers that will receive the published data /// to the ones that have the given [`Locality`](crate::prelude::Locality). #[zenoh_macros::unstable] @@ -802,6 +926,20 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { self.destination = destination; self } + + // internal function for `PutBuilder` and `DeleteBuilder` + fn one_time_res_sync(self) -> ZResult> { + Ok(Publisher { + session: self.session, + #[cfg(feature = "unstable")] + eid: 0, // This is a one shot Publisher + key_expr: self.key_expr?, + congestion_control: self.congestion_control, + priority: self.priority, + is_express: self.is_express, + destination: self.destination, + }) + } } impl<'a, 'b> Resolvable for PublisherBuilder<'a, 'b> { @@ -874,6 +1012,7 @@ fn resolve_put( payload: Payload, kind: SampleKind, encoding: Encoding, + timestamp: Option, #[cfg(feature = "unstable")] source_info: Option, #[cfg(feature = "unstable")] attachment: Option, ) -> ZResult<()> { @@ -883,8 +1022,11 @@ fn resolve_put( .as_ref() .unwrap() .clone(); - let timestamp = publisher.session.runtime.new_timestamp(); - + let timestamp = if timestamp.is_none() { + publisher.session.runtime.new_timestamp() + } else { + timestamp + }; if publisher.destination != Locality::SessionLocal { primitives.send_push(Push { wire_expr: publisher.key_expr.to_wire(&publisher.session).to_owned(), diff --git a/zenoh/src/sample_builder.rs b/zenoh/src/sample_builder.rs index 0df98773fc..1710cbc85b 100644 --- a/zenoh/src/sample_builder.rs +++ b/zenoh/src/sample_builder.rs @@ -40,17 +40,23 @@ pub trait QoSBuilderTrait { } pub trait SampleBuilderTrait { + /// Sets of clears timestamp fn with_timestamp_opt(self, timestamp: Option) -> Self; + /// Sets timestamp fn with_timestamp(self, timestamp: Timestamp) -> Self; + /// Attach source information #[zenoh_macros::unstable] fn with_source_info(self, source_info: SourceInfo) -> Self; + /// Attach or remove user-provided data in key-value format #[zenoh_macros::unstable] fn with_attachment_opt(self, attachment: Option) -> Self; + /// Attach user-provided data in key-value format #[zenoh_macros::unstable] fn with_attachment(self, attachment: Attachment) -> Self; } pub trait PutSampleBuilderTrait: SampleBuilderTrait { + /// Set the [`Encoding`] fn with_encoding(self, encoding: Encoding) -> Self; fn with_payload(self, payload: IntoPayload) -> Self where diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 89c18ec4a8..e26bdeadaf 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -706,10 +706,12 @@ impl Session { PutBuilder { publisher: self.declare_publisher(key_expr), payload: payload.into(), - kind: SampleKind::Put, + timestamp: None, encoding: Encoding::default(), #[cfg(feature = "unstable")] attachment: None, + #[cfg(feature = "unstable")] + source_info: None, } } @@ -737,13 +739,13 @@ impl Session { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - PutBuilder { + DeleteBuilder { publisher: self.declare_publisher(key_expr), - payload: Payload::empty(), - kind: SampleKind::Delete, - encoding: Encoding::default(), + timestamp: None, #[cfg(feature = "unstable")] attachment: None, + #[cfg(feature = "unstable")] + source_info: None, } } /// Query data from the matching queryables in the system. diff --git a/zenoh/tests/qos.rs b/zenoh/tests/qos.rs index 1a9df306b2..8dc39423cb 100644 --- a/zenoh/tests/qos.rs +++ b/zenoh/tests/qos.rs @@ -15,6 +15,7 @@ use async_std::prelude::FutureExt; use async_std::task; use std::time::Duration; use zenoh::prelude::r#async::*; +use zenoh::sample_builder::QoSBuilderTrait; use zenoh::{publication::Priority, SessionDeclarations}; use zenoh_core::zasync_executor_init; diff --git a/zenoh/tests/routing.rs b/zenoh/tests/routing.rs index 06a8f5da45..123550852e 100644 --- a/zenoh/tests/routing.rs +++ b/zenoh/tests/routing.rs @@ -20,6 +20,7 @@ use std::sync::{atomic::AtomicUsize, Arc}; use std::time::Duration; use zenoh::config::{Config, ModeDependentValue}; use zenoh::prelude::r#async::*; +use zenoh::sample_builder::QoSBuilderTrait; use zenoh::{value::Value, Result}; use zenoh_core::zasync_executor_init; use zenoh_protocol::core::{WhatAmI, WhatAmIMatcher}; diff --git a/zenoh/tests/session.rs b/zenoh/tests/session.rs index e3f5e2df63..955ec7a73f 100644 --- a/zenoh/tests/session.rs +++ b/zenoh/tests/session.rs @@ -17,6 +17,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use zenoh::prelude::r#async::*; +use zenoh::sample_builder::QoSBuilderTrait; use zenoh_core::zasync_executor_init; const TIMEOUT: Duration = Duration::from_secs(60); diff --git a/zenoh/tests/unicity.rs b/zenoh/tests/unicity.rs index 8eb007b0c0..3d1327398d 100644 --- a/zenoh/tests/unicity.rs +++ b/zenoh/tests/unicity.rs @@ -17,6 +17,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use zenoh::prelude::r#async::*; +use zenoh::sample_builder::QoSBuilderTrait; use zenoh_core::zasync_executor_init; const TIMEOUT: Duration = Duration::from_secs(60);