Skip to content

Commit

Permalink
Merge pull request #688 from eclipse-zenoh/pubcache_complete
Browse files Browse the repository at this point in the history
'complete' option in PublicationCache, refactoring
  • Loading branch information
milyin authored Feb 2, 2024
2 parents 5601669 + 09a72ff commit e8dca1e
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 246 deletions.
13 changes: 8 additions & 5 deletions zenoh-ext/examples/z_pub_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ async fn main() {
// Initiate logging
env_logger::init();

let (config, key_expr, value, history, prefix) = parse_args();
let (config, key_expr, value, history, prefix, complete) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();

println!("Declaring PublicationCache on {}", &key_expr);
let mut publication_cache_builder = session
.declare_publication_cache(&key_expr)
.history(history);
.history(history)
.queryable_complete(complete);
if let Some(prefix) = prefix {
publication_cache_builder = publication_cache_builder.queryable_prefix(prefix);
}
Expand All @@ -45,7 +46,7 @@ async fn main() {
}
}

fn parse_args() -> (Config, String, String, usize, Option<String>) {
fn parse_args() -> (Config, String, String, usize, Option<String>, bool) {
let args = Command::new("zenoh-ext pub cache example")
.arg(
arg!(-m --mode [MODE] "The zenoh session mode (peer by default)")
Expand All @@ -59,11 +60,12 @@ fn parse_args() -> (Config, String, String, usize, Option<String>) {
)
.arg(arg!(-v --value [VALUE] "The value to publish.").default_value("Pub from Rust!"))
.arg(
arg!(-h --history [SIZE] "The number of publications to keep in cache")
arg!(-i --history [SIZE] "The number of publications to keep in cache")
.default_value("1"),
)
.arg(arg!(-x --prefix [STRING] "An optional queryable prefix"))
.arg(arg!(-c --config [FILE] "A configuration file."))
.arg(arg!(-o --complete "Set `complete` option to true. This means that this queryable is ulitmate data source, no need to scan other queryables."))
.arg(arg!(--"no-multicast-scouting" "Disable the multicast-based scouting mechanism."))
.get_matches();

Expand Down Expand Up @@ -101,6 +103,7 @@ fn parse_args() -> (Config, String, String, usize, Option<String>) {
let value = args.get_one::<String>("value").unwrap().to_string();
let history: usize = args.get_one::<String>("history").unwrap().parse().unwrap();
let prefix = args.get_one::<String>("prefix").map(|s| (*s).to_owned());
let complete = args.get_flag("complete");

(config, key_expr, value, history, prefix)
(config, key_expr, value, history, prefix, complete)
}
2 changes: 1 addition & 1 deletion zenoh-ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use publication_cache::{PublicationCache, PublicationCacheBuilder};
pub use querying_subscriber::{
FetchingSubscriber, FetchingSubscriberBuilder, QueryingSubscriberBuilder,
};
pub use session_ext::{ArcSessionExt, SessionExt};
pub use session_ext::SessionExt;
pub use subscriber_ext::SubscriberBuilderExt;
pub use subscriber_ext::SubscriberForward;

Expand Down
51 changes: 26 additions & 25 deletions zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ pub struct PublicationCacheBuilder<'a, 'b, 'c> {
session: SessionRef<'a>,
pub_key_expr: ZResult<KeyExpr<'b>>,
queryable_prefix: Option<ZResult<KeyExpr<'c>>>,
queryable_origin: Locality,
queryable_origin: Option<Locality>,
complete: Option<bool>,
history: usize,
resources_limit: Option<usize>,
}
Expand All @@ -46,7 +47,8 @@ impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> {
session,
pub_key_expr,
queryable_prefix: None,
queryable_origin: Locality::default(),
queryable_origin: None,
complete: None,
history: 1,
resources_limit: None,
}
Expand All @@ -67,7 +69,13 @@ impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> {
#[zenoh_macros::unstable]
#[inline]
pub fn queryable_allowed_origin(mut self, origin: Locality) -> Self {
self.queryable_origin = origin;
self.queryable_origin = Some(origin);
self
}

/// Set completeness option for the queryable.
pub fn queryable_complete(mut self, complete: bool) -> Self {
self.complete = Some(complete);
self
}

Expand Down Expand Up @@ -137,28 +145,21 @@ impl<'a> PublicationCache<'a> {
}

// declare the local subscriber that will store the local publications
let (local_sub, queryable) = match conf.session.clone() {
SessionRef::Borrow(session) => (
session
.declare_subscriber(&key_expr)
.allowed_origin(Locality::SessionLocal)
.res_sync()?,
session
.declare_queryable(&queryable_key_expr)
.allowed_origin(conf.queryable_origin)
.res_sync()?,
),
SessionRef::Shared(session) => (
session
.declare_subscriber(&key_expr)
.allowed_origin(Locality::SessionLocal)
.res_sync()?,
session
.declare_queryable(&queryable_key_expr)
.allowed_origin(conf.queryable_origin)
.res_sync()?,
),
};
let local_sub = conf
.session
.declare_subscriber(&key_expr)
.allowed_origin(Locality::SessionLocal)
.res_sync()?;

// declare the queryable which returns the cached publications
let mut queryable = conf.session.declare_queryable(&queryable_key_expr);
if let Some(origin) = conf.queryable_origin {
queryable = queryable.allowed_origin(origin);
}
if let Some(complete) = conf.complete {
queryable = queryable.complete(complete);
}
let queryable = queryable.res_sync()?;

// take local ownership of stuff to be moved into task
let sub_recv = local_sub.receiver.clone();
Expand Down
41 changes: 14 additions & 27 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,33 +680,20 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> {
// register fetch handler
let handler = register_handler(state.clone(), callback.clone());
// declare subscriber
let subscriber = match conf.session.clone() {
SessionRef::Borrow(session) => match conf.key_space.into() {
crate::KeySpace::User => session
.declare_subscriber(&key_expr)
.callback(sub_callback)
.reliability(conf.reliability)
.allowed_origin(conf.origin)
.res_sync()?,
crate::KeySpace::Liveliness => session
.liveliness()
.declare_subscriber(&key_expr)
.callback(sub_callback)
.res_sync()?,
},
SessionRef::Shared(session) => match conf.key_space.into() {
crate::KeySpace::User => session
.declare_subscriber(&key_expr)
.callback(sub_callback)
.reliability(conf.reliability)
.allowed_origin(conf.origin)
.res_sync()?,
crate::KeySpace::Liveliness => session
.liveliness()
.declare_subscriber(&key_expr)
.callback(sub_callback)
.res_sync()?,
},
let subscriber = match conf.key_space.into() {
crate::KeySpace::User => conf
.session
.declare_subscriber(&key_expr)
.callback(sub_callback)
.reliability(conf.reliability)
.allowed_origin(conf.origin)
.res_sync()?,
crate::KeySpace::Liveliness => conf
.session
.liveliness()
.declare_subscriber(&key_expr)
.callback(sub_callback)
.res_sync()?,
};

let fetch_subscriber = FetchingSubscriber {
Expand Down
53 changes: 16 additions & 37 deletions zenoh-ext/src/session_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,67 +18,49 @@ use zenoh::prelude::KeyExpr;
use zenoh::{Session, SessionRef};

/// Some extensions to the [`zenoh::Session`](zenoh::Session)
pub trait SessionExt {
type PublicationCacheBuilder<'a, 'b, 'c>
where
Self: 'a;
fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>(
&'a self,
pub trait SessionExt<'s, 'a> {
fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>(
&'s self,
pub_key_expr: TryIntoKeyExpr,
) -> Self::PublicationCacheBuilder<'a, 'b, 'c>
) -> PublicationCacheBuilder<'a, 'b, 'c>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>;
}

impl SessionExt for Session {
type PublicationCacheBuilder<'a, 'b, 'c> = PublicationCacheBuilder<'a, 'b, 'c>;
fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>(
&'a self,
impl<'s, 'a> SessionExt<'s, 'a> for SessionRef<'a> {
fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>(
&'s self,
pub_key_expr: TryIntoKeyExpr,
) -> PublicationCacheBuilder<'a, 'b, 'c>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
PublicationCacheBuilder::new(
SessionRef::Borrow(self),
pub_key_expr.try_into().map_err(Into::into),
)
PublicationCacheBuilder::new(self.clone(), pub_key_expr.try_into().map_err(Into::into))
}
}

impl<T: ArcSessionExt + 'static> SessionExt for T {
type PublicationCacheBuilder<'a, 'b, 'c> = PublicationCacheBuilder<'static, 'b, 'c>;
fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>(
impl<'a> SessionExt<'a, 'a> for Session {
fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>(
&'a self,
pub_key_expr: TryIntoKeyExpr,
) -> Self::PublicationCacheBuilder<'a, 'b, 'c>
) -> PublicationCacheBuilder<'a, 'b, 'c>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
ArcSessionExt::declare_publication_cache(self, pub_key_expr)
SessionRef::Borrow(self).declare_publication_cache(pub_key_expr)
}
}

pub trait ArcSessionExt {
fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>(
&self,
pub_key_expr: TryIntoKeyExpr,
) -> PublicationCacheBuilder<'static, 'b, 'c>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>;
}

impl ArcSessionExt for Arc<Session> {
impl<'s> SessionExt<'s, 'static> for Arc<Session> {
/// Examples:
/// ```
/// # async_std::task::block_on(async {
/// use zenoh::prelude::r#async::*;
/// use zenoh::config::ModeDependentValue::Unique;
/// use zenoh_ext::ArcSessionExt;
/// use zenoh_ext::SessionExt;
///
/// let mut config = config::default();
/// config.timestamping.set_enabled(Some(Unique(true)));
Expand All @@ -90,16 +72,13 @@ impl ArcSessionExt for Arc<Session> {
/// # })
/// ```
fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>(
&self,
&'s self,
pub_key_expr: TryIntoKeyExpr,
) -> PublicationCacheBuilder<'static, 'b, 'c>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
PublicationCacheBuilder::new(
SessionRef::Shared(self.clone()),
pub_key_expr.try_into().map_err(Into::into),
)
SessionRef::Shared(self.clone()).declare_publication_cache(pub_key_expr)
}
}
Loading

0 comments on commit e8dca1e

Please sign in to comment.