diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs
index baa4eb45f3..1eef911fd9 100644
--- a/zenoh/src/api/builders/publisher.rs
+++ b/zenoh/src/api/builders/publisher.rs
@@ -186,11 +186,17 @@ impl
Resolvable for PublicationBuilder
{
impl Wait for PublicationBuilder, PublicationBuilderPut> {
#[inline]
fn wait(self) -> ::To {
- let publisher = self.publisher.create_one_shot_publisher()?;
- publisher.resolve_put(
+ self.publisher.session.0.resolve_put(
+ &self.publisher.key_expr?,
self.kind.payload,
SampleKind::Put,
self.kind.encoding,
+ self.publisher.congestion_control,
+ self.publisher.priority,
+ self.publisher.is_express,
+ self.publisher.destination,
+ #[cfg(feature = "unstable")]
+ self.publisher.reliability,
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
@@ -202,11 +208,17 @@ impl Wait for PublicationBuilder, PublicationBuilderPut
impl Wait for PublicationBuilder, PublicationBuilderDelete> {
#[inline]
fn wait(self) -> ::To {
- let publisher = self.publisher.create_one_shot_publisher()?;
- publisher.resolve_put(
+ self.publisher.session.0.resolve_put(
+ &self.publisher.key_expr?,
ZBytes::empty(),
SampleKind::Delete,
Encoding::ZENOH_BYTES,
+ self.publisher.congestion_control,
+ self.publisher.priority,
+ self.publisher.is_express,
+ self.publisher.destination,
+ #[cfg(feature = "unstable")]
+ self.publisher.reliability,
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
@@ -329,25 +341,6 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
..self
}
}
-
- // internal function for performing the publication
- fn create_one_shot_publisher(self) -> ZResult> {
- Ok(Publisher {
- session: self.session.downgrade(),
- id: 0, // This is a one shot Publisher
- key_expr: self.key_expr?,
- encoding: self.encoding,
- congestion_control: self.congestion_control,
- priority: self.priority,
- is_express: self.is_express,
- destination: self.destination,
- #[cfg(feature = "unstable")]
- reliability: self.reliability,
- #[cfg(feature = "unstable")]
- matching_listeners: Default::default(),
- undeclare_on_drop: true,
- })
- }
}
impl<'a, 'b> Resolvable for PublisherBuilder<'a, 'b> {
@@ -420,10 +413,17 @@ impl<'a, 'b> IntoFuture for PublisherBuilder<'a, 'b> {
impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> {
fn wait(self) -> ::To {
- self.publisher.resolve_put(
+ self.publisher.session.resolve_put(
+ &self.publisher.key_expr,
self.kind.payload,
SampleKind::Put,
self.kind.encoding,
+ self.publisher.congestion_control,
+ self.publisher.priority,
+ self.publisher.is_express,
+ self.publisher.destination,
+ #[cfg(feature = "unstable")]
+ self.publisher.reliability,
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
@@ -434,10 +434,17 @@ impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> {
impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> {
fn wait(self) -> ::To {
- self.publisher.resolve_put(
+ self.publisher.session.resolve_put(
+ &self.publisher.key_expr,
ZBytes::empty(),
SampleKind::Delete,
Encoding::ZENOH_BYTES,
+ self.publisher.congestion_control,
+ self.publisher.priority,
+ self.publisher.is_express,
+ self.publisher.destination,
+ #[cfg(feature = "unstable")]
+ self.publisher.reliability,
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs
index dfaf2deb2e..4870fac445 100644
--- a/zenoh/src/api/publisher.rs
+++ b/zenoh/src/api/publisher.rs
@@ -22,12 +22,8 @@ use std::{
use futures::Sink;
use tracing::error;
-use zenoh_core::{zread, Resolvable, Resolve, Wait};
-use zenoh_protocol::{
- core::{CongestionControl, Reliability},
- network::{push::ext, Push},
- zenoh::{Del, PushBody, Put},
-};
+use zenoh_core::{Resolvable, Resolve, Wait};
+use zenoh_protocol::core::CongestionControl;
use zenoh_result::{Error, ZResult};
#[cfg(feature = "unstable")]
use {
@@ -38,6 +34,7 @@ use {
std::{collections::HashSet, sync::Arc, sync::Mutex},
zenoh_config::wrappers::EntityGlobalId,
zenoh_protocol::core::EntityGlobalIdProto,
+ zenoh_protocol::core::Reliability,
};
use super::{
@@ -48,13 +45,10 @@ use super::{
bytes::ZBytes,
encoding::Encoding,
key_expr::KeyExpr,
- sample::{DataInfo, Locality, QoS, Sample, SampleFields, SampleKind},
+ sample::{Locality, Sample, SampleFields},
session::UndeclarableSealed,
};
-use crate::{
- api::{session::WeakSession, subscriber::SubscriberKind, Id},
- net::primitives::Primitives,
-};
+use crate::api::{session::WeakSession, Id};
pub(crate) struct PublisherState {
pub(crate) id: Id,
@@ -382,10 +376,17 @@ impl<'a> Sink for Publisher<'a> {
attachment,
..
} = item.into();
- self.resolve_put(
+ self.session.resolve_put(
+ &self.key_expr,
payload,
kind,
encoding,
+ self.congestion_control,
+ self.priority,
+ self.is_express,
+ self.destination,
+ #[cfg(feature = "unstable")]
+ self.reliability,
None,
#[cfg(feature = "unstable")]
SourceInfo::empty(),
@@ -404,95 +405,6 @@ impl<'a> Sink for Publisher<'a> {
}
}
-impl Publisher<'_> {
- #[allow(clippy::too_many_arguments)] // TODO fixme
- pub(crate) fn resolve_put(
- &self,
- payload: ZBytes,
- kind: SampleKind,
- encoding: Encoding,
- timestamp: Option,
- #[cfg(feature = "unstable")] source_info: SourceInfo,
- attachment: Option,
- ) -> ZResult<()> {
- tracing::trace!("write({:?}, [...])", &self.key_expr);
- let primitives = zread!(self.session.state).primitives()?;
- let timestamp = if timestamp.is_none() {
- self.session.runtime.new_timestamp()
- } else {
- timestamp
- };
- if self.destination != Locality::SessionLocal {
- primitives.send_push(
- Push {
- wire_expr: self.key_expr.to_wire(&self.session).to_owned(),
- ext_qos: ext::QoSType::new(
- self.priority.into(),
- self.congestion_control,
- self.is_express,
- ),
- ext_tstamp: None,
- ext_nodeid: ext::NodeIdType::DEFAULT,
- payload: match kind {
- SampleKind::Put => PushBody::Put(Put {
- timestamp,
- encoding: encoding.clone().into(),
- #[cfg(feature = "unstable")]
- ext_sinfo: source_info.into(),
- #[cfg(not(feature = "unstable"))]
- ext_sinfo: None,
- #[cfg(feature = "shared-memory")]
- ext_shm: None,
- ext_attachment: attachment.clone().map(|a| a.into()),
- ext_unknown: vec![],
- payload: payload.clone().into(),
- }),
- SampleKind::Delete => PushBody::Del(Del {
- timestamp,
- #[cfg(feature = "unstable")]
- ext_sinfo: source_info.into(),
- #[cfg(not(feature = "unstable"))]
- ext_sinfo: None,
- ext_attachment: attachment.clone().map(|a| a.into()),
- ext_unknown: vec![],
- }),
- },
- },
- #[cfg(feature = "unstable")]
- self.reliability,
- #[cfg(not(feature = "unstable"))]
- Reliability::DEFAULT,
- );
- }
- if self.destination != Locality::Remote {
- let data_info = DataInfo {
- kind,
- encoding: Some(encoding),
- timestamp,
- source_id: None,
- source_sn: None,
- qos: QoS::from(ext::QoSType::new(
- self.priority.into(),
- self.congestion_control,
- self.is_express,
- )),
- };
-
- self.session.execute_subscriber_callbacks(
- true,
- &self.key_expr.to_wire(&self.session),
- Some(data_info),
- payload.into(),
- SubscriberKind::Subscriber,
- #[cfg(feature = "unstable")]
- self.reliability,
- attachment,
- );
- }
- Ok(())
- }
-}
-
/// The Priority of zenoh messages.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
#[repr(u8)]
diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs
index 21fe8cc82a..4d98820468 100644
--- a/zenoh/src/api/session.rs
+++ b/zenoh/src/api/session.rs
@@ -52,6 +52,7 @@ use zenoh_protocol::{
UndeclareSubscriber,
},
interest::{InterestMode, InterestOptions},
+ push,
request::{self, ext::TargetType},
AtomicRequestId, DeclareFinal, Interest, Mapping, Push, Request, RequestId, Response,
ResponseFinal,
@@ -1913,6 +1914,96 @@ impl SessionInner {
}
}
+ #[allow(clippy::too_many_arguments)] // TODO fixme
+ pub(crate) fn resolve_put(
+ &self,
+ key_expr: &KeyExpr,
+ payload: ZBytes,
+ kind: SampleKind,
+ encoding: Encoding,
+ congestion_control: CongestionControl,
+ priority: Priority,
+ is_express: bool,
+ destination: Locality,
+ #[cfg(feature = "unstable")] reliability: Reliability,
+ timestamp: Option,
+ #[cfg(feature = "unstable")] source_info: SourceInfo,
+ attachment: Option,
+ ) -> ZResult<()> {
+ trace!("write({:?}, [...])", key_expr);
+ let primitives = zread!(self.state).primitives()?;
+ let timestamp = timestamp.or_else(|| self.runtime.new_timestamp());
+ let wire_expr = key_expr.to_wire(self);
+ if destination != Locality::SessionLocal {
+ primitives.send_push(
+ Push {
+ wire_expr: wire_expr.to_owned(),
+ ext_qos: push::ext::QoSType::new(
+ priority.into(),
+ congestion_control,
+ is_express,
+ ),
+ ext_tstamp: None,
+ ext_nodeid: push::ext::NodeIdType::DEFAULT,
+ payload: match kind {
+ SampleKind::Put => PushBody::Put(Put {
+ timestamp,
+ encoding: encoding.clone().into(),
+ #[cfg(feature = "unstable")]
+ ext_sinfo: source_info.into(),
+ #[cfg(not(feature = "unstable"))]
+ ext_sinfo: None,
+ #[cfg(feature = "shared-memory")]
+ ext_shm: None,
+ ext_attachment: attachment.clone().map(|a| a.into()),
+ ext_unknown: vec![],
+ payload: payload.clone().into(),
+ }),
+ SampleKind::Delete => PushBody::Del(Del {
+ timestamp,
+ #[cfg(feature = "unstable")]
+ ext_sinfo: source_info.into(),
+ #[cfg(not(feature = "unstable"))]
+ ext_sinfo: None,
+ ext_attachment: attachment.clone().map(|a| a.into()),
+ ext_unknown: vec![],
+ }),
+ },
+ },
+ #[cfg(feature = "unstable")]
+ reliability,
+ #[cfg(not(feature = "unstable"))]
+ Reliability::DEFAULT,
+ );
+ }
+ if destination != Locality::Remote {
+ let data_info = DataInfo {
+ kind,
+ encoding: Some(encoding),
+ timestamp,
+ source_id: None,
+ source_sn: None,
+ qos: QoS::from(push::ext::QoSType::new(
+ priority.into(),
+ congestion_control,
+ is_express,
+ )),
+ };
+
+ self.execute_subscriber_callbacks(
+ true,
+ &wire_expr,
+ Some(data_info),
+ payload.into(),
+ SubscriberKind::Subscriber,
+ #[cfg(feature = "unstable")]
+ reliability,
+ attachment,
+ );
+ }
+ Ok(())
+ }
+
#[allow(clippy::too_many_arguments)]
pub(crate) fn query(
self: &Arc,