Skip to content

Commit

Permalink
Merge pull request #1353 from eclipse-zenoh/dev/subscriber_reliability
Browse files Browse the repository at this point in the history
Remove subscriber reliability option
  • Loading branch information
Mallets authored Sep 11, 2024
2 parents efe4675 + 7273ab9 commit 36b97aa
Show file tree
Hide file tree
Showing 16 changed files with 117 additions and 283 deletions.
40 changes: 4 additions & 36 deletions commons/zenoh-codec/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use zenoh_buffers::{
ZBuf,
};
use zenoh_protocol::{
common::{iext, imsg, ZExtZ64},
common::{iext, imsg},
core::{ExprId, ExprLen, WireExpr},
network::{
declare::{self, common, keyexpr, queryable, subscriber, token, Declare, DeclareBody},
Expand Down Expand Up @@ -384,9 +384,6 @@ where
}
}

// SubscriberInfo
crate::impl_zextz64!(subscriber::ext::SubscriberInfo, subscriber::ext::Info::ID);

// DeclareSubscriber
impl<W> WCodec<&subscriber::DeclareSubscriber, &mut W> for Zenoh080
where
Expand All @@ -395,18 +392,10 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &subscriber::DeclareSubscriber) -> Self::Output {
let subscriber::DeclareSubscriber {
id,
wire_expr,
ext_info,
} = x;
let subscriber::DeclareSubscriber { id, wire_expr } = x;

// Header
let mut header = declare::id::D_SUBSCRIBER;
let mut n_exts = (ext_info != &subscriber::ext::SubscriberInfo::DEFAULT) as u8;
if n_exts != 0 {
header |= subscriber::flag::Z;
}
if wire_expr.mapping != Mapping::DEFAULT {
header |= subscriber::flag::M;
}
Expand All @@ -420,10 +409,6 @@ where
self.write(&mut *writer, wire_expr)?;

// Extensions
if ext_info != &subscriber::ext::SubscriberInfo::DEFAULT {
n_exts -= 1;
self.write(&mut *writer, (*ext_info, n_exts != 0))?;
}

Ok(())
}
Expand Down Expand Up @@ -465,30 +450,13 @@ where
};

// Extensions
let mut ext_info = subscriber::ext::SubscriberInfo::DEFAULT;

let mut has_ext = imsg::has_flag(self.header, subscriber::flag::Z);
while has_ext {
let ext: u8 = self.codec.read(&mut *reader)?;
let eodec = Zenoh080Header::new(ext);
match iext::eid(ext) {
subscriber::ext::Info::ID => {
let (i, ext): (subscriber::ext::SubscriberInfo, bool) =
eodec.read(&mut *reader)?;
ext_info = i;
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "DeclareSubscriber", ext)?;
}
}
has_ext = extension::skip(reader, "DeclareSubscriber", ext)?;
}

Ok(subscriber::DeclareSubscriber {
id,
wire_expr,
ext_info,
})
Ok(subscriber::DeclareSubscriber { id, wire_expr })
}
}

Expand Down
78 changes: 3 additions & 75 deletions commons/zenoh-protocol/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ pub use subscriber::*;
pub use token::*;

use crate::{
common::{imsg, ZExtZ64, ZExtZBuf},
core::{ExprId, Reliability, WireExpr},
common::{ZExtZ64, ZExtZBuf},
core::{ExprId, WireExpr},
network::Mapping,
zextz64, zextzbuf,
};
Expand Down Expand Up @@ -341,73 +341,6 @@ pub mod subscriber {
pub struct DeclareSubscriber {
pub id: SubscriberId,
pub wire_expr: WireExpr<'static>,
pub ext_info: ext::SubscriberInfo,
}

pub mod ext {
use super::*;

pub type Info = zextz64!(0x01, false);

/// # The subscription mode.
///
/// ```text
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// |Z|0_1| ID |
/// +-+-+-+---------+
/// % reserved |R%
/// +---------------+
///
/// - if R==1 then the subscription is reliable, else it is best effort
/// - rsv: Reserved
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SubscriberInfo {
pub reliability: Reliability,
}

impl SubscriberInfo {
pub const R: u64 = 1;

pub const DEFAULT: Self = Self {
reliability: Reliability::DEFAULT,
};

#[cfg(feature = "test")]
pub fn rand() -> Self {
let reliability = Reliability::rand();

Self { reliability }
}
}

impl Default for SubscriberInfo {
fn default() -> Self {
Self::DEFAULT
}
}

impl From<Info> for SubscriberInfo {
fn from(ext: Info) -> Self {
let reliability = if imsg::has_option(ext.value, SubscriberInfo::R) {
Reliability::Reliable
} else {
Reliability::BestEffort
};
Self { reliability }
}
}

impl From<SubscriberInfo> for Info {
fn from(ext: SubscriberInfo) -> Self {
let mut v: u64 = 0;
if ext.reliability == Reliability::Reliable {
v |= SubscriberInfo::R;
}
Info::new(v)
}
}
}

impl DeclareSubscriber {
Expand All @@ -418,13 +351,8 @@ pub mod subscriber {

let id: SubscriberId = rng.gen();
let wire_expr = WireExpr::rand();
let ext_info = ext::SubscriberInfo::rand();

Self {
id,
wire_expr,
ext_info,
}
Self { id, wire_expr }
}
}

Expand Down
65 changes: 39 additions & 26 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};

#[cfg(feature = "unstable")]
use zenoh::pubsub::Reliability;
use zenoh::{
handlers::{locked, DefaultHandler, IntoHandler},
internal::zlock,
key_expr::KeyExpr,
prelude::Wait,
pubsub::{Reliability, Subscriber},
pubsub::Subscriber,
query::{QueryConsolidation, QueryTarget, ReplyKeyExpr, Selector},
sample::{Locality, Sample, SampleBuilder},
session::{SessionDeclarations, SessionRef},
Expand All @@ -41,7 +43,6 @@ pub struct QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler> {
pub(crate) session: SessionRef<'a>,
pub(crate) key_expr: ZResult<KeyExpr<'b>>,
pub(crate) key_space: KeySpace,
pub(crate) reliability: Reliability,
pub(crate) origin: Locality,
pub(crate) query_selector: Option<ZResult<Selector<'b>>>,
pub(crate) query_target: QueryTarget,
Expand All @@ -65,7 +66,6 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
session,
key_expr,
key_space,
reliability,
origin,
query_selector,
query_target,
Expand All @@ -78,7 +78,6 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
session,
key_expr,
key_space,
reliability,
origin,
query_selector,
query_target,
Expand Down Expand Up @@ -118,7 +117,6 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
session,
key_expr,
key_space,
reliability,
origin,
query_selector,
query_target,
Expand All @@ -131,7 +129,6 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
session,
key_expr,
key_space,
reliability,
origin,
query_selector,
query_target,
Expand All @@ -145,23 +142,35 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle

impl<'a, 'b, Handler> QueryingSubscriberBuilder<'a, 'b, crate::UserSpace, Handler> {
/// Change the subscription reliability.
#[inline]
#[cfg(feature = "unstable")]
#[deprecated(
since = "1.0.0",
note = "please use `reliability` on `declare_publisher` or `put`"
)]
#[allow(unused_mut, unused_variables)]
pub fn reliability(mut self, reliability: Reliability) -> Self {
self.reliability = reliability;
self
}

/// Change the subscription reliability to Reliable.
#[inline]
#[cfg(feature = "unstable")]
#[deprecated(
since = "1.0.0",
note = "please use `reliability` on `declare_publisher` or `put`"
)]
#[allow(unused_mut)]
pub fn reliable(mut self) -> Self {
self.reliability = Reliability::Reliable;
self
}

/// Change the subscription reliability to BestEffort.
#[inline]
#[cfg(feature = "unstable")]
#[deprecated(
since = "1.0.0",
note = "please use `reliability` on `declare_publisher` or `put`"
)]
#[allow(unused_mut)]
pub fn best_effort(mut self) -> Self {
self.reliability = Reliability::BestEffort;
self
}

Expand Down Expand Up @@ -249,7 +258,6 @@ where
session: self.session,
key_expr: Ok(key_expr.clone()),
key_space: self.key_space,
reliability: self.reliability,
origin: self.origin,
fetch: |cb| match key_space {
crate::KeySpace::User => match query_selector {
Expand Down Expand Up @@ -365,7 +373,6 @@ pub struct FetchingSubscriberBuilder<
pub(crate) session: SessionRef<'a>,
pub(crate) key_expr: ZResult<KeyExpr<'b>>,
pub(crate) key_space: KeySpace,
pub(crate) reliability: Reliability,
pub(crate) origin: Locality,
pub(crate) fetch: Fetch,
pub(crate) handler: Handler,
Expand All @@ -390,7 +397,6 @@ where
session: self.session,
key_expr: self.key_expr.map(|s| s.into_owned()),
key_space: self.key_space,
reliability: self.reliability,
origin: self.origin,
fetch: self.fetch,
handler: self.handler,
Expand Down Expand Up @@ -422,7 +428,6 @@ where
session,
key_expr,
key_space,
reliability,
origin,
fetch,
handler: _,
Expand All @@ -432,7 +437,6 @@ where
session,
key_expr,
key_space,
reliability,
origin,
fetch,
handler: callback,
Expand Down Expand Up @@ -476,7 +480,6 @@ where
session,
key_expr,
key_space,
reliability,
origin,
fetch,
handler: _,
Expand All @@ -486,7 +489,6 @@ where
session,
key_expr,
key_space,
reliability,
origin,
fetch,
handler,
Expand All @@ -506,23 +508,35 @@ where
TryIntoSample: ExtractSample,
{
/// Change the subscription reliability.
#[inline]
#[cfg(feature = "unstable")]
#[deprecated(
since = "1.0.0",
note = "please use `reliability` on `declare_publisher` or `put`"
)]
#[allow(unused_mut, unused_variables)]
pub fn reliability(mut self, reliability: Reliability) -> Self {
self.reliability = reliability;
self
}

/// Change the subscription reliability to Reliable.
#[inline]
#[cfg(feature = "unstable")]
#[deprecated(
since = "1.0.0",
note = "please use `reliability` on `declare_publisher` or `put`"
)]
#[allow(unused_mut)]
pub fn reliable(mut self) -> Self {
self.reliability = Reliability::Reliable;
self
}

/// Change the subscription reliability to BestEffort.
#[inline]
#[cfg(feature = "unstable")]
#[deprecated(
since = "1.0.0",
note = "please use `reliability` on `declare_publisher` or `put`"
)]
#[allow(unused_mut)]
pub fn best_effort(mut self) -> Self {
self.reliability = Reliability::BestEffort;
self
}

Expand Down Expand Up @@ -698,7 +712,6 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {
.session
.declare_subscriber(&key_expr)
.callback(sub_callback)
.reliability(conf.reliability)
.allowed_origin(conf.origin)
.wait()?,
crate::KeySpace::Liveliness => conf
Expand Down
Loading

0 comments on commit 36b97aa

Please sign in to comment.