Skip to content

Commit

Permalink
Add Publication::with_source_info() function
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Feb 12, 2024
1 parent 8b3c094 commit ccd6b9c
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
2 changes: 2 additions & 0 deletions zenoh/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub(crate) mod common {
pub use crate::query::ConsolidationMode;
#[zenoh_macros::unstable]
pub use crate::sample::Locality;
#[zenoh_macros::unstable]
pub use crate::sample::SourceInfo;
#[cfg(not(feature = "unstable"))]
pub(crate) use crate::sample::Locality;
pub use crate::sample::Sample;
Expand Down
46 changes: 46 additions & 0 deletions zenoh/src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use crate::prelude::*;
#[zenoh_macros::unstable]
use crate::sample::Attachment;
use crate::sample::DataInfo;
#[zenoh_macros::unstable]
use crate::sample::SourceInfo;
use crate::Encoding;
use crate::SessionRef;
use crate::Undeclarable;
Expand All @@ -30,6 +32,8 @@ use zenoh_core::{zread, AsyncResolve, Resolvable, Resolve, SyncResolve};
use zenoh_protocol::network::push::ext;
use zenoh_protocol::network::Mapping;
use zenoh_protocol::network::Push;
#[zenoh_macros::unstable]
use zenoh_protocol::zenoh::ext::SourceInfoType;
use zenoh_protocol::zenoh::Del;
use zenoh_protocol::zenoh::PushBody;
use zenoh_protocol::zenoh::Put;
Expand Down Expand Up @@ -159,6 +163,8 @@ impl SyncResolve for PutBuilder<'_, '_> {
self.value,
self.kind,
#[cfg(feature = "unstable")]
None,
#[cfg(feature = "unstable")]
self.attachment,
)
}
Expand Down Expand Up @@ -340,6 +346,8 @@ impl<'a> Publisher<'a> {
value,
kind,
#[cfg(feature = "unstable")]
source_info: None,
#[cfg(feature = "unstable")]
attachment: None,
}
}
Expand Down Expand Up @@ -626,6 +634,8 @@ pub struct Publication<'a> {
value: Value,
kind: SampleKind,
#[cfg(feature = "unstable")]
pub(crate) source_info: Option<SourceInfo>,
#[cfg(feature = "unstable")]
pub(crate) attachment: Option<Attachment>,
}

Expand All @@ -635,6 +645,27 @@ impl<'a> Publication<'a> {
self.attachment = Some(attachment);
self
}

/// Send data with the given [`SourceInfo`].
///
/// # Examples
/// ```
/// # async_std::task::block_on(async {
/// use zenoh::prelude::r#async::*;
///
/// let session = zenoh::open(config::peer()).res().await.unwrap();
/// let publisher = session.declare_publisher("key/expression").res().await.unwrap();
/// publisher.put("Value").with_source_info(SourceInfo {
/// id: publisher.id(),
/// sn: 0,
/// }).res().await.unwrap();
/// # })
/// ```
#[zenoh_macros::unstable]
pub fn with_source_info(mut self, source_info: SourceInfo) -> Self {
self.source_info = Some(source_info);
self
}
}

impl Resolvable for Publication<'_> {
Expand All @@ -648,6 +679,8 @@ impl SyncResolve for Publication<'_> {
self.value,
self.kind,
#[cfg(feature = "unstable")]
self.source_info,
#[cfg(feature = "unstable")]
self.attachment,
)
}
Expand Down Expand Up @@ -823,6 +856,7 @@ fn resolve_put(
publisher: &Publisher<'_>,
value: Value,
kind: SampleKind,
#[cfg(feature = "unstable")] source_info: Option<SourceInfo>,
#[cfg(feature = "unstable")] attachment: Option<Attachment>,
) -> ZResult<()> {
log::trace!("write({:?}, [...])", &publisher.key_expr);
Expand Down Expand Up @@ -856,6 +890,12 @@ fn resolve_put(
PushBody::Put(Put {
timestamp,
encoding: value.encoding.clone(),
#[cfg(feature = "unstable")]
ext_sinfo: source_info.map(|s| SourceInfoType {
id: s.source_id.unwrap_or_default(),
sn: s.source_sn.unwrap_or_default() as u32,
}),
#[cfg(not(feature = "unstable"))]
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
Expand All @@ -875,6 +915,12 @@ fn resolve_put(
}
PushBody::Del(Del {
timestamp,
#[cfg(feature = "unstable")]
ext_sinfo: source_info.map(|s| SourceInfoType {
id: s.source_id.unwrap_or_default(),
sn: s.source_sn.unwrap_or_default() as u32,
}),
#[cfg(not(feature = "unstable"))]
ext_sinfo: None,
ext_attachment,
ext_unknown: vec![],
Expand Down

0 comments on commit ccd6b9c

Please sign in to comment.