diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index baa4eb45f3..1eef911fd9 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -186,11 +186,17 @@ impl Resolvable for PublicationBuilder { impl Wait for PublicationBuilder, PublicationBuilderPut> { #[inline] fn wait(self) -> ::To { - let publisher = self.publisher.create_one_shot_publisher()?; - publisher.resolve_put( + self.publisher.session.0.resolve_put( + &self.publisher.key_expr?, self.kind.payload, SampleKind::Put, self.kind.encoding, + self.publisher.congestion_control, + self.publisher.priority, + self.publisher.is_express, + self.publisher.destination, + #[cfg(feature = "unstable")] + self.publisher.reliability, self.timestamp, #[cfg(feature = "unstable")] self.source_info, @@ -202,11 +208,17 @@ impl Wait for PublicationBuilder, PublicationBuilderPut impl Wait for PublicationBuilder, PublicationBuilderDelete> { #[inline] fn wait(self) -> ::To { - let publisher = self.publisher.create_one_shot_publisher()?; - publisher.resolve_put( + self.publisher.session.0.resolve_put( + &self.publisher.key_expr?, ZBytes::empty(), SampleKind::Delete, Encoding::ZENOH_BYTES, + self.publisher.congestion_control, + self.publisher.priority, + self.publisher.is_express, + self.publisher.destination, + #[cfg(feature = "unstable")] + self.publisher.reliability, self.timestamp, #[cfg(feature = "unstable")] self.source_info, @@ -329,25 +341,6 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { ..self } } - - // internal function for performing the publication - fn create_one_shot_publisher(self) -> ZResult> { - Ok(Publisher { - session: self.session.downgrade(), - id: 0, // This is a one shot Publisher - key_expr: self.key_expr?, - encoding: self.encoding, - congestion_control: self.congestion_control, - priority: self.priority, - is_express: self.is_express, - destination: self.destination, - #[cfg(feature = "unstable")] - reliability: self.reliability, - #[cfg(feature = "unstable")] - matching_listeners: Default::default(), - undeclare_on_drop: true, - }) - } } impl<'a, 'b> Resolvable for PublisherBuilder<'a, 'b> { @@ -420,10 +413,17 @@ impl<'a, 'b> IntoFuture for PublisherBuilder<'a, 'b> { impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> { fn wait(self) -> ::To { - self.publisher.resolve_put( + self.publisher.session.resolve_put( + &self.publisher.key_expr, self.kind.payload, SampleKind::Put, self.kind.encoding, + self.publisher.congestion_control, + self.publisher.priority, + self.publisher.is_express, + self.publisher.destination, + #[cfg(feature = "unstable")] + self.publisher.reliability, self.timestamp, #[cfg(feature = "unstable")] self.source_info, @@ -434,10 +434,17 @@ impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> { impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> { fn wait(self) -> ::To { - self.publisher.resolve_put( + self.publisher.session.resolve_put( + &self.publisher.key_expr, ZBytes::empty(), SampleKind::Delete, Encoding::ZENOH_BYTES, + self.publisher.congestion_control, + self.publisher.priority, + self.publisher.is_express, + self.publisher.destination, + #[cfg(feature = "unstable")] + self.publisher.reliability, self.timestamp, #[cfg(feature = "unstable")] self.source_info, diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index dfaf2deb2e..4870fac445 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -22,12 +22,8 @@ use std::{ use futures::Sink; use tracing::error; -use zenoh_core::{zread, Resolvable, Resolve, Wait}; -use zenoh_protocol::{ - core::{CongestionControl, Reliability}, - network::{push::ext, Push}, - zenoh::{Del, PushBody, Put}, -}; +use zenoh_core::{Resolvable, Resolve, Wait}; +use zenoh_protocol::core::CongestionControl; use zenoh_result::{Error, ZResult}; #[cfg(feature = "unstable")] use { @@ -38,6 +34,7 @@ use { std::{collections::HashSet, sync::Arc, sync::Mutex}, zenoh_config::wrappers::EntityGlobalId, zenoh_protocol::core::EntityGlobalIdProto, + zenoh_protocol::core::Reliability, }; use super::{ @@ -48,13 +45,10 @@ use super::{ bytes::ZBytes, encoding::Encoding, key_expr::KeyExpr, - sample::{DataInfo, Locality, QoS, Sample, SampleFields, SampleKind}, + sample::{Locality, Sample, SampleFields}, session::UndeclarableSealed, }; -use crate::{ - api::{session::WeakSession, subscriber::SubscriberKind, Id}, - net::primitives::Primitives, -}; +use crate::api::{session::WeakSession, Id}; pub(crate) struct PublisherState { pub(crate) id: Id, @@ -382,10 +376,17 @@ impl<'a> Sink for Publisher<'a> { attachment, .. } = item.into(); - self.resolve_put( + self.session.resolve_put( + &self.key_expr, payload, kind, encoding, + self.congestion_control, + self.priority, + self.is_express, + self.destination, + #[cfg(feature = "unstable")] + self.reliability, None, #[cfg(feature = "unstable")] SourceInfo::empty(), @@ -404,95 +405,6 @@ impl<'a> Sink for Publisher<'a> { } } -impl Publisher<'_> { - #[allow(clippy::too_many_arguments)] // TODO fixme - pub(crate) fn resolve_put( - &self, - payload: ZBytes, - kind: SampleKind, - encoding: Encoding, - timestamp: Option, - #[cfg(feature = "unstable")] source_info: SourceInfo, - attachment: Option, - ) -> ZResult<()> { - tracing::trace!("write({:?}, [...])", &self.key_expr); - let primitives = zread!(self.session.state).primitives()?; - let timestamp = if timestamp.is_none() { - self.session.runtime.new_timestamp() - } else { - timestamp - }; - if self.destination != Locality::SessionLocal { - primitives.send_push( - Push { - wire_expr: self.key_expr.to_wire(&self.session).to_owned(), - ext_qos: ext::QoSType::new( - self.priority.into(), - self.congestion_control, - self.is_express, - ), - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - payload: match kind { - SampleKind::Put => PushBody::Put(Put { - timestamp, - encoding: encoding.clone().into(), - #[cfg(feature = "unstable")] - ext_sinfo: source_info.into(), - #[cfg(not(feature = "unstable"))] - ext_sinfo: None, - #[cfg(feature = "shared-memory")] - ext_shm: None, - ext_attachment: attachment.clone().map(|a| a.into()), - ext_unknown: vec![], - payload: payload.clone().into(), - }), - SampleKind::Delete => PushBody::Del(Del { - timestamp, - #[cfg(feature = "unstable")] - ext_sinfo: source_info.into(), - #[cfg(not(feature = "unstable"))] - ext_sinfo: None, - ext_attachment: attachment.clone().map(|a| a.into()), - ext_unknown: vec![], - }), - }, - }, - #[cfg(feature = "unstable")] - self.reliability, - #[cfg(not(feature = "unstable"))] - Reliability::DEFAULT, - ); - } - if self.destination != Locality::Remote { - let data_info = DataInfo { - kind, - encoding: Some(encoding), - timestamp, - source_id: None, - source_sn: None, - qos: QoS::from(ext::QoSType::new( - self.priority.into(), - self.congestion_control, - self.is_express, - )), - }; - - self.session.execute_subscriber_callbacks( - true, - &self.key_expr.to_wire(&self.session), - Some(data_info), - payload.into(), - SubscriberKind::Subscriber, - #[cfg(feature = "unstable")] - self.reliability, - attachment, - ); - } - Ok(()) - } -} - /// The Priority of zenoh messages. #[derive(Debug, Default, Copy, Clone, PartialEq, Eq)] #[repr(u8)] diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 4dbad4e6b2..de5dff3dec 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -50,6 +50,7 @@ use zenoh_protocol::{ UndeclareSubscriber, }, interest::{InterestMode, InterestOptions}, + push, request::{self, ext::TargetType}, AtomicRequestId, DeclareFinal, Interest, Mapping, Push, Request, RequestId, Response, ResponseFinal, @@ -1878,6 +1879,96 @@ impl SessionInner { } } + #[allow(clippy::too_many_arguments)] // TODO fixme + pub(crate) fn resolve_put( + &self, + key_expr: &KeyExpr, + payload: ZBytes, + kind: SampleKind, + encoding: Encoding, + congestion_control: CongestionControl, + priority: Priority, + is_express: bool, + destination: Locality, + #[cfg(feature = "unstable")] reliability: Reliability, + timestamp: Option, + #[cfg(feature = "unstable")] source_info: SourceInfo, + attachment: Option, + ) -> ZResult<()> { + trace!("write({:?}, [...])", key_expr); + let primitives = zread!(self.state).primitives()?; + let timestamp = timestamp.or_else(|| self.runtime.new_timestamp()); + let wire_expr = key_expr.to_wire(self); + if destination != Locality::SessionLocal { + primitives.send_push( + Push { + wire_expr: wire_expr.to_owned(), + ext_qos: push::ext::QoSType::new( + priority.into(), + congestion_control, + is_express, + ), + ext_tstamp: None, + ext_nodeid: push::ext::NodeIdType::DEFAULT, + payload: match kind { + SampleKind::Put => PushBody::Put(Put { + timestamp, + encoding: encoding.clone().into(), + #[cfg(feature = "unstable")] + ext_sinfo: source_info.into(), + #[cfg(not(feature = "unstable"))] + ext_sinfo: None, + #[cfg(feature = "shared-memory")] + ext_shm: None, + ext_attachment: attachment.clone().map(|a| a.into()), + ext_unknown: vec![], + payload: payload.clone().into(), + }), + SampleKind::Delete => PushBody::Del(Del { + timestamp, + #[cfg(feature = "unstable")] + ext_sinfo: source_info.into(), + #[cfg(not(feature = "unstable"))] + ext_sinfo: None, + ext_attachment: attachment.clone().map(|a| a.into()), + ext_unknown: vec![], + }), + }, + }, + #[cfg(feature = "unstable")] + reliability, + #[cfg(not(feature = "unstable"))] + Reliability::DEFAULT, + ); + } + if destination != Locality::Remote { + let data_info = DataInfo { + kind, + encoding: Some(encoding), + timestamp, + source_id: None, + source_sn: None, + qos: QoS::from(push::ext::QoSType::new( + priority.into(), + congestion_control, + is_express, + )), + }; + + self.execute_subscriber_callbacks( + true, + &wire_expr, + Some(data_info), + payload.into(), + SubscriberKind::Subscriber, + #[cfg(feature = "unstable")] + reliability, + attachment, + ); + } + Ok(()) + } + #[allow(clippy::too_many_arguments)] pub(crate) fn query( self: &Arc,