Skip to content

Commit

Permalink
Merge branch 'main' into tagging
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Feb 6, 2024
2 parents efb4a45 + e8dca1e commit 2819698
Show file tree
Hide file tree
Showing 45 changed files with 12,577 additions and 1,752 deletions.
1 change: 0 additions & 1 deletion examples/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ impl From<&CommonArgs> for Config {
Some(path) => Config::from_file(path).unwrap(),
None => Config::default(),
};
println!("ARGS mode: {:?} ", value.mode);
match value.mode {
Some(Wai::Peer) => config.set_mode(Some(zenoh::scouting::WhatAmI::Peer)),
Some(Wai::Client) => config.set_mode(Some(zenoh::scouting::WhatAmI::Client)),
Expand Down
1 change: 0 additions & 1 deletion io/zenoh-transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
pub mod common;
pub mod manager;
pub mod multicast;
pub mod primitives;
pub mod unicast;

#[cfg(feature = "stats")]
Expand Down
58 changes: 0 additions & 58 deletions io/zenoh-transport/src/primitives/demux.rs

This file was deleted.

130 changes: 0 additions & 130 deletions io/zenoh-transport/src/primitives/mux.rs

This file was deleted.

13 changes: 8 additions & 5 deletions zenoh-ext/examples/z_pub_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ async fn main() {
// Initiate logging
env_logger::init();

let (config, key_expr, value, history, prefix) = parse_args();
let (config, key_expr, value, history, prefix, complete) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();

println!("Declaring PublicationCache on {}", &key_expr);
let mut publication_cache_builder = session
.declare_publication_cache(&key_expr)
.history(history);
.history(history)
.queryable_complete(complete);
if let Some(prefix) = prefix {
publication_cache_builder = publication_cache_builder.queryable_prefix(prefix);
}
Expand All @@ -45,7 +46,7 @@ async fn main() {
}
}

fn parse_args() -> (Config, String, String, usize, Option<String>) {
fn parse_args() -> (Config, String, String, usize, Option<String>, bool) {
let args = Command::new("zenoh-ext pub cache example")
.arg(
arg!(-m --mode [MODE] "The zenoh session mode (peer by default)")
Expand All @@ -59,11 +60,12 @@ fn parse_args() -> (Config, String, String, usize, Option<String>) {
)
.arg(arg!(-v --value [VALUE] "The value to publish.").default_value("Pub from Rust!"))
.arg(
arg!(-h --history [SIZE] "The number of publications to keep in cache")
arg!(-i --history [SIZE] "The number of publications to keep in cache")
.default_value("1"),
)
.arg(arg!(-x --prefix [STRING] "An optional queryable prefix"))
.arg(arg!(-c --config [FILE] "A configuration file."))
.arg(arg!(-o --complete "Set `complete` option to true. This means that this queryable is ulitmate data source, no need to scan other queryables."))
.arg(arg!(--"no-multicast-scouting" "Disable the multicast-based scouting mechanism."))
.get_matches();

Expand Down Expand Up @@ -101,6 +103,7 @@ fn parse_args() -> (Config, String, String, usize, Option<String>) {
let value = args.get_one::<String>("value").unwrap().to_string();
let history: usize = args.get_one::<String>("history").unwrap().parse().unwrap();
let prefix = args.get_one::<String>("prefix").map(|s| (*s).to_owned());
let complete = args.get_flag("complete");

(config, key_expr, value, history, prefix)
(config, key_expr, value, history, prefix, complete)
}
2 changes: 1 addition & 1 deletion zenoh-ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use publication_cache::{PublicationCache, PublicationCacheBuilder};
pub use querying_subscriber::{
FetchingSubscriber, FetchingSubscriberBuilder, QueryingSubscriberBuilder,
};
pub use session_ext::{ArcSessionExt, SessionExt};
pub use session_ext::SessionExt;
pub use subscriber_ext::SubscriberBuilderExt;
pub use subscriber_ext::SubscriberForward;

Expand Down
51 changes: 26 additions & 25 deletions zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ pub struct PublicationCacheBuilder<'a, 'b, 'c> {
session: SessionRef<'a>,
pub_key_expr: ZResult<KeyExpr<'b>>,
queryable_prefix: Option<ZResult<KeyExpr<'c>>>,
queryable_origin: Locality,
queryable_origin: Option<Locality>,
complete: Option<bool>,
history: usize,
resources_limit: Option<usize>,
}
Expand All @@ -46,7 +47,8 @@ impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> {
session,
pub_key_expr,
queryable_prefix: None,
queryable_origin: Locality::default(),
queryable_origin: None,
complete: None,
history: 1,
resources_limit: None,
}
Expand All @@ -67,7 +69,13 @@ impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> {
#[zenoh_macros::unstable]
#[inline]
pub fn queryable_allowed_origin(mut self, origin: Locality) -> Self {
self.queryable_origin = origin;
self.queryable_origin = Some(origin);
self
}

/// Set completeness option for the queryable.
pub fn queryable_complete(mut self, complete: bool) -> Self {
self.complete = Some(complete);
self
}

Expand Down Expand Up @@ -137,28 +145,21 @@ impl<'a> PublicationCache<'a> {
}

// declare the local subscriber that will store the local publications
let (local_sub, queryable) = match conf.session.clone() {
SessionRef::Borrow(session) => (
session
.declare_subscriber(&key_expr)
.allowed_origin(Locality::SessionLocal)
.res_sync()?,
session
.declare_queryable(&queryable_key_expr)
.allowed_origin(conf.queryable_origin)
.res_sync()?,
),
SessionRef::Shared(session) => (
session
.declare_subscriber(&key_expr)
.allowed_origin(Locality::SessionLocal)
.res_sync()?,
session
.declare_queryable(&queryable_key_expr)
.allowed_origin(conf.queryable_origin)
.res_sync()?,
),
};
let local_sub = conf
.session
.declare_subscriber(&key_expr)
.allowed_origin(Locality::SessionLocal)
.res_sync()?;

// declare the queryable which returns the cached publications
let mut queryable = conf.session.declare_queryable(&queryable_key_expr);
if let Some(origin) = conf.queryable_origin {
queryable = queryable.allowed_origin(origin);
}
if let Some(complete) = conf.complete {
queryable = queryable.complete(complete);
}
let queryable = queryable.res_sync()?;

// take local ownership of stuff to be moved into task
let sub_recv = local_sub.receiver.clone();
Expand Down
41 changes: 14 additions & 27 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,33 +680,20 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> {
// register fetch handler
let handler = register_handler(state.clone(), callback.clone());
// declare subscriber
let subscriber = match conf.session.clone() {
SessionRef::Borrow(session) => match conf.key_space.into() {
crate::KeySpace::User => session
.declare_subscriber(&key_expr)
.callback(sub_callback)
.reliability(conf.reliability)
.allowed_origin(conf.origin)
.res_sync()?,
crate::KeySpace::Liveliness => session
.liveliness()
.declare_subscriber(&key_expr)
.callback(sub_callback)
.res_sync()?,
},
SessionRef::Shared(session) => match conf.key_space.into() {
crate::KeySpace::User => session
.declare_subscriber(&key_expr)
.callback(sub_callback)
.reliability(conf.reliability)
.allowed_origin(conf.origin)
.res_sync()?,
crate::KeySpace::Liveliness => session
.liveliness()
.declare_subscriber(&key_expr)
.callback(sub_callback)
.res_sync()?,
},
let subscriber = match conf.key_space.into() {
crate::KeySpace::User => conf
.session
.declare_subscriber(&key_expr)
.callback(sub_callback)
.reliability(conf.reliability)
.allowed_origin(conf.origin)
.res_sync()?,
crate::KeySpace::Liveliness => conf
.session
.liveliness()
.declare_subscriber(&key_expr)
.callback(sub_callback)
.res_sync()?,
};

let fetch_subscriber = FetchingSubscriber {
Expand Down
Loading

0 comments on commit 2819698

Please sign in to comment.