Skip to content

Commit

Permalink
fix: remove Session::put onehost publisher (#1412)
Browse files Browse the repository at this point in the history
This publisher caused error when it was dropped.
  • Loading branch information
wyfo authored Sep 13, 2024
1 parent 937144a commit aa1f703
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 126 deletions.
57 changes: 32 additions & 25 deletions zenoh/src/api/builders/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,17 @@ impl<P, T> Resolvable for PublicationBuilder<P, T> {
impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut> {
#[inline]
fn wait(self) -> <Self as Resolvable>::To {
let publisher = self.publisher.create_one_shot_publisher()?;
publisher.resolve_put(
self.publisher.session.0.resolve_put(
&self.publisher.key_expr?,
self.kind.payload,
SampleKind::Put,
self.kind.encoding,
self.publisher.congestion_control,
self.publisher.priority,
self.publisher.is_express,
self.publisher.destination,
#[cfg(feature = "unstable")]
self.publisher.reliability,
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
Expand All @@ -202,11 +208,17 @@ impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderPut
impl Wait for PublicationBuilder<PublisherBuilder<'_, '_>, PublicationBuilderDelete> {
#[inline]
fn wait(self) -> <Self as Resolvable>::To {
let publisher = self.publisher.create_one_shot_publisher()?;
publisher.resolve_put(
self.publisher.session.0.resolve_put(
&self.publisher.key_expr?,
ZBytes::empty(),
SampleKind::Delete,
Encoding::ZENOH_BYTES,
self.publisher.congestion_control,
self.publisher.priority,
self.publisher.is_express,
self.publisher.destination,
#[cfg(feature = "unstable")]
self.publisher.reliability,
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
Expand Down Expand Up @@ -329,25 +341,6 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
..self
}
}

// internal function for performing the publication
fn create_one_shot_publisher(self) -> ZResult<Publisher<'b>> {
Ok(Publisher {
session: self.session.downgrade(),
id: 0, // This is a one shot Publisher
key_expr: self.key_expr?,
encoding: self.encoding,
congestion_control: self.congestion_control,
priority: self.priority,
is_express: self.is_express,
destination: self.destination,
#[cfg(feature = "unstable")]
reliability: self.reliability,
#[cfg(feature = "unstable")]
matching_listeners: Default::default(),
undeclare_on_drop: true,
})
}
}

impl<'a, 'b> Resolvable for PublisherBuilder<'a, 'b> {
Expand Down Expand Up @@ -420,10 +413,17 @@ impl<'a, 'b> IntoFuture for PublisherBuilder<'a, 'b> {

impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> {
fn wait(self) -> <Self as Resolvable>::To {
self.publisher.resolve_put(
self.publisher.session.resolve_put(
&self.publisher.key_expr,
self.kind.payload,
SampleKind::Put,
self.kind.encoding,
self.publisher.congestion_control,
self.publisher.priority,
self.publisher.is_express,
self.publisher.destination,
#[cfg(feature = "unstable")]
self.publisher.reliability,
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
Expand All @@ -434,10 +434,17 @@ impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderPut> {

impl Wait for PublicationBuilder<&Publisher<'_>, PublicationBuilderDelete> {
fn wait(self) -> <Self as Resolvable>::To {
self.publisher.resolve_put(
self.publisher.session.resolve_put(
&self.publisher.key_expr,
ZBytes::empty(),
SampleKind::Delete,
Encoding::ZENOH_BYTES,
self.publisher.congestion_control,
self.publisher.priority,
self.publisher.is_express,
self.publisher.destination,
#[cfg(feature = "unstable")]
self.publisher.reliability,
self.timestamp,
#[cfg(feature = "unstable")]
self.source_info,
Expand Down
114 changes: 13 additions & 101 deletions zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,8 @@ use std::{

use futures::Sink;
use tracing::error;
use zenoh_core::{zread, Resolvable, Resolve, Wait};
use zenoh_protocol::{
core::{CongestionControl, Reliability},
network::{push::ext, Push},
zenoh::{Del, PushBody, Put},
};
use zenoh_core::{Resolvable, Resolve, Wait};
use zenoh_protocol::core::CongestionControl;
use zenoh_result::{Error, ZResult};
#[cfg(feature = "unstable")]
use {
Expand All @@ -38,6 +34,7 @@ use {
std::{collections::HashSet, sync::Arc, sync::Mutex},
zenoh_config::wrappers::EntityGlobalId,
zenoh_protocol::core::EntityGlobalIdProto,
zenoh_protocol::core::Reliability,
};

use super::{
Expand All @@ -48,13 +45,10 @@ use super::{
bytes::ZBytes,
encoding::Encoding,
key_expr::KeyExpr,
sample::{DataInfo, Locality, QoS, Sample, SampleFields, SampleKind},
sample::{Locality, Sample, SampleFields},
session::UndeclarableSealed,
};
use crate::{
api::{session::WeakSession, subscriber::SubscriberKind, Id},
net::primitives::Primitives,
};
use crate::api::{session::WeakSession, Id};

pub(crate) struct PublisherState {
pub(crate) id: Id,
Expand Down Expand Up @@ -382,10 +376,17 @@ impl<'a> Sink<Sample> for Publisher<'a> {
attachment,
..
} = item.into();
self.resolve_put(
self.session.resolve_put(
&self.key_expr,
payload,
kind,
encoding,
self.congestion_control,
self.priority,
self.is_express,
self.destination,
#[cfg(feature = "unstable")]
self.reliability,
None,
#[cfg(feature = "unstable")]
SourceInfo::empty(),
Expand All @@ -404,95 +405,6 @@ impl<'a> Sink<Sample> for Publisher<'a> {
}
}

impl Publisher<'_> {
#[allow(clippy::too_many_arguments)] // TODO fixme
pub(crate) fn resolve_put(
&self,
payload: ZBytes,
kind: SampleKind,
encoding: Encoding,
timestamp: Option<uhlc::Timestamp>,
#[cfg(feature = "unstable")] source_info: SourceInfo,
attachment: Option<ZBytes>,
) -> ZResult<()> {
tracing::trace!("write({:?}, [...])", &self.key_expr);
let primitives = zread!(self.session.state).primitives()?;
let timestamp = if timestamp.is_none() {
self.session.runtime.new_timestamp()
} else {
timestamp
};
if self.destination != Locality::SessionLocal {
primitives.send_push(
Push {
wire_expr: self.key_expr.to_wire(&self.session).to_owned(),
ext_qos: ext::QoSType::new(
self.priority.into(),
self.congestion_control,
self.is_express,
),
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
payload: match kind {
SampleKind::Put => PushBody::Put(Put {
timestamp,
encoding: encoding.clone().into(),
#[cfg(feature = "unstable")]
ext_sinfo: source_info.into(),
#[cfg(not(feature = "unstable"))]
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: attachment.clone().map(|a| a.into()),
ext_unknown: vec![],
payload: payload.clone().into(),
}),
SampleKind::Delete => PushBody::Del(Del {
timestamp,
#[cfg(feature = "unstable")]
ext_sinfo: source_info.into(),
#[cfg(not(feature = "unstable"))]
ext_sinfo: None,
ext_attachment: attachment.clone().map(|a| a.into()),
ext_unknown: vec![],
}),
},
},
#[cfg(feature = "unstable")]
self.reliability,
#[cfg(not(feature = "unstable"))]
Reliability::DEFAULT,
);
}
if self.destination != Locality::Remote {
let data_info = DataInfo {
kind,
encoding: Some(encoding),
timestamp,
source_id: None,
source_sn: None,
qos: QoS::from(ext::QoSType::new(
self.priority.into(),
self.congestion_control,
self.is_express,
)),
};

self.session.execute_subscriber_callbacks(
true,
&self.key_expr.to_wire(&self.session),
Some(data_info),
payload.into(),
SubscriberKind::Subscriber,
#[cfg(feature = "unstable")]
self.reliability,
attachment,
);
}
Ok(())
}
}

/// The Priority of zenoh messages.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
#[repr(u8)]
Expand Down
91 changes: 91 additions & 0 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use zenoh_protocol::{
UndeclareSubscriber,
},
interest::{InterestMode, InterestOptions},
push,
request::{self, ext::TargetType},
AtomicRequestId, DeclareFinal, Interest, Mapping, Push, Request, RequestId, Response,
ResponseFinal,
Expand Down Expand Up @@ -1913,6 +1914,96 @@ impl SessionInner {
}
}

#[allow(clippy::too_many_arguments)] // TODO fixme
pub(crate) fn resolve_put(
&self,
key_expr: &KeyExpr,
payload: ZBytes,
kind: SampleKind,
encoding: Encoding,
congestion_control: CongestionControl,
priority: Priority,
is_express: bool,
destination: Locality,
#[cfg(feature = "unstable")] reliability: Reliability,
timestamp: Option<uhlc::Timestamp>,
#[cfg(feature = "unstable")] source_info: SourceInfo,
attachment: Option<ZBytes>,
) -> ZResult<()> {
trace!("write({:?}, [...])", key_expr);
let primitives = zread!(self.state).primitives()?;
let timestamp = timestamp.or_else(|| self.runtime.new_timestamp());
let wire_expr = key_expr.to_wire(self);
if destination != Locality::SessionLocal {
primitives.send_push(
Push {
wire_expr: wire_expr.to_owned(),
ext_qos: push::ext::QoSType::new(
priority.into(),
congestion_control,
is_express,
),
ext_tstamp: None,
ext_nodeid: push::ext::NodeIdType::DEFAULT,
payload: match kind {
SampleKind::Put => PushBody::Put(Put {
timestamp,
encoding: encoding.clone().into(),
#[cfg(feature = "unstable")]
ext_sinfo: source_info.into(),
#[cfg(not(feature = "unstable"))]
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: attachment.clone().map(|a| a.into()),
ext_unknown: vec![],
payload: payload.clone().into(),
}),
SampleKind::Delete => PushBody::Del(Del {
timestamp,
#[cfg(feature = "unstable")]
ext_sinfo: source_info.into(),
#[cfg(not(feature = "unstable"))]
ext_sinfo: None,
ext_attachment: attachment.clone().map(|a| a.into()),
ext_unknown: vec![],
}),
},
},
#[cfg(feature = "unstable")]
reliability,
#[cfg(not(feature = "unstable"))]
Reliability::DEFAULT,
);
}
if destination != Locality::Remote {
let data_info = DataInfo {
kind,
encoding: Some(encoding),
timestamp,
source_id: None,
source_sn: None,
qos: QoS::from(push::ext::QoSType::new(
priority.into(),
congestion_control,
is_express,
)),
};

self.execute_subscriber_callbacks(
true,
&wire_expr,
Some(data_info),
payload.into(),
SubscriberKind::Subscriber,
#[cfg(feature = "unstable")]
reliability,
attachment,
);
}
Ok(())
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn query(
self: &Arc<Self>,
Expand Down

0 comments on commit aa1f703

Please sign in to comment.