From 5edd0eb50a30fe9130f457add87864645362acd0 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Tue, 10 Oct 2023 16:23:29 +0200 Subject: [PATCH] Fix declare_publication_cache for Arc (#564) --- zenoh-ext/src/lib.rs | 2 +- zenoh-ext/src/publication_cache.rs | 40 ++++++++++++++++---------- zenoh-ext/src/session_ext.rs | 46 +++++++++++++++++++++++++----- 3 files changed, 65 insertions(+), 23 deletions(-) diff --git a/zenoh-ext/src/lib.rs b/zenoh-ext/src/lib.rs index 7440d80a53..dca488ba80 100644 --- a/zenoh-ext/src/lib.rs +++ b/zenoh-ext/src/lib.rs @@ -20,7 +20,7 @@ pub use publication_cache::{PublicationCache, PublicationCacheBuilder}; pub use querying_subscriber::{ FetchingSubscriber, FetchingSubscriberBuilder, QueryingSubscriberBuilder, }; -pub use session_ext::SessionExt; +pub use session_ext::{ArcSessionExt, SessionExt}; pub use subscriber_ext::SubscriberBuilderExt; pub use subscriber_ext::SubscriberForward; diff --git a/zenoh-ext/src/publication_cache.rs b/zenoh-ext/src/publication_cache.rs index 2da6edf6c7..78b951436e 100644 --- a/zenoh-ext/src/publication_cache.rs +++ b/zenoh-ext/src/publication_cache.rs @@ -21,14 +21,14 @@ use std::future::Ready; use zenoh::prelude::r#async::*; use zenoh::queryable::{Query, Queryable}; use zenoh::subscriber::FlumeSubscriber; -use zenoh::Session; +use zenoh::SessionRef; use zenoh_core::{AsyncResolve, Resolvable, SyncResolve}; use zenoh_result::{bail, ZResult}; use zenoh_util::core::ResolveFuture; /// The builder of PublicationCache, allowing to configure it. pub struct PublicationCacheBuilder<'a, 'b, 'c> { - session: &'a Session, + session: SessionRef<'a>, pub_key_expr: ZResult>, queryable_prefix: Option>>, queryable_origin: Locality, @@ -38,7 +38,7 @@ pub struct PublicationCacheBuilder<'a, 'b, 'c> { impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> { pub(crate) fn new( - session: &'a Session, + session: SessionRef<'a>, pub_key_expr: ZResult>, ) -> PublicationCacheBuilder<'a, 'b, 'c> { PublicationCacheBuilder { @@ -136,18 +136,28 @@ impl<'a> PublicationCache<'a> { } // declare the local subscriber that will store the local publications - let local_sub = conf - .session - .declare_subscriber(&key_expr) - .allowed_origin(Locality::SessionLocal) - .res_sync()?; - - // declare the queryable that will answer to queries on cache - let queryable = conf - .session - .declare_queryable(&queryable_key_expr) - .allowed_origin(conf.queryable_origin) - .res_sync()?; + let (local_sub, queryable) = match conf.session.clone() { + SessionRef::Borrow(session) => ( + session + .declare_subscriber(&key_expr) + .allowed_origin(Locality::SessionLocal) + .res_sync()?, + session + .declare_queryable(&queryable_key_expr) + .allowed_origin(conf.queryable_origin) + .res_sync()?, + ), + SessionRef::Shared(session) => ( + session + .declare_subscriber(&key_expr) + .allowed_origin(Locality::SessionLocal) + .res_sync()?, + session + .declare_queryable(&queryable_key_expr) + .allowed_origin(conf.queryable_origin) + .res_sync()?, + ), + }; // take local ownership of stuff to be moved into task let sub_recv = local_sub.receiver.clone(); diff --git a/zenoh-ext/src/session_ext.rs b/zenoh-ext/src/session_ext.rs index bfcdebf4fc..3ffd328e1a 100644 --- a/zenoh-ext/src/session_ext.rs +++ b/zenoh-ext/src/session_ext.rs @@ -15,7 +15,7 @@ use super::PublicationCacheBuilder; use std::convert::TryInto; use std::sync::Arc; use zenoh::prelude::KeyExpr; -use zenoh::Session; +use zenoh::{Session, SessionRef}; /// Some extensions to the [`zenoh::Session`](zenoh::Session) pub trait SessionExt { @@ -37,19 +37,51 @@ impl SessionExt for Session { TryIntoKeyExpr: TryInto>, >>::Error: Into, { - PublicationCacheBuilder::new(self, pub_key_expr.try_into().map_err(Into::into)) + PublicationCacheBuilder::new( + SessionRef::Borrow(self), + pub_key_expr.try_into().map_err(Into::into), + ) } } -impl SessionExt for Arc { - fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>( - &'a self, +pub trait ArcSessionExt { + fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( + &self, pub_key_expr: TryIntoKeyExpr, - ) -> PublicationCacheBuilder<'a, 'b, 'c> + ) -> PublicationCacheBuilder<'static, 'b, 'c> + where + TryIntoKeyExpr: TryInto>, + >>::Error: Into; +} + +impl ArcSessionExt for Arc { + /// Examples: + /// ``` + /// # async_std::task::block_on(async { + /// use zenoh::prelude::r#async::*; + /// use zenoh::config::ModeDependentValue::Unique; + /// use zenoh_ext::ArcSessionExt; + /// + /// let mut config = config::default(); + /// config.timestamping.set_enabled(Some(Unique(true))); + /// let session = zenoh::open(config).res().await.unwrap().into_arc(); + /// let publication_cache = session.declare_publication_cache("key/expression").res().await.unwrap(); + /// async_std::task::spawn(async move { + /// publication_cache.key_expr(); + /// }).await; + /// # }) + /// ``` + fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>( + &self, + pub_key_expr: TryIntoKeyExpr, + ) -> PublicationCacheBuilder<'static, 'b, 'c> where TryIntoKeyExpr: TryInto>, >>::Error: Into, { - PublicationCacheBuilder::new(self, pub_key_expr.try_into().map_err(Into::into)) + PublicationCacheBuilder::new( + SessionRef::Shared(self.clone()), + pub_key_expr.try_into().map_err(Into::into), + ) } }