Skip to content

Commit

Permalink
Merge branch 'master' into simplified_load_plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Nov 20, 2023
2 parents ce3f585 + d2399ca commit 8ce56fc
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 136 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,9 @@ impl Config {

impl std::fmt::Display for Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", serde_json::to_string(self).unwrap())
let mut json = serde_json::to_value(self).unwrap();
sift_privates(&mut json);
write!(f, "{json}")
}
}

Expand Down
21 changes: 19 additions & 2 deletions examples/examples/z_queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use clap::{App, Arg};
use futures::prelude::*;
use futures::select;
use std::convert::TryFrom;
use std::sync::atomic::Ordering::Relaxed;
use std::time::Duration;
use zenoh::config::Config;
use zenoh::prelude::r#async::*;
Expand All @@ -26,6 +27,7 @@ async fn main() {
env_logger::init();

let (config, key_expr, value, complete) = parse_args();
let send_errors = std::sync::atomic::AtomicBool::new(false);

let key_expr = KeyExpr::try_from(key_expr).unwrap();
println!("Opening session...");
Expand All @@ -39,7 +41,7 @@ async fn main() {
.await
.unwrap();

println!("Enter 'q' to quit...");
println!("Enter 'q' to quit, 'e' to reply an error to next query...");
let mut stdin = async_std::io::stdin();
let mut input = [0_u8];
loop {
Expand All @@ -50,8 +52,22 @@ async fn main() {
None => println!(">> [Queryable ] Received Query '{}'", query.selector()),
Some(value) => println!(">> [Queryable ] Received Query '{}' with value '{}'", query.selector(), value),
}
let reply = if send_errors.swap(false, Relaxed) {
println!(
">> [Queryable ] Replying (ERROR: '{}')",
value,
);
Err(value.clone().into())
} else {
println!(
">> [Queryable ] Responding ('{}': '{}')",
key_expr.as_str(),
value,
);
Ok(Sample::new(key_expr.clone(), value.clone()))
};
query
.reply(Ok(Sample::new(key_expr.clone(), value.clone())))
.reply(reply)
.res()
.await
.unwrap_or_else(|e| println!(">> [Queryable ] Error sending reply: {e}"));
Expand All @@ -61,6 +77,7 @@ async fn main() {
match input[0] {
b'q' => break,
0 => sleep(Duration::from_secs(1)).await,
b'e' => send_errors.store(true, Relaxed),
_ => (),
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/z_scout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ async fn main() {
.await;

// stop scouting
drop(receiver);
receiver.stop();
}
20 changes: 19 additions & 1 deletion zenoh-ext/src/session_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@ use zenoh::{Session, SessionRef};

/// Some extensions to the [`zenoh::Session`](zenoh::Session)
pub trait SessionExt {
type PublicationCacheBuilder<'a, 'b, 'c>
where
Self: 'a;
fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>(
&'a self,
pub_key_expr: TryIntoKeyExpr,
) -> PublicationCacheBuilder<'a, 'b, 'c>
) -> Self::PublicationCacheBuilder<'a, 'b, 'c>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>;
}

impl SessionExt for Session {
type PublicationCacheBuilder<'a, 'b, 'c> = PublicationCacheBuilder<'a, 'b, 'c>;
fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>(
&'a self,
pub_key_expr: TryIntoKeyExpr,
Expand All @@ -44,6 +48,20 @@ impl SessionExt for Session {
}
}

impl<T: ArcSessionExt + 'static> SessionExt for T {
type PublicationCacheBuilder<'a, 'b, 'c> = PublicationCacheBuilder<'static, 'b, 'c>;
fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>(
&'a self,
pub_key_expr: TryIntoKeyExpr,
) -> Self::PublicationCacheBuilder<'a, 'b, 'c>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
ArcSessionExt::declare_publication_cache(self, pub_key_expr)
}
}

pub trait ArcSessionExt {
fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>(
&self,
Expand Down
32 changes: 31 additions & 1 deletion zenoh/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::sync::Arc;
use zenoh_core::{AsyncResolve, Resolvable, SyncResolve};
use zenoh_protocol::core::WireExpr;
use zenoh_protocol::network::{response, Mapping, RequestId, Response, ResponseFinal};
use zenoh_protocol::zenoh::ext::ValueType;
use zenoh_protocol::zenoh::reply::ext::ConsolidationType;
use zenoh_protocol::zenoh::{self, ResponseBody};
use zenoh_result::ZResult;
Expand Down Expand Up @@ -198,7 +199,36 @@ impl SyncResolve for ReplyBuilder<'_> {
});
Ok(())
}
Err(_) => Err(zerror!("Replying errors is not yet supported!").into()),
Err(payload) => {
self.query.inner.primitives.send_response(Response {
rid: self.query.inner.qid,
wire_expr: WireExpr {
scope: 0,
suffix: std::borrow::Cow::Owned(self.query.key_expr().as_str().to_owned()),
mapping: Mapping::Sender,
},
payload: ResponseBody::Err(zenoh::Err {
timestamp: None,
is_infrastructure: false,
ext_sinfo: None,
ext_unknown: vec![],
ext_body: Some(ValueType {
#[cfg(feature = "shared-memory")]
ext_shm: None,
payload: payload.payload,
encoding: payload.encoding,
}),
code: 0, // TODO
}),
ext_qos: response::ext::QoSType::response_default(),
ext_tstamp: None,
ext_respid: Some(response::ext::ResponderIdType {
zid: self.query.inner.zid,
eid: 0, // TODO
}),
});
Ok(())
}
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion zenoh/src/scouting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ impl ScoutInner {
/// # })
/// ```
pub fn stop(self) {
// drop
// This drops the inner `stop_sender` and hence stops the scouting receiver
std::mem::drop(self);
}
}

Expand Down
Loading

0 comments on commit 8ce56fc

Please sign in to comment.