Skip to content

Commit

Permalink
Start tagging features in Zenoh
Browse files Browse the repository at this point in the history
  • Loading branch information
p-avital committed Jan 29, 2024
1 parent 0943243 commit b7dcc29
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 1 deletion.
2 changes: 2 additions & 0 deletions commons/zenoh-protocol/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ impl TryFrom<u8> for Priority {
}
}

/// The reliability request of a subscriber.
// tags{options.reliability}
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
#[repr(u8)]
pub enum Reliability {
Expand Down
2 changes: 2 additions & 0 deletions commons/zenoh-protocol/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

// ignore_tagging
pub mod declare;
pub mod oam;
pub mod push;
Expand Down
2 changes: 2 additions & 0 deletions commons/zenoh-protocol/src/scouting/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

// ignore_tagging
pub mod hello;
pub mod scout;

Expand Down
2 changes: 2 additions & 0 deletions commons/zenoh-protocol/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

// ignore_tagging
pub mod close;
pub mod fragment;
pub mod frame;
Expand Down
2 changes: 2 additions & 0 deletions commons/zenoh-protocol/src/zenoh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

// ignore_tagging
pub mod ack;
pub mod del;
pub mod err;
Expand Down
7 changes: 7 additions & 0 deletions zenoh/src/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use zenoh_protocol::core::{WhatAmI, ZenohId};
/// let zid = session.info().zid().res().await;
/// # })
/// ```
// tags{}
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[derive(Debug)]
pub struct ZidBuilder<'a> {
Expand Down Expand Up @@ -69,6 +70,7 @@ impl<'a> AsyncResolve for ZidBuilder<'a> {
/// while let Some(router_zid) = routers_zid.next() {}
/// # })
/// ```
// tags{}
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[derive(Debug)]
pub struct RoutersZidBuilder<'a> {
Expand Down Expand Up @@ -116,6 +118,7 @@ impl<'a> AsyncResolve for RoutersZidBuilder<'a> {
/// while let Some(peer_zid) = peers_zid.next() {}
/// # })
/// ```
// tags{}
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[derive(Debug)]
pub struct PeersZidBuilder<'a> {
Expand Down Expand Up @@ -162,6 +165,7 @@ impl<'a> AsyncResolve for PeersZidBuilder<'a> {
/// let zid = info.zid().res().await;
/// # })
/// ```
// tags{}
pub struct SessionInfo<'a> {
pub(crate) session: SessionRef<'a>,
}
Expand All @@ -178,6 +182,7 @@ impl SessionInfo<'_> {
/// let zid = session.info().zid().res().await;
/// # })
/// ```
// tags{session.zid}
pub fn zid(&self) -> ZidBuilder<'_> {
ZidBuilder {
session: self.session.clone(),
Expand All @@ -197,6 +202,7 @@ impl SessionInfo<'_> {
/// while let Some(router_zid) = routers_zid.next() {}
/// # })
/// ```
// tags{session.info.routers_zid}
pub fn routers_zid(&self) -> RoutersZidBuilder<'_> {
RoutersZidBuilder {
session: self.session.clone(),
Expand All @@ -215,6 +221,7 @@ impl SessionInfo<'_> {
/// while let Some(peer_zid) = peers_zid.next() {}
/// # })
/// ```
// tags{session.info.peers_zid}
pub fn peers_zid(&self) -> PeersZidBuilder<'_> {
PeersZidBuilder {
session: self.session.clone(),
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ fn register_peer_subscription(
}
}

pub fn declare_peer_subscription(
pub(crate) fn declare_peer_subscription(
tables: &TablesLock,
rtables: RwLockReadGuard<Tables>,
face: &mut Arc<FaceState>,
Expand Down
1 change: 1 addition & 0 deletions zenoh/src/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use zenoh_protocol::core::Encoding;
pub type SourceSn = u64;

/// The locality of samples to be received by subscribers or targeted by publishers.
// tags{options.locality}
#[zenoh_macros::unstable]
#[derive(Clone, Copy, Debug, Default, Serialize, PartialEq, Eq)]
pub enum Locality {
Expand Down
16 changes: 16 additions & 0 deletions zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ impl Session {
/// }).await;
/// # })
/// ```
// tags{session.rc}
pub fn into_arc(self) -> Arc<Self> {
Arc::new(self)
}
Expand Down Expand Up @@ -421,16 +422,19 @@ impl Session {
/// }).await;
/// # })
/// ```
// tags{}
pub fn leak(s: Self) -> &'static mut Self {
Box::leak(Box::new(s))
}

/// Returns the identifier of the current session. `zid()` is a convenient shortcut.
/// See [`Session::info()`](`Session::info()`) and [`SessionInfo::zid()`](`SessionInfo::zid()`) for more details.
// tags{session.zid}
pub fn zid(&self) -> ZenohId {
self.info().zid().res_sync()
}

// tags{session.clock}
pub fn hlc(&self) -> Option<&HLC> {
self.runtime.hlc()
}
Expand All @@ -449,6 +453,7 @@ impl Session {
/// session.close().res().await.unwrap();
/// # })
/// ```
// tags{session.close}
pub fn close(self) -> impl Resolve<ZResult<()>> {
ResolveFuture::new(async move {
trace!("close()");
Expand All @@ -461,6 +466,7 @@ impl Session {
})
}

// tags{}
pub fn undeclare<'a, T, O>(&'a self, decl: T) -> O
where
O: Resolve<ZResult<()>>,
Expand Down Expand Up @@ -496,6 +502,7 @@ impl Session {
/// let _ = session.config().insert_json5("connect/endpoints", r#"["tcp/127.0.0.1/7447"]"#);
/// # })
/// ```
// tags{session.config{get, subscribe}}
pub fn config(&self) -> &Notifier<Config> {
self.runtime.config()
}
Expand All @@ -511,6 +518,7 @@ impl Session {
/// let info = session.info();
/// # })
/// ```
// tags{session.info}
pub fn info(&self) -> SessionInfo {
SessionInfo {
session: SessionRef::Borrow(self),
Expand All @@ -535,6 +543,7 @@ impl Session {
/// }
/// # })
/// ```
// tags{session.declare_subscriber}
pub fn declare_subscriber<'a, 'b, TryIntoKeyExpr>(
&'a self,
key_expr: TryIntoKeyExpr,
Expand Down Expand Up @@ -575,6 +584,7 @@ impl Session {
/// }
/// # })
/// ```
// tags{session.declare_queryable}
pub fn declare_queryable<'a, 'b, TryIntoKeyExpr>(
&'a self,
key_expr: TryIntoKeyExpr,
Expand Down Expand Up @@ -611,6 +621,7 @@ impl Session {
/// publisher.put("value").res().await.unwrap();
/// # })
/// ```
// tags{session.declare_publisher}
pub fn declare_publisher<'a, 'b, TryIntoKeyExpr>(
&'a self,
key_expr: TryIntoKeyExpr,
Expand Down Expand Up @@ -642,6 +653,7 @@ impl Session {
/// let key_expr = session.declare_keyexpr("key/expression").res().await.unwrap();
/// # })
/// ```
// tags{session.declare_keyexpr}
pub fn declare_keyexpr<'a, 'b: 'a, TryIntoKeyExpr>(
&'a self,
key_expr: TryIntoKeyExpr,
Expand Down Expand Up @@ -708,6 +720,7 @@ impl Session {
/// .unwrap();
/// # })
/// ```
// tags{session.put}
#[inline]
pub fn put<'a, 'b: 'a, TryIntoKeyExpr, IntoValue>(
&'a self,
Expand Down Expand Up @@ -743,6 +756,7 @@ impl Session {
/// session.delete("key/expression").res().await.unwrap();
/// # })
/// ```
// tags{session.delete}
#[inline]
pub fn delete<'a, 'b: 'a, TryIntoKeyExpr>(
&'a self,
Expand Down Expand Up @@ -781,6 +795,7 @@ impl Session {
/// }
/// # })
/// ```
// tags{session.get}
pub fn get<'a, 'b: 'a, IntoSelector>(
&'a self,
selector: IntoSelector,
Expand Down Expand Up @@ -825,6 +840,7 @@ impl Session {
/// .unwrap();
/// # })
/// ```
// tags{liveliness}
#[zenoh_macros::unstable]
pub fn liveliness(&self) -> Liveliness {
Liveliness {
Expand Down
11 changes: 11 additions & 0 deletions zenoh/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ impl<'a, 'b, Mode> SubscriberBuilder<'a, 'b, Mode, DefaultHandler> {
/// .unwrap();
/// # })
/// ```
// tags{session.declare_subscriber.cb}
#[inline]
pub fn callback<Callback>(self, callback: Callback) -> SubscriberBuilder<'a, 'b, Mode, Callback>
where
Expand Down Expand Up @@ -392,6 +393,7 @@ impl<'a, 'b, Mode> SubscriberBuilder<'a, 'b, Mode, DefaultHandler> {
/// .unwrap();
/// # })
/// ```
// tags{session.declare_subscriber.cb.locked}
#[inline]
pub fn callback_mut<CallbackMut>(
self,
Expand Down Expand Up @@ -422,6 +424,7 @@ impl<'a, 'b, Mode> SubscriberBuilder<'a, 'b, Mode, DefaultHandler> {
/// }
/// # })
/// ```
// tags{session.declare_subscriber.cb.generic}
#[inline]
pub fn with<Handler>(self, handler: Handler) -> SubscriberBuilder<'a, 'b, Mode, Handler>
where
Expand All @@ -447,6 +450,7 @@ impl<'a, 'b, Mode> SubscriberBuilder<'a, 'b, Mode, DefaultHandler> {
}
impl<'a, 'b, Mode, Handler> SubscriberBuilder<'a, 'b, Mode, Handler> {
/// Change the subscription reliability.
// tags{session.declare_subscriber.options.reliability}
#[inline]
pub fn reliability(mut self, reliability: Reliability) -> Self {
self.reliability = reliability;
Expand All @@ -469,6 +473,7 @@ impl<'a, 'b, Mode, Handler> SubscriberBuilder<'a, 'b, Mode, Handler> {

/// Restrict the matching publications that will be receive by this [`Subscriber`]
/// to the ones that have the given [`Locality`](crate::prelude::Locality).
// tags{session.declare_subscriber.options.locality}
#[zenoh_macros::unstable]
#[inline]
pub fn allowed_origin(mut self, origin: Locality) -> Self {
Expand All @@ -477,6 +482,7 @@ impl<'a, 'b, Mode, Handler> SubscriberBuilder<'a, 'b, Mode, Handler> {
}

/// Change the subscription mode to Pull.
// tags{session.declare_subscriber.options.pull}
#[inline]
pub fn pull_mode(self) -> SubscriberBuilder<'a, 'b, PullMode, Handler> {
let SubscriberBuilder {
Expand All @@ -498,6 +504,7 @@ impl<'a, 'b, Mode, Handler> SubscriberBuilder<'a, 'b, Mode, Handler> {
}

/// Change the subscription mode to Push.
// tags{session.declare_subscriber.options.push}
#[inline]
pub fn push_mode(self) -> SubscriberBuilder<'a, 'b, PushMode, Handler> {
let SubscriberBuilder {
Expand Down Expand Up @@ -725,6 +732,7 @@ impl<'a, Receiver> PullSubscriber<'a, Receiver> {
/// subscriber.pull();
/// # })
/// ```
// tags{session.declare_subscriber.pull}
#[inline]
pub fn pull(&self) -> impl Resolve<ZResult<()>> + '_ {
self.subscriber.pull()
Expand All @@ -749,6 +757,7 @@ impl<'a, Receiver> PullSubscriber<'a, Receiver> {
/// subscriber.undeclare().res().await.unwrap();
/// # })
/// ```
// tags{session.declare_subscriber.pull.undeclare}
#[inline]
pub fn undeclare(self) -> impl Resolve<ZResult<()>> + 'a {
self.subscriber.undeclare()
Expand All @@ -757,6 +766,7 @@ impl<'a, Receiver> PullSubscriber<'a, Receiver> {

impl<'a, Receiver> Subscriber<'a, Receiver> {
/// Returns the [`KeyExpr`] this Subscriber subscribes to.
// tags{session.declare_subscriber.push.key_expr}
pub fn key_expr(&self) -> &KeyExpr<'static> {
&self.subscriber.state.key_expr
}
Expand All @@ -779,6 +789,7 @@ impl<'a, Receiver> Subscriber<'a, Receiver> {
/// subscriber.undeclare().res().await.unwrap();
/// # })
/// ```
// tags{session.declare_subscriber.push.undeclare}
#[inline]
pub fn undeclare(self) -> SubscriberUndeclaration<'a> {
self.subscriber.undeclare()
Expand Down

0 comments on commit b7dcc29

Please sign in to comment.