diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index 1212bc83f4..b42635750b 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -12,11 +12,12 @@ // ZettaScale Zenoh Team, // +#[cfg(feature = "unstable")] +use std::mem::size_of; use std::{ convert::TryFrom, fmt, future::{IntoFuture, Ready}, - mem::size_of, pin::Pin, task::{Context, Poll}, }; diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index e0eac2951d..38548dd344 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -24,7 +24,7 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; -use tracing::{error, trace, warn}; +use tracing::{error, info, trace, warn}; use uhlc::{Timestamp, HLC}; use zenoh_buffers::ZBuf; use zenoh_collections::SingleOrVec; @@ -410,8 +410,23 @@ impl fmt::Debug for SessionInner { } } -/// A zenoh session. +/// The entrypoint of the zenoh API. /// +/// Zenoh session is instantiated using [`zenoh::open`](crate::open) and it can be used to declare various +/// entities like publishers, subscribers, or querybables, as well as issuing queries. +/// +/// Session is an `Arc`-like type, it can be cloned, and it is closed when the last instance +/// is dropped (see [`Session::close`]). +/// +/// # Examples +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use zenoh::prelude::*; +/// +/// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); +/// session.put("key/expression", "value").await.unwrap(); +/// # } pub struct Session(pub(crate) Arc); impl Session { @@ -544,23 +559,62 @@ impl Session { /// Close the zenoh [`Session`](Session). /// - /// Sessions are automatically closed when dropped, but you may want to use this function to handle errors or - /// close the Session asynchronously. + /// Every subscriber and queryable declared will stop receiving data, and further attempt to + /// publish or query with the session or publishers will result in an error. Undeclaring an + /// entity after session closing is a no-op. Session state can be checked with + /// [`Session::is_closed`]. + /// + /// Session are automatically closed when all its instances are dropped, same as `Arc`. + /// You may still want to use this function to handle errors or close the session + /// asynchronously. + ///
+ /// Closing the session can also save bandwidth, as it avoids propagating the undeclaration + /// of the remaining entities. + /// /// /// # Examples - /// ``` + /// ```no_run /// # #[tokio::main] /// # async fn main() { /// use zenoh::prelude::*; /// /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// let subscriber = session + /// .declare_subscriber("key/expression") + /// .await + /// .unwrap(); + /// let subscriber_task = tokio::spawn(async move { + /// while let Ok(sample) = subscriber.recv_async().await { + /// println!("Received: {} {:?}", sample.key_expr(), sample.payload()); + /// } + /// }); /// session.close().await.unwrap(); + /// // subscriber task will end as `subscriber.recv_async()` will return `Err` + /// // subscriber undeclaration has not been sent on the wire + /// subscriber_task.await.unwrap(); /// # } /// ``` pub fn close(&self) -> impl Resolve> + '_ { self.0.close() } + /// Check if the session has been closed. + /// + /// # Examples + /// ``` + /// # #[tokio::main] + /// # async fn main() { + /// use zenoh::prelude::*; + /// + /// let session = zenoh::open(zenoh::config::peer()).await.unwrap(); + /// assert!(!session.is_closed()); + /// session.close().await.unwrap(); + /// assert!(session.is_closed()); + /// # } + pub fn is_closed(&self) -> bool { + zread!(self.0.state).primitives.is_none() + } + pub fn undeclare<'a, T>(&'a self, decl: T) -> impl Resolve> + 'a where T: Undeclarable<&'a Session> + 'a, @@ -1017,7 +1071,9 @@ impl SessionInner { let Some(primitives) = zwrite!(self.state).primitives.take() else { return Ok(()); }; - trace!("close()"); + if self.owns_runtime { + info!(zid = %self.runtime.zid(), "close session"); + } self.task_controller.terminate_all(Duration::from_secs(10)); if self.owns_runtime { self.runtime.close().await?;