From 660fdd6094d2fdd5bda24c963c4f0d2d0549dbef Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Mon, 9 Sep 2024 14:55:28 +0200 Subject: [PATCH] Add LivlinessSubscriber history option (#1355) --- examples/examples/z_sub_liveliness.rs | 12 ++++++---- zenoh/src/api/liveliness.rs | 33 ++++++++++++++++++++++++++- zenoh/src/api/session.rs | 7 +++++- 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/examples/examples/z_sub_liveliness.rs b/examples/examples/z_sub_liveliness.rs index bb91c9f491..93d063a344 100644 --- a/examples/examples/z_sub_liveliness.rs +++ b/examples/examples/z_sub_liveliness.rs @@ -20,7 +20,7 @@ async fn main() { // Initiate logging zenoh::try_init_log_from_env(); - let (config, key_expr) = parse_args(); + let (config, key_expr, history) = parse_args(); println!("Opening session..."); let session = zenoh::open(config).await.unwrap(); @@ -30,6 +30,7 @@ async fn main() { let subscriber = session .liveliness() .declare_subscriber(&key_expr) + .history(history) .await .unwrap(); @@ -51,13 +52,16 @@ async fn main() { #[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)] struct Args { #[arg(short, long, default_value = "group1/**")] - /// The key expression to write to. + /// The key expression to subscribe to. key: KeyExpr<'static>, + #[arg(long)] + /// Get historical liveliness tokens. + history: bool, #[command(flatten)] common: CommonArgs, } -fn parse_args() -> (Config, KeyExpr<'static>) { +fn parse_args() -> (Config, KeyExpr<'static>, bool) { let args = Args::parse(); - (args.common.into(), args.key) + (args.common.into(), args.key, args.history) } diff --git a/zenoh/src/api/liveliness.rs b/zenoh/src/api/liveliness.rs index 64f87c6de5..e9a12400fd 100644 --- a/zenoh/src/api/liveliness.rs +++ b/zenoh/src/api/liveliness.rs @@ -169,6 +169,7 @@ impl<'a> Liveliness<'a> { session: self.session.clone(), key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into), handler: DefaultHandler::default(), + history: false, } } @@ -439,6 +440,7 @@ pub struct LivelinessSubscriberBuilder<'a, 'b, Handler> { pub session: SessionRef<'a>, pub key_expr: ZResult>, pub handler: Handler, + pub history: bool, } #[zenoh_macros::unstable] @@ -473,11 +475,13 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> { session, key_expr, handler: _, + history, } = self; LivelinessSubscriberBuilder { session, key_expr, handler: callback, + history, } } @@ -544,11 +548,33 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> { session, key_expr, handler: _, + history, } = self; LivelinessSubscriberBuilder { session, key_expr, handler, + history, + } + } +} + +#[zenoh_macros::unstable] +impl LivelinessSubscriberBuilder<'_, '_, Handler> { + #[inline] + #[zenoh_macros::unstable] + pub fn history(self, history: bool) -> Self { + let LivelinessSubscriberBuilder { + session, + key_expr, + handler, + history: _, + } = self; + LivelinessSubscriberBuilder { + session, + key_expr, + handler, + history, } } } @@ -576,7 +602,12 @@ where let session = self.session; let (callback, handler) = self.handler.into_handler(); session - .declare_liveliness_subscriber_inner(&key_expr, Locality::default(), callback) + .declare_liveliness_subscriber_inner( + &key_expr, + Locality::default(), + self.history, + callback, + ) .map(|sub_state| Subscriber { subscriber: SubscriberInner { session, diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 26704d378a..72ad150a11 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1428,6 +1428,7 @@ impl Session { &self, key_expr: &KeyExpr, origin: Locality, + history: bool, callback: Callback<'static, Sample>, ) -> ZResult> { let mut state = zwrite!(self.state); @@ -1475,7 +1476,11 @@ impl Session { primitives.send_interest(Interest { id, - mode: InterestMode::Future, + mode: if history { + InterestMode::CurrentFuture + } else { + InterestMode::Future + }, options: InterestOptions::KEYEXPRS + InterestOptions::TOKENS, wire_expr: Some(key_expr.to_wire(self).to_owned()), ext_qos: declare::ext::QoSType::DECLARE,