From 8a777926eab59c70f08cf46cd047db6653c17b06 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 9 Sep 2024 11:41:53 +0200 Subject: [PATCH 1/7] refactor: add comment for `Session` and `Session::close` --- zenoh/src/api/publisher.rs | 3 +- zenoh/src/api/session.rs | 66 +++++++++++++++++++++++++++++++++++--- 2 files changed, 63 insertions(+), 6 deletions(-) diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index 1212bc83f4..b42635750b 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -12,11 +12,12 @@ // ZettaScale Zenoh Team, // +#[cfg(feature = "unstable")] +use std::mem::size_of; use std::{ convert::TryFrom, fmt, future::{IntoFuture, Ready}, - mem::size_of, pin::Pin, task::{Context, Poll}, }; diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index e0eac2951d..53ba069497 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -24,7 +24,7 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; -use tracing::{error, trace, warn}; +use tracing::{error, info, trace, warn}; use uhlc::{Timestamp, HLC}; use zenoh_buffers::ZBuf; use zenoh_collections::SingleOrVec; @@ -410,8 +410,23 @@ impl fmt::Debug for SessionInner { } } -/// A zenoh session. +/// The entrypoint of the zenoh API. /// +/// Zenoh session is instantiated using [`zenoh::open`], and can be used to declares various +/// entities like publishers, subscribers, or querybables, as well as making queries. +/// +/// Session is an `Arc`-like type, it can be cloned, and it is closed when the last instance +/// is dropped (see [`Session::closed`]). +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use zenoh::prelude::*; +/// +/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// session.put("key/expression", "value").res().await.unwrap(); +/// # } pub struct Session(pub(crate) Arc); impl Session { @@ -544,8 +559,18 @@ impl Session { /// Close the zenoh [`Session`](Session). /// - /// Sessions are automatically closed when dropped, but you may want to use this function to handle errors or - /// close the Session asynchronously. + /// Every subscriber and queryable declared will stop receiving data, and further attempt to + /// publish or query with the session or publishers will result in an error. Undeclaring an + /// entity after session closing is a no-op. Session state can be checked with + /// [`Session::is_closed`]. + /// + /// Session are automatically closed when all of its instances are dropped, same as `Arc`. + /// You may still want to use this function to handle errors or close the session + /// asynchronously. + ///
+ /// Closing the session can also save bandwidth, as it avoids propagating the undeclaration + /// of the remaining entities. + /// /// /// # Examples /// ``` @@ -554,13 +579,42 @@ impl Session { /// use zenoh::prelude::*; /// /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let subscriber = session + /// .declare_subscriber("key/expression") + /// .await + /// .unwrap(); + /// let subscriber_task = tokio::spawn(async move { + /// while let Ok(sample) = subscriber.recv_async().await { + /// println!("Received: {} {:?}", sample.key_expr(), sample.payload()); + /// } + /// }); /// session.close().await.unwrap(); + /// // subscriber task will end as `subscriber.recv_async()` will return `Err` + /// // subscriber undeclaration has not been sent on the wire + /// subscriber_task.await.unwrap(); /// # } /// ``` pub fn close(&self) -> impl Resolve> + '_ { self.0.close() } + /// Check if the session has been closed. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::prelude::*; + /// + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// assert!(!session.is_closed()); + /// session.close().await.unwrap(); + /// assert!(session.is_closed()); + /// # } + pub fn is_closed(&self) -> bool { + zread!(self.0.state).primitives.is_none() + } + pub fn undeclare<'a, T>(&'a self, decl: T) -> impl Resolve> + 'a where T: Undeclarable<&'a Session> + 'a, @@ -1017,7 +1071,9 @@ impl SessionInner { let Some(primitives) = zwrite!(self.state).primitives.take() else { return Ok(()); }; - trace!("close()"); + if self.owns_runtime { + info!(zid = %self.runtime.zid(), "close session"); + } self.task_controller.terminate_all(Duration::from_secs(10)); if self.owns_runtime { self.runtime.close().await?; From 47a1c53b91285b895da2bfabc3f3d9f6441e6638 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 9 Sep 2024 12:03:06 +0200 Subject: [PATCH 2/7] Update zenoh/src/api/session.rs Co-authored-by: Luca Cominardi --- zenoh/src/api/session.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 53ba069497..9a2f67f5ce 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -413,7 +413,7 @@ impl fmt::Debug for SessionInner { /// The entrypoint of the zenoh API. /// /// Zenoh session is instantiated using [`zenoh::open`], and can be used to declares various -/// entities like publishers, subscribers, or querybables, as well as making queries. +/// entities like publishers, subscribers, or querybables, as well as issuing queries. /// /// Session is an `Arc`-like type, it can be cloned, and it is closed when the last instance /// is dropped (see [`Session::closed`]). From f2cffa2e2fe6d9781414c0b595633032057a1222 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 9 Sep 2024 12:03:13 +0200 Subject: [PATCH 3/7] Update zenoh/src/api/session.rs Co-authored-by: Luca Cominardi --- zenoh/src/api/session.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 9a2f67f5ce..8ae941b41e 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -412,7 +412,7 @@ impl fmt::Debug for SessionInner { /// The entrypoint of the zenoh API. /// -/// Zenoh session is instantiated using [`zenoh::open`], and can be used to declares various +/// Zenoh session is instantiated using [`zenoh::open`] and it can be used to declare various /// entities like publishers, subscribers, or querybables, as well as issuing queries. /// /// Session is an `Arc`-like type, it can be cloned, and it is closed when the last instance From 60c330b4251d3eb7611e3c4e104052fec15d5f8c Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 9 Sep 2024 12:05:52 +0200 Subject: [PATCH 4/7] Update zenoh/src/api/session.rs Co-authored-by: Luca Cominardi --- zenoh/src/api/session.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 8ae941b41e..9d33100c54 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -564,7 +564,7 @@ impl Session { /// entity after session closing is a no-op. Session state can be checked with /// [`Session::is_closed`]. /// - /// Session are automatically closed when all of its instances are dropped, same as `Arc`. + /// Session are automatically closed when all its instances are dropped, same as `Arc`. /// You may still want to use this function to handle errors or close the session /// asynchronously. ///
From 62cf8ddf549594c7bc9d50551bd7b09c10bb75d1 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 9 Sep 2024 13:16:52 +0200 Subject: [PATCH 5/7] fix: don't run `Session::close` example --- zenoh/src/api/session.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 9d33100c54..d851ed50df 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -573,7 +573,7 @@ impl Session { /// /// /// # Examples - /// ``` + /// ```no-run /// # #[tokio::main] /// # async fn main() { /// use zenoh::prelude::*; From 60a96fed64e1620f5d3a1f2048e253ae57ea09b7 Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 9 Sep 2024 13:30:00 +0200 Subject: [PATCH 6/7] fix: fix `Session` example --- zenoh/src/api/session.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index d851ed50df..03b41af966 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -425,7 +425,7 @@ impl fmt::Debug for SessionInner { /// use zenoh::prelude::*; /// /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); -/// session.put("key/expression", "value").res().await.unwrap(); +/// session.put("key/expression", "value").await.unwrap(); /// # } pub struct Session(pub(crate) Arc); From 26885c4a2af9e29e721dd12d10af25c8a3d69b0f Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Mon, 9 Sep 2024 13:46:13 +0200 Subject: [PATCH 7/7] fix: fix `Session` doc --- zenoh/src/api/session.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 03b41af966..38548dd344 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -412,11 +412,11 @@ impl fmt::Debug for SessionInner { /// The entrypoint of the zenoh API. /// -/// Zenoh session is instantiated using [`zenoh::open`] and it can be used to declare various +/// Zenoh session is instantiated using [`zenoh::open`](crate::open) and it can be used to declare various /// entities like publishers, subscribers, or querybables, as well as issuing queries. /// /// Session is an `Arc`-like type, it can be cloned, and it is closed when the last instance -/// is dropped (see [`Session::closed`]). +/// is dropped (see [`Session::close`]). /// /// # Examples /// ``` @@ -573,7 +573,7 @@ impl Session { /// /// /// # Examples - /// ```no-run + /// ```no_run /// # #[tokio::main] /// # async fn main() { /// use zenoh::prelude::*;