Skip to content

Commit

Permalink
Add express support to publisher and put
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Mar 14, 2024
1 parent ea7179f commit 62bf7d3
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
24 changes: 22 additions & 2 deletions zenoh/src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ impl PutBuilder<'_, '_> {
self
}

/// Change the `congestion_control` to apply when routing the data.
#[inline]
pub fn express(mut self, is_express: bool) -> Self {
self.publisher = self.publisher.express(is_express);
self
}

/// Restrict the matching subscribers that will receive the published data
/// to the ones that have the given [`Locality`](crate::prelude::Locality).
#[zenoh_macros::unstable]
Expand Down Expand Up @@ -141,6 +148,7 @@ impl SyncResolve for PutBuilder<'_, '_> {
key_expr,
congestion_control,
priority,
is_express,
destination,
} = self.publisher;

Expand All @@ -151,6 +159,7 @@ impl SyncResolve for PutBuilder<'_, '_> {
key_expr: key_expr?,
congestion_control,
priority,
is_express,
destination,
};

Expand Down Expand Up @@ -248,6 +257,7 @@ pub struct Publisher<'a> {
pub(crate) key_expr: KeyExpr<'a>,
pub(crate) congestion_control: CongestionControl,
pub(crate) priority: Priority,
pub(crate) is_express: bool,
pub(crate) destination: Locality,
}

Expand Down Expand Up @@ -738,6 +748,7 @@ pub struct PublisherBuilder<'a, 'b: 'a> {
pub(crate) key_expr: ZResult<KeyExpr<'b>>,
pub(crate) congestion_control: CongestionControl,
pub(crate) priority: Priority,
pub(crate) is_express: bool,
pub(crate) destination: Locality,
}

Expand All @@ -751,6 +762,7 @@ impl<'a, 'b> Clone for PublisherBuilder<'a, 'b> {
},
congestion_control: self.congestion_control,
priority: self.priority,
is_express: self.is_express,
destination: self.destination,
}
}
Expand All @@ -771,6 +783,13 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
self
}

/// Change the `congestion_control` to apply when routing the data.
#[inline]
pub fn express(mut self, is_express: bool) -> Self {
self.is_express = is_express;
self
}

/// Restrict the matching subscribers that will receive the published data
/// to the ones that have the given [`Locality`](crate::prelude::Locality).
#[zenoh_macros::unstable]
Expand Down Expand Up @@ -830,6 +849,7 @@ impl<'a, 'b> SyncResolve for PublisherBuilder<'a, 'b> {
key_expr,
congestion_control: self.congestion_control,
priority: self.priority,
is_express: self.is_express,
destination: self.destination,
};
log::trace!("publish({:?})", publisher.key_expr);
Expand Down Expand Up @@ -867,7 +887,7 @@ fn resolve_put(
ext_qos: ext::QoSType::new(
publisher.priority.into(),
publisher.congestion_control,
false,
publisher.is_express,
),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
Expand Down Expand Up @@ -933,7 +953,7 @@ fn resolve_put(
qos: QoS::from(ext::QoSType::new(
publisher.priority.into(),
publisher.congestion_control,
false,
publisher.is_express,
)),
};

Expand Down
2 changes: 2 additions & 0 deletions zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ impl<'s, 'a> SessionDeclarations<'s, 'a> for SessionRef<'a> {
key_expr: key_expr.try_into().map_err(Into::into),
congestion_control: CongestionControl::DEFAULT,
priority: Priority::DEFAULT,
is_express: false,
destination: Locality::default(),
}
}
Expand Down Expand Up @@ -1909,6 +1910,7 @@ impl<'s> SessionDeclarations<'s, 'static> for Arc<Session> {
key_expr: key_expr.try_into().map_err(Into::into),
congestion_control: CongestionControl::DEFAULT,
priority: Priority::DEFAULT,
is_express: false,
destination: Locality::default(),
}
}
Expand Down

0 comments on commit 62bf7d3

Please sign in to comment.