Skip to content

Commit

Permalink
refactor: add comment for Session and Session::close
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Sep 9, 2024
1 parent f2afe70 commit 8a77792
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 6 deletions.
3 changes: 2 additions & 1 deletion zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
// ZettaScale Zenoh Team, <[email protected]>
//

#[cfg(feature = "unstable")]
use std::mem::size_of;
use std::{
convert::TryFrom,
fmt,
future::{IntoFuture, Ready},
mem::size_of,
pin::Pin,
task::{Context, Poll},
};
Expand Down
66 changes: 61 additions & 5 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};

use tracing::{error, trace, warn};
use tracing::{error, info, trace, warn};
use uhlc::{Timestamp, HLC};
use zenoh_buffers::ZBuf;
use zenoh_collections::SingleOrVec;
Expand Down Expand Up @@ -410,8 +410,23 @@ impl fmt::Debug for SessionInner {
}
}

/// A zenoh session.
/// The entrypoint of the zenoh API.
///
/// Zenoh session is instantiated using [`zenoh::open`], and can be used to declares various
/// entities like publishers, subscribers, or querybables, as well as making queries.
///
/// Session is an `Arc`-like type, it can be cloned, and it is closed when the last instance
/// is dropped (see [`Session::closed`]).
///
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use zenoh::prelude::*;
///
/// let session = zenoh::open(zenoh::config::peer()).await.unwrap();
/// session.put("key/expression", "value").res().await.unwrap();
/// # }
pub struct Session(pub(crate) Arc<SessionInner>);

impl Session {
Expand Down Expand Up @@ -544,8 +559,18 @@ impl Session {

/// Close the zenoh [`Session`](Session).
///
/// Sessions are automatically closed when dropped, but you may want to use this function to handle errors or
/// close the Session asynchronously.
/// Every subscriber and queryable declared will stop receiving data, and further attempt to
/// publish or query with the session or publishers will result in an error. Undeclaring an
/// entity after session closing is a no-op. Session state can be checked with
/// [`Session::is_closed`].
///
/// Session are automatically closed when all of its instances are dropped, same as `Arc`.
/// You may still want to use this function to handle errors or close the session
/// asynchronously.
/// <br>
/// Closing the session can also save bandwidth, as it avoids propagating the undeclaration
/// of the remaining entities.
///
///
/// # Examples
/// ```
Expand All @@ -554,13 +579,42 @@ impl Session {
/// use zenoh::prelude::*;
///
/// let session = zenoh::open(zenoh::config::peer()).await.unwrap();
/// let subscriber = session
/// .declare_subscriber("key/expression")
/// .await
/// .unwrap();
/// let subscriber_task = tokio::spawn(async move {
/// while let Ok(sample) = subscriber.recv_async().await {
/// println!("Received: {} {:?}", sample.key_expr(), sample.payload());
/// }
/// });
/// session.close().await.unwrap();
/// // subscriber task will end as `subscriber.recv_async()` will return `Err`
/// // subscriber undeclaration has not been sent on the wire
/// subscriber_task.await.unwrap();
/// # }
/// ```
pub fn close(&self) -> impl Resolve<ZResult<()>> + '_ {
self.0.close()
}

/// Check if the session has been closed.
///
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use zenoh::prelude::*;
///
/// let session = zenoh::open(zenoh::config::peer()).await.unwrap();
/// assert!(!session.is_closed());
/// session.close().await.unwrap();
/// assert!(session.is_closed());
/// # }
pub fn is_closed(&self) -> bool {
zread!(self.0.state).primitives.is_none()
}

pub fn undeclare<'a, T>(&'a self, decl: T) -> impl Resolve<ZResult<()>> + 'a
where
T: Undeclarable<&'a Session> + 'a,
Expand Down Expand Up @@ -1017,7 +1071,9 @@ impl SessionInner {
let Some(primitives) = zwrite!(self.state).primitives.take() else {
return Ok(());
};
trace!("close()");
if self.owns_runtime {
info!(zid = %self.runtime.zid(), "close session");
}
self.task_controller.terminate_all(Duration::from_secs(10));
if self.owns_runtime {
self.runtime.close().await?;
Expand Down

0 comments on commit 8a77792

Please sign in to comment.