Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

'complete' option in PublicationCache, refactoring #688

Merged
merged 7 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions zenoh-ext/examples/z_pub_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ async fn main() {
// Initiate logging
env_logger::init();

let (config, key_expr, value, history, prefix) = parse_args();
let (config, key_expr, value, history, prefix, complete) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();

println!("Declaring PublicationCache on {}", &key_expr);
let mut publication_cache_builder = session
.declare_publication_cache(&key_expr)
.history(history);
.history(history)
.queryable_complete(complete);
if let Some(prefix) = prefix {
publication_cache_builder = publication_cache_builder.queryable_prefix(prefix);
}
Expand All @@ -45,7 +46,7 @@ async fn main() {
}
}

fn parse_args() -> (Config, String, String, usize, Option<String>) {
fn parse_args() -> (Config, String, String, usize, Option<String>, bool) {
let args = Command::new("zenoh-ext pub cache example")
.arg(
arg!(-m --mode [MODE] "The zenoh session mode (peer by default)")
Expand All @@ -59,11 +60,12 @@ fn parse_args() -> (Config, String, String, usize, Option<String>) {
)
.arg(arg!(-v --value [VALUE] "The value to publish.").default_value("Pub from Rust!"))
.arg(
arg!(-h --history [SIZE] "The number of publications to keep in cache")
arg!(-i --history [SIZE] "The number of publications to keep in cache")
.default_value("1"),
)
.arg(arg!(-x --prefix [STRING] "An optional queryable prefix"))
.arg(arg!(-c --config [FILE] "A configuration file."))
.arg(arg!(-o --complete "Set `complete` option to true. This means that this queryable is ulitmate data source, no need to scan other queryables."))
.arg(arg!(--"no-multicast-scouting" "Disable the multicast-based scouting mechanism."))
.get_matches();

Expand Down Expand Up @@ -101,6 +103,7 @@ fn parse_args() -> (Config, String, String, usize, Option<String>) {
let value = args.get_one::<String>("value").unwrap().to_string();
let history: usize = args.get_one::<String>("history").unwrap().parse().unwrap();
let prefix = args.get_one::<String>("prefix").map(|s| (*s).to_owned());
let complete = args.get_flag("complete");

(config, key_expr, value, history, prefix)
(config, key_expr, value, history, prefix, complete)
}
2 changes: 1 addition & 1 deletion zenoh-ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use publication_cache::{PublicationCache, PublicationCacheBuilder};
pub use querying_subscriber::{
FetchingSubscriber, FetchingSubscriberBuilder, QueryingSubscriberBuilder,
};
pub use session_ext::{ArcSessionExt, SessionExt};
pub use session_ext::SessionExt;
pub use subscriber_ext::SubscriberBuilderExt;
pub use subscriber_ext::SubscriberForward;

Expand Down
51 changes: 26 additions & 25 deletions zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ pub struct PublicationCacheBuilder<'a, 'b, 'c> {
session: SessionRef<'a>,
pub_key_expr: ZResult<KeyExpr<'b>>,
queryable_prefix: Option<ZResult<KeyExpr<'c>>>,
queryable_origin: Locality,
queryable_origin: Option<Locality>,
complete: Option<bool>,
history: usize,
resources_limit: Option<usize>,
}
Expand All @@ -46,7 +47,8 @@ impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> {
session,
pub_key_expr,
queryable_prefix: None,
queryable_origin: Locality::default(),
queryable_origin: None,
complete: None,
history: 1,
resources_limit: None,
}
Expand All @@ -67,7 +69,13 @@ impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> {
#[zenoh_macros::unstable]
#[inline]
pub fn queryable_allowed_origin(mut self, origin: Locality) -> Self {
self.queryable_origin = origin;
self.queryable_origin = Some(origin);
self
}

/// Set completeness option for the queryable.
pub fn queryable_complete(mut self, complete: bool) -> Self {
self.complete = Some(complete);
self
}

Expand Down Expand Up @@ -137,28 +145,21 @@ impl<'a> PublicationCache<'a> {
}

// declare the local subscriber that will store the local publications
let (local_sub, queryable) = match conf.session.clone() {
SessionRef::Borrow(session) => (
session
.declare_subscriber(&key_expr)
.allowed_origin(Locality::SessionLocal)
.res_sync()?,
session
.declare_queryable(&queryable_key_expr)
.allowed_origin(conf.queryable_origin)
.res_sync()?,
),
SessionRef::Shared(session) => (
session
.declare_subscriber(&key_expr)
.allowed_origin(Locality::SessionLocal)
.res_sync()?,
session
.declare_queryable(&queryable_key_expr)
.allowed_origin(conf.queryable_origin)
.res_sync()?,
),
};
let local_sub = conf
.session
.declare_subscriber(&key_expr)
.allowed_origin(Locality::SessionLocal)
.res_sync()?;

// declare the queryable which returns the cached publications
let mut queryable = conf.session.declare_queryable(&queryable_key_expr);
if let Some(origin) = conf.queryable_origin {
queryable = queryable.allowed_origin(origin);
}
if let Some(complete) = conf.complete {
queryable = queryable.complete(complete);
}
let queryable = queryable.res_sync()?;

// take local ownership of stuff to be moved into task
let sub_recv = local_sub.receiver.clone();
Expand Down
41 changes: 14 additions & 27 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,33 +680,20 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> {
// register fetch handler
let handler = register_handler(state.clone(), callback.clone());
// declare subscriber
let subscriber = match conf.session.clone() {
SessionRef::Borrow(session) => match conf.key_space.into() {
crate::KeySpace::User => session
.declare_subscriber(&key_expr)
.callback(sub_callback)
.reliability(conf.reliability)
.allowed_origin(conf.origin)
.res_sync()?,
crate::KeySpace::Liveliness => session
.liveliness()
.declare_subscriber(&key_expr)
.callback(sub_callback)
.res_sync()?,
},
SessionRef::Shared(session) => match conf.key_space.into() {
crate::KeySpace::User => session
.declare_subscriber(&key_expr)
.callback(sub_callback)
.reliability(conf.reliability)
.allowed_origin(conf.origin)
.res_sync()?,
crate::KeySpace::Liveliness => session
.liveliness()
.declare_subscriber(&key_expr)
.callback(sub_callback)
.res_sync()?,
},
let subscriber = match conf.key_space.into() {
crate::KeySpace::User => conf
.session
.declare_subscriber(&key_expr)
.callback(sub_callback)
.reliability(conf.reliability)
.allowed_origin(conf.origin)
.res_sync()?,
crate::KeySpace::Liveliness => conf
.session
.liveliness()
.declare_subscriber(&key_expr)
.callback(sub_callback)
.res_sync()?,
};

let fetch_subscriber = FetchingSubscriber {
Expand Down
53 changes: 16 additions & 37 deletions zenoh-ext/src/session_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,67 +18,49 @@ use zenoh::prelude::KeyExpr;
use zenoh::{Session, SessionRef};

/// Some extensions to the [`zenoh::Session`](zenoh::Session)
pub trait SessionExt {
type PublicationCacheBuilder<'a, 'b, 'c>
where
Self: 'a;
fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>(
&'a self,
pub trait SessionExt<'s, 'a> {
fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>(
&'s self,
pub_key_expr: TryIntoKeyExpr,
) -> Self::PublicationCacheBuilder<'a, 'b, 'c>
) -> PublicationCacheBuilder<'a, 'b, 'c>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>;
}

impl SessionExt for Session {
type PublicationCacheBuilder<'a, 'b, 'c> = PublicationCacheBuilder<'a, 'b, 'c>;
fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>(
&'a self,
impl<'s, 'a> SessionExt<'s, 'a> for SessionRef<'a> {
fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>(
&'s self,
pub_key_expr: TryIntoKeyExpr,
) -> PublicationCacheBuilder<'a, 'b, 'c>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
PublicationCacheBuilder::new(
SessionRef::Borrow(self),
pub_key_expr.try_into().map_err(Into::into),
)
PublicationCacheBuilder::new(self.clone(), pub_key_expr.try_into().map_err(Into::into))
}
}

impl<T: ArcSessionExt + 'static> SessionExt for T {
type PublicationCacheBuilder<'a, 'b, 'c> = PublicationCacheBuilder<'static, 'b, 'c>;
fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>(
impl<'a> SessionExt<'a, 'a> for Session {
fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>(
&'a self,
pub_key_expr: TryIntoKeyExpr,
) -> Self::PublicationCacheBuilder<'a, 'b, 'c>
) -> PublicationCacheBuilder<'a, 'b, 'c>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
ArcSessionExt::declare_publication_cache(self, pub_key_expr)
SessionRef::Borrow(self).declare_publication_cache(pub_key_expr)
}
}

pub trait ArcSessionExt {
fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>(
&self,
pub_key_expr: TryIntoKeyExpr,
) -> PublicationCacheBuilder<'static, 'b, 'c>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>;
}

impl ArcSessionExt for Arc<Session> {
impl<'s> SessionExt<'s, 'static> for Arc<Session> {
/// Examples:
/// ```
/// # async_std::task::block_on(async {
/// use zenoh::prelude::r#async::*;
/// use zenoh::config::ModeDependentValue::Unique;
/// use zenoh_ext::ArcSessionExt;
/// use zenoh_ext::SessionExt;
///
/// let mut config = config::default();
/// config.timestamping.set_enabled(Some(Unique(true)));
Expand All @@ -90,16 +72,13 @@ impl ArcSessionExt for Arc<Session> {
/// # })
/// ```
fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>(
&self,
&'s self,
pub_key_expr: TryIntoKeyExpr,
) -> PublicationCacheBuilder<'static, 'b, 'c>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
PublicationCacheBuilder::new(
SessionRef::Shared(self.clone()),
pub_key_expr.try_into().map_err(Into::into),
)
SessionRef::Shared(self.clone()).declare_publication_cache(pub_key_expr)
}
}
Loading