Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add comment for Session and Session::close #1369

Merged
merged 7 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion zenoh/src/api/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
// ZettaScale Zenoh Team, <[email protected]>
//

#[cfg(feature = "unstable")]
use std::mem::size_of;
use std::{
convert::TryFrom,
fmt,
future::{IntoFuture, Ready},
mem::size_of,
pin::Pin,
task::{Context, Poll},
};
Expand Down
68 changes: 62 additions & 6 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};

use tracing::{error, trace, warn};
use tracing::{error, info, trace, warn};
use uhlc::{Timestamp, HLC};
use zenoh_buffers::ZBuf;
use zenoh_collections::SingleOrVec;
Expand Down Expand Up @@ -410,8 +410,23 @@ impl fmt::Debug for SessionInner {
}
}

/// A zenoh session.
/// The entrypoint of the zenoh API.
///
/// Zenoh session is instantiated using [`zenoh::open`](crate::open) and it can be used to declare various
/// entities like publishers, subscribers, or querybables, as well as issuing queries.
///
/// Session is an `Arc`-like type, it can be cloned, and it is closed when the last instance
/// is dropped (see [`Session::close`]).
///
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use zenoh::prelude::*;
///
/// let session = zenoh::open(zenoh::config::peer()).await.unwrap();
/// session.put("key/expression", "value").await.unwrap();
/// # }
pub struct Session(pub(crate) Arc<SessionInner>);

impl Session {
Expand Down Expand Up @@ -544,23 +559,62 @@ impl Session {

/// Close the zenoh [`Session`](Session).
///
/// Sessions are automatically closed when dropped, but you may want to use this function to handle errors or
/// close the Session asynchronously.
/// Every subscriber and queryable declared will stop receiving data, and further attempt to
/// publish or query with the session or publishers will result in an error. Undeclaring an
/// entity after session closing is a no-op. Session state can be checked with
/// [`Session::is_closed`].
///
/// Session are automatically closed when all its instances are dropped, same as `Arc`.
/// You may still want to use this function to handle errors or close the session
/// asynchronously.
/// <br>
/// Closing the session can also save bandwidth, as it avoids propagating the undeclaration
/// of the remaining entities.
///
///
/// # Examples
/// ```
/// ```no_run
/// # #[tokio::main]
/// # async fn main() {
/// use zenoh::prelude::*;
///
/// let session = zenoh::open(zenoh::config::peer()).await.unwrap();
/// let subscriber = session
/// .declare_subscriber("key/expression")
/// .await
/// .unwrap();
/// let subscriber_task = tokio::spawn(async move {
/// while let Ok(sample) = subscriber.recv_async().await {
/// println!("Received: {} {:?}", sample.key_expr(), sample.payload());
/// }
/// });
/// session.close().await.unwrap();
/// // subscriber task will end as `subscriber.recv_async()` will return `Err`
/// // subscriber undeclaration has not been sent on the wire
/// subscriber_task.await.unwrap();
/// # }
/// ```
pub fn close(&self) -> impl Resolve<ZResult<()>> + '_ {
self.0.close()
}

/// Check if the session has been closed.
///
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use zenoh::prelude::*;
///
/// let session = zenoh::open(zenoh::config::peer()).await.unwrap();
/// assert!(!session.is_closed());
/// session.close().await.unwrap();
/// assert!(session.is_closed());
/// # }
pub fn is_closed(&self) -> bool {
zread!(self.0.state).primitives.is_none()
}

pub fn undeclare<'a, T>(&'a self, decl: T) -> impl Resolve<ZResult<()>> + 'a
where
T: Undeclarable<&'a Session> + 'a,
Expand Down Expand Up @@ -1017,7 +1071,9 @@ impl SessionInner {
let Some(primitives) = zwrite!(self.state).primitives.take() else {
return Ok(());
};
trace!("close()");
if self.owns_runtime {
info!(zid = %self.runtime.zid(), "close session");
}
self.task_controller.terminate_all(Duration::from_secs(10));
if self.owns_runtime {
self.runtime.close().await?;
Expand Down