From dff8456530481ba0423382cfcd945358a9ea24cb Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 9 Sep 2024 18:09:30 +0200 Subject: [PATCH] chore: merge branch 'main' into 'dev/arcsession' (#1384) * feat(zenoh_id): exposing into slice & try from slice Allowing users to create a ZenohId from a slice, using TryFrom, and also allowing users to convert a ZenohId into a [u8; 16]. * Add LivlinessSubscriber history option (#1355) * Close #1357 * feat(zenoh_id): replacing from slice with function --------- Co-authored-by: Darius Maitia Co-authored-by: OlivierHecart Co-authored-by: Luca Cominardi --- commons/zenoh-config/src/wrappers.rs | 13 +++++++ examples/examples/z_sub_liveliness.rs | 12 ++++--- .../zenoh-link-unixsock_stream/src/unicast.rs | 2 +- zenoh/src/api/liveliness.rs | 35 ++++++++++++++++++- zenoh/src/api/session.rs | 7 +++- 5 files changed, 62 insertions(+), 7 deletions(-) diff --git a/commons/zenoh-config/src/wrappers.rs b/commons/zenoh-config/src/wrappers.rs index fd6d2ef50b..d04950f21e 100644 --- a/commons/zenoh-config/src/wrappers.rs +++ b/commons/zenoh-config/src/wrappers.rs @@ -35,6 +35,10 @@ impl ZenohId { pub fn into_keyexpr(self) -> OwnedKeyExpr { self.into() } + + pub fn to_le_bytes(self) -> [u8; uhlc::ID::MAX_SIZE] { + self.0.to_le_bytes() + } } impl fmt::Debug for ZenohId { @@ -54,6 +58,15 @@ impl From for ZenohId { } } +impl TryFrom<&[u8]> for ZenohId { + type Error = zenoh_result::Error; + + fn try_from(value: &[u8]) -> Result { + let proto: ZenohIdProto = value.try_into()?; + Ok(ZenohId::from(proto)) + } +} + impl From for ZenohIdProto { fn from(id: ZenohId) -> Self { id.0 diff --git a/examples/examples/z_sub_liveliness.rs b/examples/examples/z_sub_liveliness.rs index 0b70d20786..d3f13a05cc 100644 --- a/examples/examples/z_sub_liveliness.rs +++ b/examples/examples/z_sub_liveliness.rs @@ -20,7 +20,7 @@ async fn main() { // Initiate logging zenoh::try_init_log_from_env(); - let (config, key_expr) = parse_args(); + let (config, key_expr, history) = parse_args(); println!("Opening session..."); let session = zenoh::open(config).await.unwrap(); @@ -30,6 +30,7 @@ async fn main() { let subscriber = session .liveliness() .declare_subscriber(&key_expr) + .history(history) .await .unwrap(); @@ -51,13 +52,16 @@ async fn main() { #[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] struct Args { #[arg(short, long, default_value = "group1/**")] - /// The key expression to write to. + /// The key expression to subscribe to. key: KeyExpr<'static>, + #[arg(long)] + /// Get historical liveliness tokens. + history: bool, #[command(flatten)] common: CommonArgs, } -fn parse_args() -> (Config, KeyExpr<'static>) { +fn parse_args() -> (Config, KeyExpr<'static>, bool) { let args = Args::parse(); - (args.common.into(), args.key) + (args.common.into(), args.key, args.history) } diff --git a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs index a07267416d..5632da26f4 100644 --- a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs @@ -333,7 +333,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUnixSocketStream { nix::fcntl::flock(lock_fd, nix::fcntl::FlockArg::LockExclusiveNonblock).map_err(|e| { let _ = nix::unistd::close(lock_fd); let e = zerror!( - "Can not create a new UnixSocketStream listener on {} - Unable to acquire look: {}", + "Can not create a new UnixSocketStream listener on {} - Unable to acquire lock: {}", path, e ); diff --git a/zenoh/src/api/liveliness.rs b/zenoh/src/api/liveliness.rs index 7c07056c1e..24c9d9871d 100644 --- a/zenoh/src/api/liveliness.rs +++ b/zenoh/src/api/liveliness.rs @@ -172,6 +172,7 @@ impl<'a> Liveliness<'a> { key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), handler: DefaultHandler::default(), undeclare_on_drop: true, + history: false, } } @@ -436,6 +437,7 @@ pub struct LivelinessSubscriberBuilder<'a, 'b, Handler> { pub key_expr: ZResult>, pub handler: Handler, pub undeclare_on_drop: bool, + pub history: bool, } #[zenoh_macros::unstable] @@ -471,12 +473,14 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> { key_expr, handler: _, undeclare_on_drop: _, + history, } = self; LivelinessSubscriberBuilder { session, key_expr, handler: callback, undeclare_on_drop: false, + history, } } @@ -544,12 +548,36 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> { key_expr, handler: _, undeclare_on_drop: _, + history, } = self; LivelinessSubscriberBuilder { session, key_expr, handler, undeclare_on_drop: true, + history, + } + } +} + +#[zenoh_macros::unstable] +impl LivelinessSubscriberBuilder<'_, '_, Handler> { + #[inline] + #[zenoh_macros::unstable] + pub fn history(self, history: bool) -> Self { + let LivelinessSubscriberBuilder { + session, + key_expr, + handler, + undeclare_on_drop, + history: _, + } = self; + LivelinessSubscriberBuilder { + session, + key_expr, + handler, + undeclare_on_drop, + history, } } } @@ -578,7 +606,12 @@ where let (callback, handler) = self.handler.into_handler(); session .0 - .declare_liveliness_subscriber_inner(&key_expr, Locality::default(), callback) + .declare_liveliness_subscriber_inner( + &key_expr, + Locality::default(), + self.history, + callback, + ) .map(|sub_state| Subscriber { inner: SubscriberInner { #[cfg(feature = "unstable")] diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 4247f918a8..38d1ce1584 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1542,6 +1542,7 @@ impl SessionInner { &self, key_expr: &KeyExpr, origin: Locality, + history: bool, callback: Callback<'static, Sample>, ) -> ZResult> { let mut state = zwrite!(self.state); @@ -1589,7 +1590,11 @@ impl SessionInner { primitives.send_interest(Interest { id, - mode: InterestMode::Future, + mode: if history { + InterestMode::CurrentFuture + } else { + InterestMode::Future + }, options: InterestOptions::KEYEXPRS + InterestOptions::TOKENS, wire_expr: Some(key_expr.to_wire(self).to_owned()), ext_qos: declare::ext::QoSType::DECLARE,