Skip to content

Commit

Permalink
publication builder shortened
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Apr 1, 2024
1 parent e4c4be1 commit d631f76
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 122 deletions.
145 changes: 33 additions & 112 deletions zenoh/src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,33 +41,15 @@ use zenoh_result::ZResult;
/// The kind of congestion control.
pub use zenoh_protocol::core::CongestionControl;

/// A builder for initializing a [`delete`](crate::Session::delete) operation.
///
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use zenoh::prelude::r#async::*;
/// use zenoh::publication::CongestionControl;
///
/// let session = zenoh::open(config::peer()).res().await.unwrap();
/// session
/// .delete("key/expression")
/// .res()
/// .await
/// .unwrap();
/// # }
/// ```
pub struct DeleteBuilder<'a, 'b> {
pub(crate) publisher: PublisherBuilder<'a, 'b>,
pub(crate) timestamp: Option<uhlc::Timestamp>,
#[cfg(feature = "unstable")]
pub(crate) source_info: SourceInfo,
#[cfg(feature = "unstable")]
pub(crate) attachment: Option<Attachment>,
#[derive(Debug, Clone)]
pub struct PublicationBuilderPut {
pub(crate) payload: Payload,
pub(crate) encoding: Encoding,
}
#[derive(Debug, Clone)]
pub struct PublicationBuilderDelete;

/// A builder for initializing a [`put`](crate::Session::put) operation.
/// A builder for initializing a [`put`](crate::Session::put) and [`delete`](crate::Session::delete) operations
///
/// # Examples
/// ```
Expand All @@ -89,18 +71,17 @@ pub struct DeleteBuilder<'a, 'b> {
/// ```
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[derive(Debug, Clone)]
pub struct PutBuilder<'a, 'b> {
pub struct PublicationBuilder<'a, 'b, T> {
pub(crate) publisher: PublisherBuilder<'a, 'b>,
pub(crate) payload: Payload,
pub(crate) encoding: Encoding,
pub(crate) kind: T,
pub(crate) timestamp: Option<uhlc::Timestamp>,
#[cfg(feature = "unstable")]
pub(crate) source_info: SourceInfo,
#[cfg(feature = "unstable")]
pub(crate) attachment: Option<Attachment>,
}

impl QoSBuilderTrait for PutBuilder<'_, '_> {
impl<T> QoSBuilderTrait for PublicationBuilder<'_, '_, T> {
#[inline]
fn congestion_control(self, congestion_control: CongestionControl) -> Self {
Self {
Expand All @@ -124,66 +105,16 @@ impl QoSBuilderTrait for PutBuilder<'_, '_> {
}
}

impl QoSBuilderTrait for DeleteBuilder<'_, '_> {
#[inline]
fn congestion_control(self, congestion_control: CongestionControl) -> Self {
Self {
publisher: self.publisher.congestion_control(congestion_control),
..self
}
}
#[inline]
fn priority(self, priority: Priority) -> Self {
Self {
publisher: self.publisher.priority(priority),
..self
}
}
#[inline]
fn express(self, is_express: bool) -> Self {
Self {
publisher: self.publisher.express(is_express),
..self
}
}
}

impl TimestampBuilderTrait for PutBuilder<'_, '_> {
fn timestamp<T: Into<Option<uhlc::Timestamp>>>(self, timestamp: T) -> Self {
Self {
timestamp: timestamp.into(),
..self
}
}
}

impl SampleBuilderTrait for PutBuilder<'_, '_> {
#[cfg(feature = "unstable")]
fn source_info(self, source_info: SourceInfo) -> Self {
Self {
source_info,
..self
}
}
#[cfg(feature = "unstable")]
fn attachment<T: Into<Option<Attachment>>>(self, attachment: T) -> Self {
Self {
attachment: attachment.into(),
..self
}
}
}

impl TimestampBuilderTrait for DeleteBuilder<'_, '_> {
fn timestamp<T: Into<Option<uhlc::Timestamp>>>(self, timestamp: T) -> Self {
impl<T> TimestampBuilderTrait for PublicationBuilder<'_, '_, T> {
fn timestamp<TS: Into<Option<uhlc::Timestamp>>>(self, timestamp: TS) -> Self {
Self {
timestamp: timestamp.into(),
..self
}
}
}

impl SampleBuilderTrait for DeleteBuilder<'_, '_> {
impl<T> SampleBuilderTrait for PublicationBuilder<'_, '_, T> {
#[cfg(feature = "unstable")]
fn source_info(self, source_info: SourceInfo) -> Self {
Self {
Expand All @@ -192,18 +123,21 @@ impl SampleBuilderTrait for DeleteBuilder<'_, '_> {
}
}
#[cfg(feature = "unstable")]
fn attachment<T: Into<Option<Attachment>>>(self, attachment: T) -> Self {
fn attachment<TA: Into<Option<Attachment>>>(self, attachment: TA) -> Self {
Self {
attachment: attachment.into(),
..self
}
}
}

impl ValueBuilderTrait for PutBuilder<'_, '_> {
impl ValueBuilderTrait for PublicationBuilder<'_, '_, PublicationBuilderPut> {
fn encoding<T: Into<Encoding>>(self, encoding: T) -> Self {
Self {
encoding: encoding.into(),
kind: PublicationBuilderPut {
encoding: encoding.into(),
..self.kind
},
..self
}
}
Expand All @@ -213,32 +147,23 @@ impl ValueBuilderTrait for PutBuilder<'_, '_> {
IntoPayload: Into<Payload>,
{
Self {
payload: payload.into(),
kind: PublicationBuilderPut {
payload: payload.into(),
..self.kind
},
..self
}
}
fn value<T: Into<Value>>(self, value: T) -> Self {
let Value { payload, encoding } = value.into();
Self {
payload,
encoding,
kind: PublicationBuilderPut { payload, encoding },
..self
}
}
}

impl PutBuilder<'_, '_> {
/// Restrict the matching subscribers that will receive the published data
/// to the ones that have the given [`Locality`](crate::prelude::Locality).
#[zenoh_macros::unstable]
#[inline]
pub fn allowed_destination(mut self, destination: Locality) -> Self {
self.publisher = self.publisher.allowed_destination(destination);
self
}
}

impl DeleteBuilder<'_, '_> {
impl<T> PublicationBuilder<'_, '_, T> {
/// Restrict the matching subscribers that will receive the published data
/// to the ones that have the given [`Locality`](crate::prelude::Locality).
#[zenoh_macros::unstable]
Expand All @@ -249,23 +174,19 @@ impl DeleteBuilder<'_, '_> {
}
}

impl Resolvable for PutBuilder<'_, '_> {
type To = ZResult<()>;
}

impl Resolvable for DeleteBuilder<'_, '_> {
impl<T> Resolvable for PublicationBuilder<'_, '_, T> {
type To = ZResult<()>;
}

impl SyncResolve for PutBuilder<'_, '_> {
impl SyncResolve for PublicationBuilder<'_, '_, PublicationBuilderPut> {
#[inline]
fn res_sync(self) -> <Self as Resolvable>::To {
let publisher = self.publisher.create_one_shot_publisher()?;
resolve_put(
&publisher,
self.payload,
self.kind.payload,
SampleKind::Put,
self.encoding,
self.kind.encoding,
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
Expand All @@ -275,7 +196,7 @@ impl SyncResolve for PutBuilder<'_, '_> {
}
}

impl SyncResolve for DeleteBuilder<'_, '_> {
impl SyncResolve for PublicationBuilder<'_, '_, PublicationBuilderDelete> {
#[inline]
fn res_sync(self) -> <Self as Resolvable>::To {
let publisher = self.publisher.create_one_shot_publisher()?;
Expand All @@ -293,15 +214,15 @@ impl SyncResolve for DeleteBuilder<'_, '_> {
}
}

impl AsyncResolve for PutBuilder<'_, '_> {
impl AsyncResolve for PublicationBuilder<'_, '_, PublicationBuilderPut> {
type Future = Ready<Self::To>;

fn res_async(self) -> Self::Future {
std::future::ready(self.res_sync())
}
}

impl AsyncResolve for DeleteBuilder<'_, '_> {
impl AsyncResolve for PublicationBuilder<'_, '_, PublicationBuilderDelete> {
type Future = Ready<Self::To>;

fn res_async(self) -> Self::Future {
Expand Down Expand Up @@ -1038,7 +959,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
self
}

// internal function for `PutBuilder` and `DeleteBuilder`
// internal function for perfroming the publication
fn create_one_shot_publisher(self) -> ZResult<Publisher<'a>> {
Ok(Publisher {
session: self.session,
Expand Down
8 changes: 4 additions & 4 deletions zenoh/src/sample/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ pub trait ValueBuilderTrait {
fn value<T: Into<Value>>(self, value: T) -> Self;
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct SampleBuilderPut;
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct SampleBuilderDelete;
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct SampleBuilderAny;

#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct SampleBuilder<T> {
sample: Sample,
_t: PhantomData<T>,
Expand Down
15 changes: 9 additions & 6 deletions zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,17 +705,19 @@ impl Session {
&'a self,
key_expr: TryIntoKeyExpr,
payload: IntoPayload,
) -> PutBuilder<'a, 'b>
) -> PublicationBuilder<'a, 'b, PublicationBuilderPut>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
IntoPayload: Into<Payload>,
{
PutBuilder {
PublicationBuilder {
publisher: self.declare_publisher(key_expr),
payload: payload.into(),
kind: PublicationBuilderPut {
payload: payload.into(),
encoding: Encoding::default(),
},
timestamp: None,
encoding: Encoding::default(),
#[cfg(feature = "unstable")]
attachment: None,
#[cfg(feature = "unstable")]
Expand Down Expand Up @@ -743,13 +745,14 @@ impl Session {
pub fn delete<'a, 'b: 'a, TryIntoKeyExpr>(
&'a self,
key_expr: TryIntoKeyExpr,
) -> DeleteBuilder<'a, 'b>
) -> PublicationBuilder<'a, 'b, PublicationBuilderDelete>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
DeleteBuilder {
PublicationBuilder {
publisher: self.declare_publisher(key_expr),
kind: PublicationBuilderDelete,
timestamp: None,
#[cfg(feature = "unstable")]
attachment: None,
Expand Down

0 comments on commit d631f76

Please sign in to comment.