Skip to content

Commit

Permalink
Fix declare_publication_cache for Arc<Session> (#564)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart authored Oct 10, 2023
1 parent 1f935b1 commit 5edd0eb
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 23 deletions.
2 changes: 1 addition & 1 deletion zenoh-ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use publication_cache::{PublicationCache, PublicationCacheBuilder};
pub use querying_subscriber::{
FetchingSubscriber, FetchingSubscriberBuilder, QueryingSubscriberBuilder,
};
pub use session_ext::SessionExt;
pub use session_ext::{ArcSessionExt, SessionExt};
pub use subscriber_ext::SubscriberBuilderExt;
pub use subscriber_ext::SubscriberForward;

Expand Down
40 changes: 25 additions & 15 deletions zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ use std::future::Ready;
use zenoh::prelude::r#async::*;
use zenoh::queryable::{Query, Queryable};
use zenoh::subscriber::FlumeSubscriber;
use zenoh::Session;
use zenoh::SessionRef;
use zenoh_core::{AsyncResolve, Resolvable, SyncResolve};
use zenoh_result::{bail, ZResult};
use zenoh_util::core::ResolveFuture;

/// The builder of PublicationCache, allowing to configure it.
pub struct PublicationCacheBuilder<'a, 'b, 'c> {
session: &'a Session,
session: SessionRef<'a>,
pub_key_expr: ZResult<KeyExpr<'b>>,
queryable_prefix: Option<ZResult<KeyExpr<'c>>>,
queryable_origin: Locality,
Expand All @@ -38,7 +38,7 @@ pub struct PublicationCacheBuilder<'a, 'b, 'c> {

impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> {
pub(crate) fn new(
session: &'a Session,
session: SessionRef<'a>,
pub_key_expr: ZResult<KeyExpr<'b>>,
) -> PublicationCacheBuilder<'a, 'b, 'c> {
PublicationCacheBuilder {
Expand Down Expand Up @@ -136,18 +136,28 @@ impl<'a> PublicationCache<'a> {
}

// declare the local subscriber that will store the local publications
let local_sub = conf
.session
.declare_subscriber(&key_expr)
.allowed_origin(Locality::SessionLocal)
.res_sync()?;

// declare the queryable that will answer to queries on cache
let queryable = conf
.session
.declare_queryable(&queryable_key_expr)
.allowed_origin(conf.queryable_origin)
.res_sync()?;
let (local_sub, queryable) = match conf.session.clone() {
SessionRef::Borrow(session) => (
session
.declare_subscriber(&key_expr)
.allowed_origin(Locality::SessionLocal)
.res_sync()?,
session
.declare_queryable(&queryable_key_expr)
.allowed_origin(conf.queryable_origin)
.res_sync()?,
),
SessionRef::Shared(session) => (
session
.declare_subscriber(&key_expr)
.allowed_origin(Locality::SessionLocal)
.res_sync()?,
session
.declare_queryable(&queryable_key_expr)
.allowed_origin(conf.queryable_origin)
.res_sync()?,
),
};

// take local ownership of stuff to be moved into task
let sub_recv = local_sub.receiver.clone();
Expand Down
46 changes: 39 additions & 7 deletions zenoh-ext/src/session_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::PublicationCacheBuilder;
use std::convert::TryInto;
use std::sync::Arc;
use zenoh::prelude::KeyExpr;
use zenoh::Session;
use zenoh::{Session, SessionRef};

/// Some extensions to the [`zenoh::Session`](zenoh::Session)
pub trait SessionExt {
Expand All @@ -37,19 +37,51 @@ impl SessionExt for Session {
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
PublicationCacheBuilder::new(self, pub_key_expr.try_into().map_err(Into::into))
PublicationCacheBuilder::new(
SessionRef::Borrow(self),
pub_key_expr.try_into().map_err(Into::into),
)
}
}

impl SessionExt for Arc<Session> {
fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>(
&'a self,
pub trait ArcSessionExt {
fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>(
&self,
pub_key_expr: TryIntoKeyExpr,
) -> PublicationCacheBuilder<'a, 'b, 'c>
) -> PublicationCacheBuilder<'static, 'b, 'c>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>;
}

impl ArcSessionExt for Arc<Session> {
/// Examples:
/// ```
/// # async_std::task::block_on(async {
/// use zenoh::prelude::r#async::*;
/// use zenoh::config::ModeDependentValue::Unique;
/// use zenoh_ext::ArcSessionExt;
///
/// let mut config = config::default();
/// config.timestamping.set_enabled(Some(Unique(true)));
/// let session = zenoh::open(config).res().await.unwrap().into_arc();
/// let publication_cache = session.declare_publication_cache("key/expression").res().await.unwrap();
/// async_std::task::spawn(async move {
/// publication_cache.key_expr();
/// }).await;
/// # })
/// ```
fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>(
&self,
pub_key_expr: TryIntoKeyExpr,
) -> PublicationCacheBuilder<'static, 'b, 'c>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
PublicationCacheBuilder::new(self, pub_key_expr.try_into().map_err(Into::into))
PublicationCacheBuilder::new(
SessionRef::Shared(self.clone()),
pub_key_expr.try_into().map_err(Into::into),
)
}
}

0 comments on commit 5edd0eb

Please sign in to comment.