Skip to content

Commit

Permalink
chore: merge branch 'main' into 'dev/arcsession' (#1384)
Browse files Browse the repository at this point in the history
* feat(zenoh_id): exposing into slice & try from slice

Allowing users to create a ZenohId from a slice, using TryFrom, and also
allowing users to convert a ZenohId into a [u8; 16].

* Add LivlinessSubscriber history option (#1355)

* Close #1357

* feat(zenoh_id): replacing from slice with  function

---------

Co-authored-by: Darius Maitia <[email protected]>
Co-authored-by: OlivierHecart <[email protected]>
Co-authored-by: Luca Cominardi <[email protected]>
  • Loading branch information
4 people authored Sep 9, 2024
1 parent 8cd6914 commit dff8456
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 7 deletions.
13 changes: 13 additions & 0 deletions commons/zenoh-config/src/wrappers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ impl ZenohId {
pub fn into_keyexpr(self) -> OwnedKeyExpr {
self.into()
}

pub fn to_le_bytes(self) -> [u8; uhlc::ID::MAX_SIZE] {
self.0.to_le_bytes()
}
}

impl fmt::Debug for ZenohId {
Expand All @@ -54,6 +58,15 @@ impl From<ZenohIdProto> for ZenohId {
}
}

impl TryFrom<&[u8]> for ZenohId {
type Error = zenoh_result::Error;

fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
let proto: ZenohIdProto = value.try_into()?;
Ok(ZenohId::from(proto))
}
}

impl From<ZenohId> for ZenohIdProto {
fn from(id: ZenohId) -> Self {
id.0
Expand Down
12 changes: 8 additions & 4 deletions examples/examples/z_sub_liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn main() {
// Initiate logging
zenoh::try_init_log_from_env();

let (config, key_expr) = parse_args();
let (config, key_expr, history) = parse_args();

println!("Opening session...");
let session = zenoh::open(config).await.unwrap();
Expand All @@ -30,6 +30,7 @@ async fn main() {
let subscriber = session
.liveliness()
.declare_subscriber(&key_expr)
.history(history)
.await
.unwrap();

Expand All @@ -51,13 +52,16 @@ async fn main() {
#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
struct Args {
#[arg(short, long, default_value = "group1/**")]
/// The key expression to write to.
/// The key expression to subscribe to.
key: KeyExpr<'static>,
#[arg(long)]
/// Get historical liveliness tokens.
history: bool,
#[command(flatten)]
common: CommonArgs,
}

fn parse_args() -> (Config, KeyExpr<'static>) {
fn parse_args() -> (Config, KeyExpr<'static>, bool) {
let args = Args::parse();
(args.common.into(), args.key)
(args.common.into(), args.key, args.history)
}
2 changes: 1 addition & 1 deletion io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastUnixSocketStream {
nix::fcntl::flock(lock_fd, nix::fcntl::FlockArg::LockExclusiveNonblock).map_err(|e| {
let _ = nix::unistd::close(lock_fd);
let e = zerror!(
"Can not create a new UnixSocketStream listener on {} - Unable to acquire look: {}",
"Can not create a new UnixSocketStream listener on {} - Unable to acquire lock: {}",
path,
e
);
Expand Down
35 changes: 34 additions & 1 deletion zenoh/src/api/liveliness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ impl<'a> Liveliness<'a> {
key_expr: TryIntoKeyExpr::try_into(key_expr).map_err(Into::into),
handler: DefaultHandler::default(),
undeclare_on_drop: true,
history: false,
}
}

Expand Down Expand Up @@ -436,6 +437,7 @@ pub struct LivelinessSubscriberBuilder<'a, 'b, Handler> {
pub key_expr: ZResult<KeyExpr<'b>>,
pub handler: Handler,
pub undeclare_on_drop: bool,
pub history: bool,
}

#[zenoh_macros::unstable]
Expand Down Expand Up @@ -471,12 +473,14 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> {
key_expr,
handler: _,
undeclare_on_drop: _,
history,
} = self;
LivelinessSubscriberBuilder {
session,
key_expr,
handler: callback,
undeclare_on_drop: false,
history,
}
}

Expand Down Expand Up @@ -544,12 +548,36 @@ impl<'a, 'b> LivelinessSubscriberBuilder<'a, 'b, DefaultHandler> {
key_expr,
handler: _,
undeclare_on_drop: _,
history,
} = self;
LivelinessSubscriberBuilder {
session,
key_expr,
handler,
undeclare_on_drop: true,
history,
}
}
}

#[zenoh_macros::unstable]
impl<Handler> LivelinessSubscriberBuilder<'_, '_, Handler> {
#[inline]
#[zenoh_macros::unstable]
pub fn history(self, history: bool) -> Self {
let LivelinessSubscriberBuilder {
session,
key_expr,
handler,
undeclare_on_drop,
history: _,
} = self;
LivelinessSubscriberBuilder {
session,
key_expr,
handler,
undeclare_on_drop,
history,
}
}
}
Expand Down Expand Up @@ -578,7 +606,12 @@ where
let (callback, handler) = self.handler.into_handler();
session
.0
.declare_liveliness_subscriber_inner(&key_expr, Locality::default(), callback)
.declare_liveliness_subscriber_inner(
&key_expr,
Locality::default(),
self.history,
callback,
)
.map(|sub_state| Subscriber {
inner: SubscriberInner {
#[cfg(feature = "unstable")]
Expand Down
7 changes: 6 additions & 1 deletion zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1542,6 +1542,7 @@ impl SessionInner {
&self,
key_expr: &KeyExpr,
origin: Locality,
history: bool,
callback: Callback<'static, Sample>,
) -> ZResult<Arc<SubscriberState>> {
let mut state = zwrite!(self.state);
Expand Down Expand Up @@ -1589,7 +1590,11 @@ impl SessionInner {

primitives.send_interest(Interest {
id,
mode: InterestMode::Future,
mode: if history {
InterestMode::CurrentFuture
} else {
InterestMode::Future
},
options: InterestOptions::KEYEXPRS + InterestOptions::TOKENS,
wire_expr: Some(key_expr.to_wire(self).to_owned()),
ext_qos: declare::ext::QoSType::DECLARE,
Expand Down

0 comments on commit dff8456

Please sign in to comment.