From 1b86de37c16d7d3cd472eba483cc256ee87b8b3d Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Fri, 26 Jan 2024 20:30:05 +0100 Subject: [PATCH 1/6] complete option added, SessionRef refactor --- zenoh-ext/examples/z_pub_cache.rs | 11 ++-- zenoh-ext/src/lib.rs | 2 +- zenoh-ext/src/publication_cache.rs | 51 ++++++++------- zenoh-ext/src/querying_subscriber.rs | 41 ++++-------- zenoh-ext/src/session_ext.rs | 68 +++---------------- zenoh/src/session.rs | 97 ++++++++++++++++++++-------- 6 files changed, 127 insertions(+), 143 deletions(-) diff --git a/zenoh-ext/examples/z_pub_cache.rs b/zenoh-ext/examples/z_pub_cache.rs index 74f2ada1f8..aee9170705 100644 --- a/zenoh-ext/examples/z_pub_cache.rs +++ b/zenoh-ext/examples/z_pub_cache.rs @@ -23,7 +23,7 @@ async fn main() { // Initiate logging env_logger::init(); - let (config, key_expr, value, history, prefix) = parse_args(); + let (config, key_expr, value, history, prefix, complete) = parse_args(); println!("Opening session..."); let session = zenoh::open(config).res().await.unwrap(); @@ -31,7 +31,8 @@ async fn main() { println!("Declaring PublicationCache on {}", &key_expr); let mut publication_cache_builder = session .declare_publication_cache(&key_expr) - .history(history); + .history(history) + .queryable_complete(complete); if let Some(prefix) = prefix { publication_cache_builder = publication_cache_builder.queryable_prefix(prefix); } @@ -45,7 +46,7 @@ async fn main() { } } -fn parse_args() -> (Config, String, String, usize, Option) { +fn parse_args() -> (Config, String, String, usize, Option, bool) { let args = Command::new("zenoh-ext pub cache example") .arg( arg!(-m --mode [MODE] "The zenoh session mode (peer by default)") @@ -64,6 +65,7 @@ fn parse_args() -> (Config, String, String, usize, Option) { ) .arg(arg!(-x --prefix [STRING] "An optional queryable prefix")) .arg(arg!(-c --config [FILE] "A configuration file.")) + // .arg(arg!(-o --complete "Set `complete` option to true. This means that this queryable is ulitmate data source, no need to scan other queryables.")) .arg(arg!(--"no-multicast-scouting" "Disable the multicast-based scouting mechanism.")) .get_matches(); @@ -101,6 +103,7 @@ fn parse_args() -> (Config, String, String, usize, Option) { let value = args.get_one::("value").unwrap().to_string(); let history: usize = args.get_one::("history").unwrap().parse().unwrap(); let prefix = args.get_one::("prefix").map(|s| (*s).to_owned()); + let complete = args.get_flag("complete"); - (config, key_expr, value, history, prefix) + (config, key_expr, value, history, prefix, complete) } diff --git a/zenoh-ext/src/lib.rs b/zenoh-ext/src/lib.rs index dca488ba80..7440d80a53 100644 --- a/zenoh-ext/src/lib.rs +++ b/zenoh-ext/src/lib.rs @@ -20,7 +20,7 @@ pub use publication_cache::{PublicationCache, PublicationCacheBuilder}; pub use querying_subscriber::{ FetchingSubscriber, FetchingSubscriberBuilder, QueryingSubscriberBuilder, }; -pub use session_ext::{ArcSessionExt, SessionExt}; +pub use session_ext::SessionExt; pub use subscriber_ext::SubscriberBuilderExt; pub use subscriber_ext::SubscriberForward; diff --git a/zenoh-ext/src/publication_cache.rs b/zenoh-ext/src/publication_cache.rs index 7ae440b02c..cd5ed964ad 100644 --- a/zenoh-ext/src/publication_cache.rs +++ b/zenoh-ext/src/publication_cache.rs @@ -32,7 +32,8 @@ pub struct PublicationCacheBuilder<'a, 'b, 'c> { session: SessionRef<'a>, pub_key_expr: ZResult>, queryable_prefix: Option>>, - queryable_origin: Locality, + queryable_origin: Option, + complete: Option, history: usize, resources_limit: Option, } @@ -46,7 +47,8 @@ impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> { session, pub_key_expr, queryable_prefix: None, - queryable_origin: Locality::default(), + queryable_origin: None, + complete: None, history: 1, resources_limit: None, } @@ -67,7 +69,13 @@ impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> { #[zenoh_macros::unstable] #[inline] pub fn queryable_allowed_origin(mut self, origin: Locality) -> Self { - self.queryable_origin = origin; + self.queryable_origin = Some(origin); + self + } + + /// Set completeness option for the queryable. + pub fn queryable_complete(mut self, complete: bool) -> Self { + self.complete = Some(complete); self } @@ -137,28 +145,21 @@ impl<'a> PublicationCache<'a> { } // declare the local subscriber that will store the local publications - let (local_sub, queryable) = match conf.session.clone() { - SessionRef::Borrow(session) => ( - session - .declare_subscriber(&key_expr) - .allowed_origin(Locality::SessionLocal) - .res_sync()?, - session - .declare_queryable(&queryable_key_expr) - .allowed_origin(conf.queryable_origin) - .res_sync()?, - ), - SessionRef::Shared(session) => ( - session - .declare_subscriber(&key_expr) - .allowed_origin(Locality::SessionLocal) - .res_sync()?, - session - .declare_queryable(&queryable_key_expr) - .allowed_origin(conf.queryable_origin) - .res_sync()?, - ), - }; + let local_sub = conf + .session + .declare_subscriber(&key_expr) + .allowed_origin(Locality::SessionLocal) + .res_sync()?; + + // declare the queryable which returns the cached publications + let mut queryable = conf.session.declare_queryable(&queryable_key_expr); + if let Some(origin) = conf.queryable_origin { + queryable = queryable.allowed_origin(origin); + } + if let Some(complete) = conf.complete { + queryable = queryable.complete(complete); + } + let queryable = queryable.res_sync()?; // take local ownership of stuff to be moved into task let sub_recv = local_sub.receiver.clone(); diff --git a/zenoh-ext/src/querying_subscriber.rs b/zenoh-ext/src/querying_subscriber.rs index 1083c111c4..4a7c4f2ded 100644 --- a/zenoh-ext/src/querying_subscriber.rs +++ b/zenoh-ext/src/querying_subscriber.rs @@ -680,33 +680,20 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> { // register fetch handler let handler = register_handler(state.clone(), callback.clone()); // declare subscriber - let subscriber = match conf.session.clone() { - SessionRef::Borrow(session) => match conf.key_space.into() { - crate::KeySpace::User => session - .declare_subscriber(&key_expr) - .callback(sub_callback) - .reliability(conf.reliability) - .allowed_origin(conf.origin) - .res_sync()?, - crate::KeySpace::Liveliness => session - .liveliness() - .declare_subscriber(&key_expr) - .callback(sub_callback) - .res_sync()?, - }, - SessionRef::Shared(session) => match conf.key_space.into() { - crate::KeySpace::User => session - .declare_subscriber(&key_expr) - .callback(sub_callback) - .reliability(conf.reliability) - .allowed_origin(conf.origin) - .res_sync()?, - crate::KeySpace::Liveliness => session - .liveliness() - .declare_subscriber(&key_expr) - .callback(sub_callback) - .res_sync()?, - }, + let subscriber = match conf.key_space.into() { + crate::KeySpace::User => conf + .session + .declare_subscriber(&key_expr) + .callback(sub_callback) + .reliability(conf.reliability) + .allowed_origin(conf.origin) + .res_sync()?, + crate::KeySpace::Liveliness => conf + .session + .liveliness() + .declare_subscriber(&key_expr) + .callback(sub_callback) + .res_sync()?, }; let fetch_subscriber = FetchingSubscriber { diff --git a/zenoh-ext/src/session_ext.rs b/zenoh-ext/src/session_ext.rs index 3f9a428293..4c2bff93db 100644 --- a/zenoh-ext/src/session_ext.rs +++ b/zenoh-ext/src/session_ext.rs @@ -13,27 +13,22 @@ // use super::PublicationCacheBuilder; use std::convert::TryInto; -use std::sync::Arc; use zenoh::prelude::KeyExpr; use zenoh::{Session, SessionRef}; /// Some extensions to the [`zenoh::Session`](zenoh::Session) -pub trait SessionExt { - type PublicationCacheBuilder<'a, 'b, 'c> - where - Self: 'a; - fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>( +pub trait SessionExt<'a> { + fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( &'a self, pub_key_expr: TryIntoKeyExpr, - ) -> Self::PublicationCacheBuilder<'a, 'b, 'c> + ) -> PublicationCacheBuilder<'a, 'b, 'c> where TryIntoKeyExpr: TryInto>, >>::Error: Into; } -impl SessionExt for Session { - type PublicationCacheBuilder<'a, 'b, 'c> = PublicationCacheBuilder<'a, 'b, 'c>; - fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>( +impl<'a> SessionExt<'a> for SessionRef<'a> { + fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( &'a self, pub_key_expr: TryIntoKeyExpr, ) -> PublicationCacheBuilder<'a, 'b, 'c> @@ -41,64 +36,21 @@ impl SessionExt for Session { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - PublicationCacheBuilder::new( - SessionRef::Borrow(self), - pub_key_expr.try_into().map_err(Into::into), - ) + PublicationCacheBuilder::new(self.clone(), pub_key_expr.try_into().map_err(Into::into)) } } -impl SessionExt for T { - type PublicationCacheBuilder<'a, 'b, 'c> = PublicationCacheBuilder<'static, 'b, 'c>; - fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>( - &'a self, - pub_key_expr: TryIntoKeyExpr, - ) -> Self::PublicationCacheBuilder<'a, 'b, 'c> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into, - { - ArcSessionExt::declare_publication_cache(self, pub_key_expr) - } -} - -pub trait ArcSessionExt { +impl<'a> SessionExt<'a> for Session { fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( - &self, - pub_key_expr: TryIntoKeyExpr, - ) -> PublicationCacheBuilder<'static, 'b, 'c> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; -} - -impl ArcSessionExt for Arc { - /// Examples: - /// ``` - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// use zenoh::config::ModeDependentValue::Unique; - /// use zenoh_ext::ArcSessionExt; - /// - /// let mut config = config::default(); - /// config.timestamping.set_enabled(Some(Unique(true))); - /// let session = zenoh::open(config).res().await.unwrap().into_arc(); - /// let publication_cache = session.declare_publication_cache("key/expression").res().await.unwrap(); - /// async_std::task::spawn(async move { - /// publication_cache.key_expr(); - /// }).await; - /// # }) - /// ``` - fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( - &self, + &'a self, pub_key_expr: TryIntoKeyExpr, - ) -> PublicationCacheBuilder<'static, 'b, 'c> + ) -> PublicationCacheBuilder<'a, 'b, 'c> where TryIntoKeyExpr: TryInto>, >>::Error: Into, { PublicationCacheBuilder::new( - SessionRef::Shared(self.clone()), + SessionRef::Borrow(self), pub_key_expr.try_into().map_err(Into::into), ) } diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 7897b293f9..763b5d0e19 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -284,6 +284,70 @@ pub enum SessionRef<'a> { Shared(Arc), } +impl<'a> SessionRef<'a> { + pub fn declare_subscriber<'b, TryIntoKeyExpr>( + &self, + key_expr: TryIntoKeyExpr, + ) -> SubscriberBuilder<'a, 'b, PushMode, DefaultHandler> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + { + SubscriberBuilder { + session: self.clone(), + key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), + reliability: Reliability::default(), + mode: PushMode, + origin: Locality::default(), + handler: DefaultHandler, + } + } + + pub fn declare_queryable<'b, TryIntoKeyExpr>( + &self, + key_expr: TryIntoKeyExpr, + ) -> QueryableBuilder<'a, 'b, DefaultHandler> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + { + QueryableBuilder { + session: self.clone(), + key_expr: key_expr.try_into().map_err(Into::into), + complete: false, + origin: Locality::default(), + handler: DefaultHandler, + } + } + pub fn declare_publisher<'b, TryIntoKeyExpr>( + &self, + key_expr: TryIntoKeyExpr, + ) -> PublisherBuilder<'a, 'b> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + { + PublisherBuilder { + session: self.clone(), + key_expr: key_expr.try_into().map_err(Into::into), + congestion_control: CongestionControl::default(), + priority: Priority::default(), + destination: Locality::default(), + } + } + #[zenoh_macros::unstable] + pub fn liveliness(&self) -> Liveliness<'a> { + Liveliness { + session: self.clone(), + } + } + pub fn info(&self) -> SessionInfo<'a> { + SessionInfo { + session: self.clone(), + } + } +} + impl Deref for SessionRef<'_> { type Target = Session; @@ -512,9 +576,7 @@ impl Session { /// # }) /// ``` pub fn info(&self) -> SessionInfo { - SessionInfo { - session: SessionRef::Borrow(self), - } + SessionRef::Borrow(self).info() } /// Create a [`Subscriber`](Subscriber) for the given key expression. @@ -543,14 +605,7 @@ impl Session { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - SubscriberBuilder { - session: SessionRef::Borrow(self), - key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), - reliability: Reliability::default(), - mode: PushMode, - origin: Locality::default(), - handler: DefaultHandler, - } + SessionRef::Borrow(self).declare_subscriber(key_expr) } /// Create a [`Queryable`](Queryable) for the given key expression. @@ -583,13 +638,7 @@ impl Session { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - QueryableBuilder { - session: SessionRef::Borrow(self), - key_expr: key_expr.try_into().map_err(Into::into), - complete: false, - origin: Locality::default(), - handler: DefaultHandler, - } + SessionRef::Borrow(self).declare_queryable(key_expr) } /// Create a [`Publisher`](crate::publication::Publisher) for the given key expression. @@ -619,13 +668,7 @@ impl Session { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - PublisherBuilder { - session: SessionRef::Borrow(self), - key_expr: key_expr.try_into().map_err(Into::into), - congestion_control: CongestionControl::default(), - priority: Priority::default(), - destination: Locality::default(), - } + SessionRef::Borrow(self).declare_publisher(key_expr) } /// Informs Zenoh that you intend to use `key_expr` multiple times and that it should optimize its transmission. @@ -827,9 +870,7 @@ impl Session { /// ``` #[zenoh_macros::unstable] pub fn liveliness(&self) -> Liveliness { - Liveliness { - session: SessionRef::Borrow(self), - } + SessionRef::Borrow(self).liveliness() } } From 1d743e87238d39d94b4631a44c5b80edac0c1ecc Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Fri, 26 Jan 2024 23:02:03 +0100 Subject: [PATCH 2/6] command lipe parse crash fix --- zenoh-ext/examples/z_pub_cache.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zenoh-ext/examples/z_pub_cache.rs b/zenoh-ext/examples/z_pub_cache.rs index aee9170705..882764b8f9 100644 --- a/zenoh-ext/examples/z_pub_cache.rs +++ b/zenoh-ext/examples/z_pub_cache.rs @@ -60,12 +60,12 @@ fn parse_args() -> (Config, String, String, usize, Option, bool) { ) .arg(arg!(-v --value [VALUE] "The value to publish.").default_value("Pub from Rust!")) .arg( - arg!(-h --history [SIZE] "The number of publications to keep in cache") + arg!(-i --history [SIZE] "The number of publications to keep in cache") .default_value("1"), ) .arg(arg!(-x --prefix [STRING] "An optional queryable prefix")) .arg(arg!(-c --config [FILE] "A configuration file.")) - // .arg(arg!(-o --complete "Set `complete` option to true. This means that this queryable is ulitmate data source, no need to scan other queryables.")) + .arg(arg!(-o --complete "Set `complete` option to true. This means that this queryable is ulitmate data source, no need to scan other queryables.")) .arg(arg!(--"no-multicast-scouting" "Disable the multicast-based scouting mechanism.")) .get_matches(); From ec1c22a5eb224883484dcaffe52dc2e41ebfd00a Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Wed, 31 Jan 2024 00:49:09 +0100 Subject: [PATCH 3/6] parametrized SessionDeclarations --- zenoh-ext/src/session_ext.rs | 33 ++++ zenoh/src/session.rs | 302 ++++++++++++++++++----------------- 2 files changed, 188 insertions(+), 147 deletions(-) diff --git a/zenoh-ext/src/session_ext.rs b/zenoh-ext/src/session_ext.rs index 4c2bff93db..417023a6f6 100644 --- a/zenoh-ext/src/session_ext.rs +++ b/zenoh-ext/src/session_ext.rs @@ -13,6 +13,7 @@ // use super::PublicationCacheBuilder; use std::convert::TryInto; +use std::sync::Arc; use zenoh::prelude::KeyExpr; use zenoh::{Session, SessionRef}; @@ -55,3 +56,35 @@ impl<'a> SessionExt<'a> for Session { ) } } + +impl SessionExt<'static> for Arc { + /// Examples: + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// use zenoh::config::ModeDependentValue::Unique; + /// use zenoh_ext::SessionExt; + /// + /// let mut config = config::default(); + /// config.timestamping.set_enabled(Some(Unique(true))); + /// let session = zenoh::open(config).res().await.unwrap().into_arc(); + /// let publication_cache = session.declare_publication_cache("key/expression").res().await.unwrap(); + /// async_std::task::spawn(async move { + /// publication_cache.key_expr(); + /// }).await; + /// # }) + /// ``` + fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( + &self, + pub_key_expr: TryIntoKeyExpr, + ) -> PublicationCacheBuilder<'static, 'b, 'c> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into, + { + PublicationCacheBuilder::new( + SessionRef::Shared(self.clone()), + pub_key_expr.try_into().map_err(Into::into), + ) + } +} diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 763b5d0e19..0e41d95d20 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -278,14 +278,156 @@ impl Resource { } } +pub trait SessionDeclarations<'s> { + /// Create a [`Subscriber`](crate::subscriber::Subscriber) for the given key expression. + /// + /// # Arguments + /// + /// * `key_expr` - The resourkey expression to subscribe to + /// + /// # Examples + /// ```no_run + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); + /// let subscriber = session.declare_subscriber("key/expression") + /// .res() + /// .await + /// .unwrap(); + /// async_std::task::spawn(async move { + /// while let Ok(sample) = subscriber.recv_async().await { + /// println!("Received: {:?}", sample); + /// } + /// }).await; + /// # }) + /// ``` + fn declare_subscriber<'a, TryIntoKeyExpr>( + &self, + key_expr: TryIntoKeyExpr, + ) -> SubscriberBuilder<'s, 'a, PushMode, DefaultHandler> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into; + + /// Create a [`Queryable`](crate::queryable::Queryable) for the given key expression. + /// + /// # Arguments + /// + /// * `key_expr` - The key expression matching the queries the + /// [`Queryable`](crate::queryable::Queryable) will reply to + /// + /// # Examples + /// ```no_run + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); + /// let queryable = session.declare_queryable("key/expression") + /// .res() + /// .await + /// .unwrap(); + /// async_std::task::spawn(async move { + /// while let Ok(query) = queryable.recv_async().await { + /// query.reply(Ok(Sample::try_from( + /// "key/expression", + /// "value", + /// ).unwrap())).res().await.unwrap(); + /// } + /// }).await; + /// # }) + /// ``` + fn declare_queryable<'a, TryIntoKeyExpr>( + &self, + key_expr: TryIntoKeyExpr, + ) -> QueryableBuilder<'s, 'a, DefaultHandler> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into; + + /// Create a [`Publisher`](crate::publication::Publisher) for the given key expression. + /// + /// # Arguments + /// + /// * `key_expr` - The key expression matching resources to write + /// + /// # Examples + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); + /// let publisher = session.declare_publisher("key/expression") + /// .res() + /// .await + /// .unwrap(); + /// publisher.put("value").res().await.unwrap(); + /// # }) + /// ``` + fn declare_publisher<'a, TryIntoKeyExpr>( + &self, + key_expr: TryIntoKeyExpr, + ) -> PublisherBuilder<'s, 'a> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into; + + /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. + /// + /// # Examples + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); + /// let liveliness = session + /// .liveliness() + /// .declare_token("key/expression") + /// .res() + /// .await + /// .unwrap(); + /// # }) + /// ``` + #[zenoh_macros::unstable] + fn liveliness(&self) -> Liveliness<'s>; + + fn info(&self) -> SessionInfo<'s>; +} + #[derive(Clone)] pub enum SessionRef<'a> { Borrow(&'a Session), Shared(Arc), } -impl<'a> SessionRef<'a> { - pub fn declare_subscriber<'b, TryIntoKeyExpr>( +/// Functions to create zenoh entities with `'static` lifetime. +/// +/// This trait contains functions to create zenoh entities like +/// [`Subscriber`](crate::subscriber::Subscriber), and +/// [`Queryable`](crate::queryable::Queryable) with a `'static` lifetime. +/// This is useful to move zenoh entities to several threads and tasks. +/// +/// This trait is implemented for `Arc`. +/// +/// # Examples +/// ```no_run +/// # async_std::task::block_on(async { +/// use zenoh::prelude::r#async::*; +/// +/// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); +/// let subscriber = session.declare_subscriber("key/expression") +/// .res() +/// .await +/// .unwrap(); +/// async_std::task::spawn(async move { +/// while let Ok(sample) = subscriber.recv_async().await { +/// println!("Received: {:?}", sample); +/// } +/// }).await; +/// # }) +/// ``` +impl<'a> SessionDeclarations<'a> for SessionRef<'a> { + fn declare_subscriber<'b, TryIntoKeyExpr>( &self, key_expr: TryIntoKeyExpr, ) -> SubscriberBuilder<'a, 'b, PushMode, DefaultHandler> @@ -303,7 +445,7 @@ impl<'a> SessionRef<'a> { } } - pub fn declare_queryable<'b, TryIntoKeyExpr>( + fn declare_queryable<'b, TryIntoKeyExpr>( &self, key_expr: TryIntoKeyExpr, ) -> QueryableBuilder<'a, 'b, DefaultHandler> @@ -319,7 +461,7 @@ impl<'a> SessionRef<'a> { handler: DefaultHandler, } } - pub fn declare_publisher<'b, TryIntoKeyExpr>( + fn declare_publisher<'b, TryIntoKeyExpr>( &self, key_expr: TryIntoKeyExpr, ) -> PublisherBuilder<'a, 'b> @@ -336,12 +478,12 @@ impl<'a> SessionRef<'a> { } } #[zenoh_macros::unstable] - pub fn liveliness(&self) -> Liveliness<'a> { + fn liveliness(&self) -> Liveliness<'a> { Liveliness { session: self.clone(), } } - pub fn info(&self) -> SessionInfo<'a> { + fn info(&self) -> SessionInfo<'a> { SessionInfo { session: self.clone(), } @@ -2014,7 +2156,7 @@ impl Session { } } -impl SessionDeclarations for Arc { +impl SessionDeclarations<'static> for Arc { /// Create a [`Subscriber`](Subscriber) for the given key expression. /// /// # Arguments @@ -2158,6 +2300,12 @@ impl SessionDeclarations for Arc { session: SessionRef::Shared(self.clone()), } } + + fn info(&self) -> SessionInfo<'static> { + SessionInfo { + session: SessionRef::Shared(self.clone()), + } + } } impl Primitives for Session { @@ -2576,143 +2724,3 @@ impl fmt::Debug for Session { f.debug_struct("Session").field("id", &self.zid()).finish() } } - -/// Functions to create zenoh entities with `'static` lifetime. -/// -/// This trait contains functions to create zenoh entities like -/// [`Subscriber`](crate::subscriber::Subscriber), and -/// [`Queryable`](crate::queryable::Queryable) with a `'static` lifetime. -/// This is useful to move zenoh entities to several threads and tasks. -/// -/// This trait is implemented for `Arc`. -/// -/// # Examples -/// ```no_run -/// # async_std::task::block_on(async { -/// use zenoh::prelude::r#async::*; -/// -/// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); -/// let subscriber = session.declare_subscriber("key/expression") -/// .res() -/// .await -/// .unwrap(); -/// async_std::task::spawn(async move { -/// while let Ok(sample) = subscriber.recv_async().await { -/// println!("Received: {:?}", sample); -/// } -/// }).await; -/// # }) -/// ``` -pub trait SessionDeclarations { - /// Create a [`Subscriber`](crate::subscriber::Subscriber) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The resourkey expression to subscribe to - /// - /// # Examples - /// ```no_run - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); - /// let subscriber = session.declare_subscriber("key/expression") - /// .res() - /// .await - /// .unwrap(); - /// async_std::task::spawn(async move { - /// while let Ok(sample) = subscriber.recv_async().await { - /// println!("Received: {:?}", sample); - /// } - /// }).await; - /// # }) - /// ``` - fn declare_subscriber<'a, TryIntoKeyExpr>( - &self, - key_expr: TryIntoKeyExpr, - ) -> SubscriberBuilder<'static, 'a, PushMode, DefaultHandler> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; - - /// Create a [`Queryable`](crate::queryable::Queryable) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression matching the queries the - /// [`Queryable`](crate::queryable::Queryable) will reply to - /// - /// # Examples - /// ```no_run - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); - /// let queryable = session.declare_queryable("key/expression") - /// .res() - /// .await - /// .unwrap(); - /// async_std::task::spawn(async move { - /// while let Ok(query) = queryable.recv_async().await { - /// query.reply(Ok(Sample::try_from( - /// "key/expression", - /// "value", - /// ).unwrap())).res().await.unwrap(); - /// } - /// }).await; - /// # }) - /// ``` - fn declare_queryable<'a, TryIntoKeyExpr>( - &self, - key_expr: TryIntoKeyExpr, - ) -> QueryableBuilder<'static, 'a, DefaultHandler> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; - - /// Create a [`Publisher`](crate::publication::Publisher) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression matching resources to write - /// - /// # Examples - /// ``` - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); - /// let publisher = session.declare_publisher("key/expression") - /// .res() - /// .await - /// .unwrap(); - /// publisher.put("value").res().await.unwrap(); - /// # }) - /// ``` - fn declare_publisher<'a, TryIntoKeyExpr>( - &self, - key_expr: TryIntoKeyExpr, - ) -> PublisherBuilder<'static, 'a> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; - - /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. - /// - /// # Examples - /// ``` - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); - /// let liveliness = session - /// .liveliness() - /// .declare_token("key/expression") - /// .res() - /// .await - /// .unwrap(); - /// # }) - /// ``` - #[zenoh_macros::unstable] - fn liveliness(&self) -> Liveliness<'static>; -} From 0c202a665bf5c1ccb0a8cfc4e2ca1459e29beb47 Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Wed, 31 Jan 2024 01:15:44 +0100 Subject: [PATCH 4/6] separated lifetimes for source and result --- zenoh-ext/src/session_ext.rs | 24 +++----- zenoh/src/session.rs | 113 ++++++++++++++++++----------------- 2 files changed, 67 insertions(+), 70 deletions(-) diff --git a/zenoh-ext/src/session_ext.rs b/zenoh-ext/src/session_ext.rs index 417023a6f6..2a2c1df97b 100644 --- a/zenoh-ext/src/session_ext.rs +++ b/zenoh-ext/src/session_ext.rs @@ -18,9 +18,9 @@ use zenoh::prelude::KeyExpr; use zenoh::{Session, SessionRef}; /// Some extensions to the [`zenoh::Session`](zenoh::Session) -pub trait SessionExt<'a> { +pub trait SessionExt<'s, 'a> { fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( - &'a self, + &'s self, pub_key_expr: TryIntoKeyExpr, ) -> PublicationCacheBuilder<'a, 'b, 'c> where @@ -28,9 +28,9 @@ pub trait SessionExt<'a> { >>::Error: Into; } -impl<'a> SessionExt<'a> for SessionRef<'a> { +impl<'s, 'a> SessionExt<'s, 'a> for SessionRef<'a> { fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( - &'a self, + &'s self, pub_key_expr: TryIntoKeyExpr, ) -> PublicationCacheBuilder<'a, 'b, 'c> where @@ -41,7 +41,7 @@ impl<'a> SessionExt<'a> for SessionRef<'a> { } } -impl<'a> SessionExt<'a> for Session { +impl<'a> SessionExt<'a, 'a> for Session { fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( &'a self, pub_key_expr: TryIntoKeyExpr, @@ -50,14 +50,11 @@ impl<'a> SessionExt<'a> for Session { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - PublicationCacheBuilder::new( - SessionRef::Borrow(self), - pub_key_expr.try_into().map_err(Into::into), - ) + SessionRef::Borrow(self).declare_publication_cache(pub_key_expr) } } -impl SessionExt<'static> for Arc { +impl<'s> SessionExt<'s, 'static> for Arc { /// Examples: /// ``` /// # async_std::task::block_on(async { @@ -75,16 +72,13 @@ impl SessionExt<'static> for Arc { /// # }) /// ``` fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( - &self, + &'s self, pub_key_expr: TryIntoKeyExpr, ) -> PublicationCacheBuilder<'static, 'b, 'c> where TryIntoKeyExpr: TryInto>, >>::Error: Into, { - PublicationCacheBuilder::new( - SessionRef::Shared(self.clone()), - pub_key_expr.try_into().map_err(Into::into), - ) + SessionRef::Shared(self.clone()).declare_publication_cache(pub_key_expr) } } diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 0e41d95d20..ab81f58169 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -278,7 +278,7 @@ impl Resource { } } -pub trait SessionDeclarations<'s> { +pub trait SessionDeclarations<'s, 'a> { /// Create a [`Subscriber`](crate::subscriber::Subscriber) for the given key expression. /// /// # Arguments @@ -302,13 +302,13 @@ pub trait SessionDeclarations<'s> { /// }).await; /// # }) /// ``` - fn declare_subscriber<'a, TryIntoKeyExpr>( - &self, + fn declare_subscriber<'b, TryIntoKeyExpr>( + &'s self, key_expr: TryIntoKeyExpr, - ) -> SubscriberBuilder<'s, 'a, PushMode, DefaultHandler> + ) -> SubscriberBuilder<'a, 'b, PushMode, DefaultHandler> where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; + TryIntoKeyExpr: TryInto>, + >>::Error: Into; /// Create a [`Queryable`](crate::queryable::Queryable) for the given key expression. /// @@ -337,13 +337,13 @@ pub trait SessionDeclarations<'s> { /// }).await; /// # }) /// ``` - fn declare_queryable<'a, TryIntoKeyExpr>( - &self, + fn declare_queryable<'b, TryIntoKeyExpr>( + &'s self, key_expr: TryIntoKeyExpr, - ) -> QueryableBuilder<'s, 'a, DefaultHandler> + ) -> QueryableBuilder<'a, 'b, DefaultHandler> where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; + TryIntoKeyExpr: TryInto>, + >>::Error: Into; /// Create a [`Publisher`](crate::publication::Publisher) for the given key expression. /// @@ -364,13 +364,13 @@ pub trait SessionDeclarations<'s> { /// publisher.put("value").res().await.unwrap(); /// # }) /// ``` - fn declare_publisher<'a, TryIntoKeyExpr>( - &self, + fn declare_publisher<'b, TryIntoKeyExpr>( + &'s self, key_expr: TryIntoKeyExpr, - ) -> PublisherBuilder<'s, 'a> + ) -> PublisherBuilder<'a, 'b> where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; + TryIntoKeyExpr: TryInto>, + >>::Error: Into; /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. /// @@ -389,9 +389,9 @@ pub trait SessionDeclarations<'s> { /// # }) /// ``` #[zenoh_macros::unstable] - fn liveliness(&self) -> Liveliness<'s>; + fn liveliness(&'s self) -> Liveliness<'a>; - fn info(&self) -> SessionInfo<'s>; + fn info(&'s self) -> SessionInfo<'a>; } #[derive(Clone)] @@ -426,9 +426,9 @@ pub enum SessionRef<'a> { /// }).await; /// # }) /// ``` -impl<'a> SessionDeclarations<'a> for SessionRef<'a> { +impl<'s, 'a> SessionDeclarations<'s, 'a> for SessionRef<'a> { fn declare_subscriber<'b, TryIntoKeyExpr>( - &self, + &'s self, key_expr: TryIntoKeyExpr, ) -> SubscriberBuilder<'a, 'b, PushMode, DefaultHandler> where @@ -446,7 +446,7 @@ impl<'a> SessionDeclarations<'a> for SessionRef<'a> { } fn declare_queryable<'b, TryIntoKeyExpr>( - &self, + &'s self, key_expr: TryIntoKeyExpr, ) -> QueryableBuilder<'a, 'b, DefaultHandler> where @@ -462,7 +462,7 @@ impl<'a> SessionDeclarations<'a> for SessionRef<'a> { } } fn declare_publisher<'b, TryIntoKeyExpr>( - &self, + &'s self, key_expr: TryIntoKeyExpr, ) -> PublisherBuilder<'a, 'b> where @@ -478,12 +478,12 @@ impl<'a> SessionDeclarations<'a> for SessionRef<'a> { } } #[zenoh_macros::unstable] - fn liveliness(&self) -> Liveliness<'a> { + fn liveliness(&'s self) -> Liveliness<'a> { Liveliness { session: self.clone(), } } - fn info(&self) -> SessionInfo<'a> { + fn info(&'s self) -> SessionInfo<'a> { SessionInfo { session: self.clone(), } @@ -705,7 +705,9 @@ impl Session { pub fn config(&self) -> &Notifier { self.runtime.config() } +} +impl<'a> SessionDeclarations<'a, 'a> for Session { /// Get informations about the zenoh [`Session`](Session). /// /// # Examples @@ -717,7 +719,7 @@ impl Session { /// let info = session.info(); /// # }) /// ``` - pub fn info(&self) -> SessionInfo { + fn info(&self) -> SessionInfo { SessionRef::Borrow(self).info() } @@ -739,7 +741,7 @@ impl Session { /// } /// # }) /// ``` - pub fn declare_subscriber<'a, 'b, TryIntoKeyExpr>( + fn declare_subscriber<'b, TryIntoKeyExpr>( &'a self, key_expr: TryIntoKeyExpr, ) -> SubscriberBuilder<'a, 'b, PushMode, DefaultHandler> @@ -772,7 +774,7 @@ impl Session { /// } /// # }) /// ``` - pub fn declare_queryable<'a, 'b, TryIntoKeyExpr>( + fn declare_queryable<'b, TryIntoKeyExpr>( &'a self, key_expr: TryIntoKeyExpr, ) -> QueryableBuilder<'a, 'b, DefaultHandler> @@ -802,7 +804,7 @@ impl Session { /// publisher.put("value").res().await.unwrap(); /// # }) /// ``` - pub fn declare_publisher<'a, 'b, TryIntoKeyExpr>( + fn declare_publisher<'b, TryIntoKeyExpr>( &'a self, key_expr: TryIntoKeyExpr, ) -> PublisherBuilder<'a, 'b> @@ -812,7 +814,29 @@ impl Session { { SessionRef::Borrow(self).declare_publisher(key_expr) } + /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. + /// + /// # Examples + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap(); + /// let liveliness = session + /// .liveliness() + /// .declare_token("key/expression") + /// .res() + /// .await + /// .unwrap(); + /// # }) + /// ``` + #[zenoh_macros::unstable] + fn liveliness(&'a self) -> Liveliness { + SessionRef::Borrow(self).liveliness() + } +} +impl Session { /// Informs Zenoh that you intend to use `key_expr` multiple times and that it should optimize its transmission. /// /// The returned `KeyExpr`'s internal structure may differ from what you would have obtained through a simple @@ -993,27 +1017,6 @@ impl Session { handler: DefaultHandler, } } - - /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. - /// - /// # Examples - /// ``` - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let liveliness = session - /// .liveliness() - /// .declare_token("key/expression") - /// .res() - /// .await - /// .unwrap(); - /// # }) - /// ``` - #[zenoh_macros::unstable] - pub fn liveliness(&self) -> Liveliness { - SessionRef::Borrow(self).liveliness() - } } impl Session { @@ -2156,7 +2159,7 @@ impl Session { } } -impl SessionDeclarations<'static> for Arc { +impl<'s> SessionDeclarations<'s, 'static> for Arc { /// Create a [`Subscriber`](Subscriber) for the given key expression. /// /// # Arguments @@ -2181,7 +2184,7 @@ impl SessionDeclarations<'static> for Arc { /// # }) /// ``` fn declare_subscriber<'b, TryIntoKeyExpr>( - &self, + &'s self, key_expr: TryIntoKeyExpr, ) -> SubscriberBuilder<'static, 'b, PushMode, DefaultHandler> where @@ -2226,7 +2229,7 @@ impl SessionDeclarations<'static> for Arc { /// # }) /// ``` fn declare_queryable<'b, TryIntoKeyExpr>( - &self, + &'s self, key_expr: TryIntoKeyExpr, ) -> QueryableBuilder<'static, 'b, DefaultHandler> where @@ -2262,7 +2265,7 @@ impl SessionDeclarations<'static> for Arc { /// # }) /// ``` fn declare_publisher<'b, TryIntoKeyExpr>( - &self, + &'s self, key_expr: TryIntoKeyExpr, ) -> PublisherBuilder<'static, 'b> where @@ -2295,13 +2298,13 @@ impl SessionDeclarations<'static> for Arc { /// # }) /// ``` #[zenoh_macros::unstable] - fn liveliness(&self) -> Liveliness<'static> { + fn liveliness(&'s self) -> Liveliness<'static> { Liveliness { session: SessionRef::Shared(self.clone()), } } - fn info(&self) -> SessionInfo<'static> { + fn info(&'s self) -> SessionInfo<'static> { SessionInfo { session: SessionRef::Shared(self.clone()), } From 0111ed7cd9f00d2240c9599a15dff72e0dd82a51 Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Wed, 31 Jan 2024 01:29:53 +0100 Subject: [PATCH 5/6] comments updated --- zenoh/src/session.rs | 154 +++++++++++-------------------------------- 1 file changed, 37 insertions(+), 117 deletions(-) diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index ab81f58169..39d67d0571 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -278,6 +278,32 @@ impl Resource { } } +/// Functions to create zenoh entities +/// +/// This trait contains functions to create zenoh entities like +/// [`Subscriber`](crate::subscriber::Subscriber), and +/// [`Queryable`](crate::queryable::Queryable) +/// +/// This trait is implemented by [`Session`](crate::session::Session) itself and +/// by wrappers [`SessionRef`](crate::session::SessionRef) and [`Arc`](crate::session::Arc) +/// +/// # Examples +/// ```no_run +/// # async_std::task::block_on(async { +/// use zenoh::prelude::r#async::*; +/// +/// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); +/// let subscriber = session.declare_subscriber("key/expression") +/// .res() +/// .await +/// .unwrap(); +/// async_std::task::spawn(async move { +/// while let Ok(sample) = subscriber.recv_async().await { +/// println!("Received: {:?}", sample); +/// } +/// }).await; +/// # }) +/// ``` pub trait SessionDeclarations<'s, 'a> { /// Create a [`Subscriber`](crate::subscriber::Subscriber) for the given key expression. /// @@ -390,7 +416,17 @@ pub trait SessionDeclarations<'s, 'a> { /// ``` #[zenoh_macros::unstable] fn liveliness(&'s self) -> Liveliness<'a>; - + /// Get informations about the zenoh [`Session`](Session). + /// + /// # Examples + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap(); + /// let info = session.info(); + /// # }) + /// ``` fn info(&'s self) -> SessionInfo<'a>; } @@ -400,32 +436,6 @@ pub enum SessionRef<'a> { Shared(Arc), } -/// Functions to create zenoh entities with `'static` lifetime. -/// -/// This trait contains functions to create zenoh entities like -/// [`Subscriber`](crate::subscriber::Subscriber), and -/// [`Queryable`](crate::queryable::Queryable) with a `'static` lifetime. -/// This is useful to move zenoh entities to several threads and tasks. -/// -/// This trait is implemented for `Arc`. -/// -/// # Examples -/// ```no_run -/// # async_std::task::block_on(async { -/// use zenoh::prelude::r#async::*; -/// -/// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); -/// let subscriber = session.declare_subscriber("key/expression") -/// .res() -/// .await -/// .unwrap(); -/// async_std::task::spawn(async move { -/// while let Ok(sample) = subscriber.recv_async().await { -/// println!("Received: {:?}", sample); -/// } -/// }).await; -/// # }) -/// ``` impl<'s, 'a> SessionDeclarations<'s, 'a> for SessionRef<'a> { fn declare_subscriber<'b, TryIntoKeyExpr>( &'s self, @@ -444,7 +454,6 @@ impl<'s, 'a> SessionDeclarations<'s, 'a> for SessionRef<'a> { handler: DefaultHandler, } } - fn declare_queryable<'b, TryIntoKeyExpr>( &'s self, key_expr: TryIntoKeyExpr, @@ -708,39 +717,9 @@ impl Session { } impl<'a> SessionDeclarations<'a, 'a> for Session { - /// Get informations about the zenoh [`Session`](Session). - /// - /// # Examples - /// ``` - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let info = session.info(); - /// # }) - /// ``` fn info(&self) -> SessionInfo { SessionRef::Borrow(self).info() } - - /// Create a [`Subscriber`](Subscriber) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression to subscribe to - /// - /// # Examples - /// ```no_run - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let subscriber = session.declare_subscriber("key/expression").res().await.unwrap(); - /// while let Ok(sample) = subscriber.recv_async().await { - /// println!("Received: {:?}", sample); - /// } - /// # }) - /// ``` fn declare_subscriber<'b, TryIntoKeyExpr>( &'a self, key_expr: TryIntoKeyExpr, @@ -751,29 +730,6 @@ impl<'a> SessionDeclarations<'a, 'a> for Session { { SessionRef::Borrow(self).declare_subscriber(key_expr) } - - /// Create a [`Queryable`](Queryable) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression matching the queries the - /// [`Queryable`](Queryable) will reply to - /// - /// # Examples - /// ```no_run - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let queryable = session.declare_queryable("key/expression").res().await.unwrap(); - /// while let Ok(query) = queryable.recv_async().await { - /// query.reply(Ok(Sample::try_from( - /// "key/expression", - /// "value", - /// ).unwrap())).res().await.unwrap(); - /// } - /// # }) - /// ``` fn declare_queryable<'b, TryIntoKeyExpr>( &'a self, key_expr: TryIntoKeyExpr, @@ -784,26 +740,6 @@ impl<'a> SessionDeclarations<'a, 'a> for Session { { SessionRef::Borrow(self).declare_queryable(key_expr) } - - /// Create a [`Publisher`](crate::publication::Publisher) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression matching resources to write - /// - /// # Examples - /// ``` - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let publisher = session.declare_publisher("key/expression") - /// .res() - /// .await - /// .unwrap(); - /// publisher.put("value").res().await.unwrap(); - /// # }) - /// ``` fn declare_publisher<'b, TryIntoKeyExpr>( &'a self, key_expr: TryIntoKeyExpr, @@ -814,22 +750,6 @@ impl<'a> SessionDeclarations<'a, 'a> for Session { { SessionRef::Borrow(self).declare_publisher(key_expr) } - /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. - /// - /// # Examples - /// ``` - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let liveliness = session - /// .liveliness() - /// .declare_token("key/expression") - /// .res() - /// .await - /// .unwrap(); - /// # }) - /// ``` #[zenoh_macros::unstable] fn liveliness(&'a self) -> Liveliness { SessionRef::Borrow(self).liveliness() From 09a72ff57f0aaaf5f2b247386e0bdb2c4bcd124b Mon Sep 17 00:00:00 2001 From: Michael Ilyin Date: Wed, 31 Jan 2024 01:37:24 +0100 Subject: [PATCH 6/6] moved SessionDeclaration to end to simplify review --- zenoh/src/session.rs | 304 +++++++++++++++++++++---------------------- 1 file changed, 152 insertions(+), 152 deletions(-) diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 39d67d0571..c22e15988b 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -278,158 +278,6 @@ impl Resource { } } -/// Functions to create zenoh entities -/// -/// This trait contains functions to create zenoh entities like -/// [`Subscriber`](crate::subscriber::Subscriber), and -/// [`Queryable`](crate::queryable::Queryable) -/// -/// This trait is implemented by [`Session`](crate::session::Session) itself and -/// by wrappers [`SessionRef`](crate::session::SessionRef) and [`Arc`](crate::session::Arc) -/// -/// # Examples -/// ```no_run -/// # async_std::task::block_on(async { -/// use zenoh::prelude::r#async::*; -/// -/// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); -/// let subscriber = session.declare_subscriber("key/expression") -/// .res() -/// .await -/// .unwrap(); -/// async_std::task::spawn(async move { -/// while let Ok(sample) = subscriber.recv_async().await { -/// println!("Received: {:?}", sample); -/// } -/// }).await; -/// # }) -/// ``` -pub trait SessionDeclarations<'s, 'a> { - /// Create a [`Subscriber`](crate::subscriber::Subscriber) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The resourkey expression to subscribe to - /// - /// # Examples - /// ```no_run - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); - /// let subscriber = session.declare_subscriber("key/expression") - /// .res() - /// .await - /// .unwrap(); - /// async_std::task::spawn(async move { - /// while let Ok(sample) = subscriber.recv_async().await { - /// println!("Received: {:?}", sample); - /// } - /// }).await; - /// # }) - /// ``` - fn declare_subscriber<'b, TryIntoKeyExpr>( - &'s self, - key_expr: TryIntoKeyExpr, - ) -> SubscriberBuilder<'a, 'b, PushMode, DefaultHandler> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; - - /// Create a [`Queryable`](crate::queryable::Queryable) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression matching the queries the - /// [`Queryable`](crate::queryable::Queryable) will reply to - /// - /// # Examples - /// ```no_run - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); - /// let queryable = session.declare_queryable("key/expression") - /// .res() - /// .await - /// .unwrap(); - /// async_std::task::spawn(async move { - /// while let Ok(query) = queryable.recv_async().await { - /// query.reply(Ok(Sample::try_from( - /// "key/expression", - /// "value", - /// ).unwrap())).res().await.unwrap(); - /// } - /// }).await; - /// # }) - /// ``` - fn declare_queryable<'b, TryIntoKeyExpr>( - &'s self, - key_expr: TryIntoKeyExpr, - ) -> QueryableBuilder<'a, 'b, DefaultHandler> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; - - /// Create a [`Publisher`](crate::publication::Publisher) for the given key expression. - /// - /// # Arguments - /// - /// * `key_expr` - The key expression matching resources to write - /// - /// # Examples - /// ``` - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); - /// let publisher = session.declare_publisher("key/expression") - /// .res() - /// .await - /// .unwrap(); - /// publisher.put("value").res().await.unwrap(); - /// # }) - /// ``` - fn declare_publisher<'b, TryIntoKeyExpr>( - &'s self, - key_expr: TryIntoKeyExpr, - ) -> PublisherBuilder<'a, 'b> - where - TryIntoKeyExpr: TryInto>, - >>::Error: Into; - - /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. - /// - /// # Examples - /// ``` - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); - /// let liveliness = session - /// .liveliness() - /// .declare_token("key/expression") - /// .res() - /// .await - /// .unwrap(); - /// # }) - /// ``` - #[zenoh_macros::unstable] - fn liveliness(&'s self) -> Liveliness<'a>; - /// Get informations about the zenoh [`Session`](Session). - /// - /// # Examples - /// ``` - /// # async_std::task::block_on(async { - /// use zenoh::prelude::r#async::*; - /// - /// let session = zenoh::open(config::peer()).res().await.unwrap(); - /// let info = session.info(); - /// # }) - /// ``` - fn info(&'s self) -> SessionInfo<'a>; -} - #[derive(Clone)] pub enum SessionRef<'a> { Borrow(&'a Session), @@ -2647,3 +2495,155 @@ impl fmt::Debug for Session { f.debug_struct("Session").field("id", &self.zid()).finish() } } + +/// Functions to create zenoh entities +/// +/// This trait contains functions to create zenoh entities like +/// [`Subscriber`](crate::subscriber::Subscriber), and +/// [`Queryable`](crate::queryable::Queryable) +/// +/// This trait is implemented by [`Session`](crate::session::Session) itself and +/// by wrappers [`SessionRef`](crate::session::SessionRef) and [`Arc`](crate::session::Arc) +/// +/// # Examples +/// ```no_run +/// # async_std::task::block_on(async { +/// use zenoh::prelude::r#async::*; +/// +/// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); +/// let subscriber = session.declare_subscriber("key/expression") +/// .res() +/// .await +/// .unwrap(); +/// async_std::task::spawn(async move { +/// while let Ok(sample) = subscriber.recv_async().await { +/// println!("Received: {:?}", sample); +/// } +/// }).await; +/// # }) +/// ``` +pub trait SessionDeclarations<'s, 'a> { + /// Create a [`Subscriber`](crate::subscriber::Subscriber) for the given key expression. + /// + /// # Arguments + /// + /// * `key_expr` - The resourkey expression to subscribe to + /// + /// # Examples + /// ```no_run + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); + /// let subscriber = session.declare_subscriber("key/expression") + /// .res() + /// .await + /// .unwrap(); + /// async_std::task::spawn(async move { + /// while let Ok(sample) = subscriber.recv_async().await { + /// println!("Received: {:?}", sample); + /// } + /// }).await; + /// # }) + /// ``` + fn declare_subscriber<'b, TryIntoKeyExpr>( + &'s self, + key_expr: TryIntoKeyExpr, + ) -> SubscriberBuilder<'a, 'b, PushMode, DefaultHandler> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into; + + /// Create a [`Queryable`](crate::queryable::Queryable) for the given key expression. + /// + /// # Arguments + /// + /// * `key_expr` - The key expression matching the queries the + /// [`Queryable`](crate::queryable::Queryable) will reply to + /// + /// # Examples + /// ```no_run + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); + /// let queryable = session.declare_queryable("key/expression") + /// .res() + /// .await + /// .unwrap(); + /// async_std::task::spawn(async move { + /// while let Ok(query) = queryable.recv_async().await { + /// query.reply(Ok(Sample::try_from( + /// "key/expression", + /// "value", + /// ).unwrap())).res().await.unwrap(); + /// } + /// }).await; + /// # }) + /// ``` + fn declare_queryable<'b, TryIntoKeyExpr>( + &'s self, + key_expr: TryIntoKeyExpr, + ) -> QueryableBuilder<'a, 'b, DefaultHandler> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into; + + /// Create a [`Publisher`](crate::publication::Publisher) for the given key expression. + /// + /// # Arguments + /// + /// * `key_expr` - The key expression matching resources to write + /// + /// # Examples + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); + /// let publisher = session.declare_publisher("key/expression") + /// .res() + /// .await + /// .unwrap(); + /// publisher.put("value").res().await.unwrap(); + /// # }) + /// ``` + fn declare_publisher<'b, TryIntoKeyExpr>( + &'s self, + key_expr: TryIntoKeyExpr, + ) -> PublisherBuilder<'a, 'b> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into; + + /// Obtain a [`Liveliness`] struct tied to this Zenoh [`Session`]. + /// + /// # Examples + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap().into_arc(); + /// let liveliness = session + /// .liveliness() + /// .declare_token("key/expression") + /// .res() + /// .await + /// .unwrap(); + /// # }) + /// ``` + #[zenoh_macros::unstable] + fn liveliness(&'s self) -> Liveliness<'a>; + /// Get informations about the zenoh [`Session`](Session). + /// + /// # Examples + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// + /// let session = zenoh::open(config::peer()).res().await.unwrap(); + /// let info = session.info(); + /// # }) + /// ``` + fn info(&'s self) -> SessionInfo<'a>; +}