Skip to content

Commit

Permalink
Merge with master
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Nov 27, 2023
2 parents 9acebcb + bbd12e2 commit 19e97e9
Show file tree
Hide file tree
Showing 21 changed files with 1,358 additions and 193 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,15 @@ impl Config {

impl std::fmt::Display for Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", serde_json::to_string(self).unwrap())
serde_json::to_value(self)
.map(|mut json| {
sift_privates(&mut json);
write!(f, "{json}")
})
.map_err(|e| {
_ = write!(f, "{e:?}");
fmt::Error
})?
}
}

Expand Down Expand Up @@ -1030,9 +1038,12 @@ impl<'a> serde::Deserialize<'a> for PluginsConfig {
})
}
}

impl std::fmt::Debug for PluginsConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", &self.values)
let mut values: Value = self.values.clone();
sift_privates(&mut values);
write!(f, "{:?}", values)
}
}

Expand Down
10 changes: 6 additions & 4 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,14 @@
z_storage -k demo/**
```

### z_pub_shm & z_sub_shm
### z_pub_shm & z_sub

A pub/sub example involving the shared-memory feature.
Note that on subscriber side, the same `z_sub` example than for non-shared-memory example is used.

Typical Subscriber usage:
```bash
z_sub_shm
z_sub
```

Typical Publisher usage:
Expand Down Expand Up @@ -188,16 +189,17 @@
z_ping 1024
```

### z_pub_shm_thr & z_sub_shm_thr
### z_pub_shm_thr & z_sub_thr

Pub/Sub throughput test involving the shared-memory feature.
This example allows performing throughput measurements between a publisher performing
put operations with the shared-memory feature and a subscriber receiving notifications
of those puts.
Note that on subscriber side, the same `z_sub_thr` example than for non-shared-memory example is used.

Typical Subscriber usage:
```bash
z_sub_shm_thr
z_sub_thr
```

Typical Publisher usage:
Expand Down
9 changes: 7 additions & 2 deletions examples/examples/z_pub_shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ async fn main() -> Result<(), zenoh::Error> {
// Initiate logging
env_logger::init();

let (config, path, value) = parse_args();
let (mut config, path, value) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
Expand All @@ -39,6 +44,7 @@ async fn main() -> Result<(), zenoh::Error> {
let publisher = session.declare_publisher(&path).res().await.unwrap();

for idx in 0..(K * N as u32) {
sleep(Duration::from_secs(1)).await;
let mut sbuf = match shm.alloc(1024) {
Ok(buf) => buf,
Err(_) => {
Expand Down Expand Up @@ -88,7 +94,6 @@ async fn main() -> Result<(), zenoh::Error> {
let defrag = shm.defragment();
println!("De-framented {defrag} bytes");
}
// sleep(Duration::from_millis(100)).await;
// Dropping the SharedMemoryBuf means to free it.
drop(sbuf);
}
Expand Down
7 changes: 6 additions & 1 deletion examples/examples/z_pub_shm_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ use zenoh::shm::SharedMemoryManager;
async fn main() {
// initiate logging
env_logger::init();
let (config, sm_size, size) = parse_args();
let (mut config, sm_size, size) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm_thr` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();

let z = zenoh::open(config).res().await.unwrap();
let id = z.zid();
Expand Down
21 changes: 19 additions & 2 deletions examples/examples/z_queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use clap::{App, Arg};
use futures::prelude::*;
use futures::select;
use std::convert::TryFrom;
use std::sync::atomic::Ordering::Relaxed;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
Expand All @@ -26,6 +27,7 @@ async fn main() {
env_logger::init();

let (config, key_expr, value, complete) = parse_args();
let send_errors = std::sync::atomic::AtomicBool::new(false);

let key_expr = KeyExpr::try_from(key_expr).unwrap();
println!("Opening session...");
Expand All @@ -39,7 +41,7 @@ async fn main() {
.await
.unwrap();

println!("Enter 'q' to quit...");
println!("Enter 'q' to quit, 'e' to reply an error to next query...");
let mut stdin = async_std::io::stdin();
let mut input = [0_u8];
loop {
Expand All @@ -50,8 +52,22 @@ async fn main() {
None => println!(">> [Queryable ] Received Query '{}'", query.selector()),
Some(value) => println!(">> [Queryable ] Received Query '{}' with value '{}'", query.selector(), value),
}
let reply = if send_errors.swap(false, Relaxed) {
println!(
">> [Queryable ] Replying (ERROR: '{}')",
value,
);
Err(value.clone().into())
} else {
println!(
">> [Queryable ] Responding ('{}': '{}')",
key_expr.as_str(),
value,
);
Ok(Sample::new(key_expr.clone(), value.clone()))
};
query
.reply(Ok(Sample::new(key_expr.clone(), value.clone())))
.reply(reply)
.res()
.await
.unwrap_or_else(|e| println!(">> [Queryable ] Error sending reply: {e}"));
Expand All @@ -61,6 +77,7 @@ async fn main() {
match input[0] {
b'q' => break,
0 => sleep(Duration::from_secs(1)).await,
b'e' => send_errors.store(true, Relaxed),
_ => (),
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_scout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ async fn main() {
.await;

// stop scouting
drop(receiver);
receiver.stop();
}
7 changes: 6 additions & 1 deletion examples/examples/z_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ async fn main() {
// Initiate logging
env_logger::init();

let (config, key_expr) = parse_args();
let (mut config, key_expr) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();

println!("Opening session...");
let session = zenoh::open(config).res().await.unwrap();
Expand Down
7 changes: 6 additions & 1 deletion examples/examples/z_sub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ fn main() {
// initiate logging
env_logger::init();

let (config, m, n) = parse_args();
let (mut config, m, n) = parse_args();

// A probing procedure for shared memory is performed upon session opening. To enable `z_pub_shm_thr` to operate
// over shared memory (and to not fallback on network mode), shared memory needs to be enabled also on the
// subscriber side. By doing so, the probing procedure will succeed and shared memory will operate as expected.
config.transport.shared_memory.set_enabled(true).unwrap();

let session = zenoh::open(config).res().unwrap();

Expand Down
61 changes: 40 additions & 21 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,9 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> {

let key_expr = conf.key_expr?;

// declare subscriber at first
// register fetch handler
let handler = register_handler(state.clone(), callback.clone());
// declare subscriber
let subscriber = match conf.session.clone() {
SessionRef::Borrow(session) => match conf.key_space.into() {
crate::KeySpace::User => session
Expand Down Expand Up @@ -707,15 +709,15 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> {
},
};

let mut fetch_subscriber = FetchingSubscriber {
let fetch_subscriber = FetchingSubscriber {
subscriber,
callback,
state,
receiver,
};

// start fetch
fetch_subscriber.fetch(conf.fetch).res_sync()?;
// run fetch
run_fetch(conf.fetch, handler)?;

Ok(fetch_subscriber)
}
Expand Down Expand Up @@ -776,7 +778,7 @@ impl<'a, Receiver> FetchingSubscriber<'a, Receiver> {
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()> + Send + Sync,
TryIntoSample,
>(
&mut self,
&self,
fetch: Fetch,
) -> impl Resolve<ZResult<()>>
where
Expand Down Expand Up @@ -882,22 +884,8 @@ where
<TryIntoSample as TryInto<Sample>>::Error: Into<zenoh_core::Error>,
{
fn res_sync(self) -> <Self as Resolvable>::To {
zlock!(self.state).pending_fetches += 1;
// pending fetches will be decremented in RepliesHandler drop()
let handler = RepliesHandler {
state: self.state,
callback: self.callback,
};

log::debug!("Fetch");
(self.fetch)(Box::new(move |s: TryIntoSample| match s.try_into() {
Ok(s) => {
let mut state = zlock!(handler.state);
log::trace!("Fetched sample received: push it to merge_queue");
state.merge_queue.push(s);
}
Err(e) => log::debug!("Received error fetching data: {}", e.into()),
}))
let handler = register_handler(self.state, self.callback);
run_fetch(self.fetch, handler)
}
}

Expand All @@ -913,3 +901,34 @@ where
std::future::ready(self.res_sync())
}
}

fn register_handler(
state: Arc<Mutex<InnerState>>,
callback: Arc<dyn Fn(Sample) + Send + Sync>,
) -> RepliesHandler {
zlock!(state).pending_fetches += 1;
// pending fetches will be decremented in RepliesHandler drop()
RepliesHandler { state, callback }
}

fn run_fetch<
Fetch: FnOnce(Box<dyn Fn(TryIntoSample) + Send + Sync>) -> ZResult<()>,
TryIntoSample,
>(
fetch: Fetch,
handler: RepliesHandler,
) -> ZResult<()>
where
TryIntoSample: TryInto<Sample>,
<TryIntoSample as TryInto<Sample>>::Error: Into<zenoh_core::Error>,
{
log::debug!("Fetch data for FetchingSubscriber");
(fetch)(Box::new(move |s: TryIntoSample| match s.try_into() {
Ok(s) => {
let mut state = zlock!(handler.state);
log::trace!("Fetched sample received: push it to merge_queue");
state.merge_queue.push(s);
}
Err(e) => log::debug!("Received error fetching data: {}", e.into()),
}))
}
20 changes: 19 additions & 1 deletion zenoh-ext/src/session_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@ use zenoh::{Session, SessionRef};

/// Some extensions to the [`zenoh::Session`](zenoh::Session)
pub trait SessionExt {
type PublicationCacheBuilder<'a, 'b, 'c>
where
Self: 'a;
fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>(
&'a self,
pub_key_expr: TryIntoKeyExpr,
) -> PublicationCacheBuilder<'a, 'b, 'c>
) -> Self::PublicationCacheBuilder<'a, 'b, 'c>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>;
}

impl SessionExt for Session {
type PublicationCacheBuilder<'a, 'b, 'c> = PublicationCacheBuilder<'a, 'b, 'c>;
fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>(
&'a self,
pub_key_expr: TryIntoKeyExpr,
Expand All @@ -44,6 +48,20 @@ impl SessionExt for Session {
}
}

impl<T: ArcSessionExt + 'static> SessionExt for T {
type PublicationCacheBuilder<'a, 'b, 'c> = PublicationCacheBuilder<'static, 'b, 'c>;
fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>(
&'a self,
pub_key_expr: TryIntoKeyExpr,
) -> Self::PublicationCacheBuilder<'a, 'b, 'c>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
ArcSessionExt::declare_publication_cache(self, pub_key_expr)
}
}

pub trait ArcSessionExt {
fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>(
&self,
Expand Down
Loading

0 comments on commit 19e97e9

Please sign in to comment.