Skip to content

Commit

Permalink
refactor: add comment for Session and Session::close
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Sep 6, 2024
1 parent f2afe70 commit a1c608f
Showing 1 changed file with 90 additions and 48 deletions.
138 changes: 90 additions & 48 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@ use std::{
collections::HashMap,
convert::TryInto,
fmt,
future::{IntoFuture, Ready},
future::{Future, IntoFuture},
ops::Deref,
pin::Pin,
sync::{
atomic::{AtomicU16, Ordering},
Arc, Mutex, RwLock,
},
time::{Duration, SystemTime, UNIX_EPOCH},
};

use tracing::{error, trace, warn};
use tracing::{error, info, trace, warn};
use uhlc::{Timestamp, HLC};
use zenoh_buffers::ZBuf;
use zenoh_collections::SingleOrVec;
Expand Down Expand Up @@ -410,8 +411,23 @@ impl fmt::Debug for SessionInner {
}
}

/// A zenoh session.
/// The entrypoint of the zenoh API.
///
/// Zenoh session is instantiated using [`zenoh::open`], and can be used to declares various
/// entities like publishers, subscribers, or querybables, as well as making queries.
///
/// Session is an `Arc`-like type, it can be cloned, and it is closed when the last instance
/// is dropped (see [`Session::closed`]).
///
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use zenoh::prelude::*;
///
/// let session = zenoh::open(zenoh::config::peer()).await.unwrap();
/// session.put("key/expression", "value").res().await.unwrap();
/// # }
pub struct Session(pub(crate) Arc<SessionInner>);

impl Session {
Expand Down Expand Up @@ -544,8 +560,18 @@ impl Session {

/// Close the zenoh [`Session`](Session).
///
/// Sessions are automatically closed when dropped, but you may want to use this function to handle errors or
/// close the Session asynchronously.
/// Every subscriber and queryable declared will stop receiving data, and further attempt to
/// publish or query with the session or publishers will result in an error. Undeclaring an
/// entity after session closing is a no-op. Session state can be checked with
/// [`Session::is_closed`].
///
/// Session are automatically closed when all of its instances are dropped, same as `Arc`.
/// You may still want to use this function to handle errors or close the session
/// asynchronously.
/// <br>
/// Closing the session can also save bandwidth, as it avoids propagating the undeclaration
/// of the remaining entities.
///
///
/// # Examples
/// ```
Expand All @@ -554,13 +580,42 @@ impl Session {
/// use zenoh::prelude::*;
///
/// let session = zenoh::open(zenoh::config::peer()).await.unwrap();
/// let subscriber = session
/// .declare_subscriber("key/expression")
/// .await
/// .unwrap();
/// let subscriber_task = tokio::spawn(async move {
/// while let Ok(sample) = subscriber.recv_async().await {
/// println!("Received: {} {:?}", sample.key_expr(), sample.payload());
/// }
/// });
/// session.close().await.unwrap();
/// // subscriber task will end as `subscriber.recv_async()` will return `Err`
/// // subscriber undeclaration has not been sent on the wire
/// subscriber_task.await.unwrap();
/// # }
/// ```
pub fn close(&self) -> impl Resolve<ZResult<()>> + '_ {
self.0.close()
}

/// Check if the session has been closed.
///
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use zenoh::prelude::*;
///
/// let session = zenoh::open(zenoh::config::peer()).await.unwrap();
/// assert!(!session.is_closed());
/// session.close().await.unwrap();
/// assert!(session.is_closed());
/// # }
pub fn is_closed(&self) -> bool {
zread!(self.0.state).primitives.is_none()
}

pub fn undeclare<'a, T>(&'a self, decl: T) -> impl Resolve<ZResult<()>> + 'a
where
T: Undeclarable<&'a Session> + 'a,
Expand Down Expand Up @@ -981,43 +1036,15 @@ impl Session {
}
}

impl Session {
#[allow(clippy::new_ret_no_self)]
pub(super) fn new(
config: Config,
#[cfg(feature = "shared-memory")] shm_clients: Option<Arc<ShmClientStorage>>,
) -> impl Resolve<ZResult<Session>> {
ResolveFuture::new(async move {
tracing::debug!("Config: {:?}", &config);
let aggregated_subscribers = config.aggregation().subscribers().clone();
let aggregated_publishers = config.aggregation().publishers().clone();
#[allow(unused_mut)] // Required for shared-memory
let mut runtime = RuntimeBuilder::new(config);
#[cfg(feature = "shared-memory")]
{
runtime = runtime.shm_clients(shm_clients);
}
let mut runtime = runtime.build().await?;

let session = Self::init(
runtime.clone(),
aggregated_subscribers,
aggregated_publishers,
true,
)
.await;
runtime.start().await?;
Ok(session)
})
}
}
impl SessionInner {
fn close(&self) -> impl Resolve<ZResult<()>> + '_ {
ResolveFuture::new(async move {
let Some(primitives) = zwrite!(self.state).primitives.take() else {
return Ok(());
};
trace!("close()");
if self.owns_runtime {
info!(zid = %self.runtime.zid(), "close session");
}
self.task_controller.terminate_all(Duration::from_secs(10));
if self.owns_runtime {
self.runtime.close().await?;
Expand Down Expand Up @@ -2689,16 +2716,7 @@ where
<TryIntoConfig as std::convert::TryInto<crate::config::Config>>::Error: std::fmt::Debug,
{
fn wait(self) -> <Self as Resolvable>::To {
let config: crate::config::Config = self
.config
.try_into()
.map_err(|e| zerror!("Invalid Zenoh configuration {:?}", &e))?;
Session::new(
config,
#[cfg(feature = "shared-memory")]
self.shm_clients,
)
.wait()
zenoh_runtime::ZRuntime::Application.block_in_place(async move { self.await })
}
}

Expand All @@ -2708,10 +2726,34 @@ where
<TryIntoConfig as std::convert::TryInto<crate::config::Config>>::Error: std::fmt::Debug,
{
type Output = <Self as Resolvable>::To;
type IntoFuture = Ready<<Self as Resolvable>::To>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;

fn into_future(self) -> Self::IntoFuture {
std::future::ready(self.wait())
Box::pin(async move {
let config: crate::config::Config = self
.config
.try_into()
.map_err(|e| zerror!("Invalid Zenoh configuration {:?}", &e))?;
tracing::debug!("Config: {:?}", &config);
let aggregated_subscribers = config.aggregation().subscribers().clone();
let aggregated_publishers = config.aggregation().publishers().clone();
#[cfg_attr(not(feature = "shared-memory"), allow(unused_mut))]
let mut runtime = RuntimeBuilder::new(config);
#[cfg(feature = "shared-memory")]
{
runtime = runtime.shm_clients(self.shm_clients);
}
let mut runtime = runtime.build().await?;
let session = Session::init(
runtime.clone(),
aggregated_subscribers,
aggregated_publishers,
true,
)
.await;
runtime.start().await?;
Ok(session)
})
}
}

Expand Down

0 comments on commit a1c608f

Please sign in to comment.