Skip to content

Commit

Permalink
refactor: refactor weak session implementation
Browse files Browse the repository at this point in the history
The previous implementation of `WeakSession` relied on a weak counter
 wrapped in a mutex. It was unnecessarily complex, as an atomic
 strong counter does the job better.
  • Loading branch information
wyfo committed Dec 9, 2024
1 parent b3ccf82 commit 68f2458
Showing 1 changed file with 8 additions and 31 deletions.
39 changes: 8 additions & 31 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ use std::{
collections::HashMap,
convert::TryInto,
fmt,
mem::ManuallyDrop,
ops::Deref,
sync::{
atomic::{AtomicU16, Ordering},
Arc, Mutex, RwLock,
atomic::{AtomicU16, AtomicUsize, Ordering},
Arc, RwLock,
},
time::{Duration, SystemTime, UNIX_EPOCH},
};
Expand Down Expand Up @@ -527,7 +528,7 @@ impl<T, S> Undeclarable<S> for T where T: UndeclarableSealed<S> {}

pub(crate) struct SessionInner {
/// See [`WeakSession`] doc
weak_counter: Mutex<usize>,
strong_counter: AtomicUsize,
pub(crate) runtime: Runtime,
pub(crate) state: RwLock<SessionState>,
pub(crate) id: u16,
Expand Down Expand Up @@ -573,16 +574,14 @@ impl fmt::Debug for Session {

impl Clone for Session {
fn clone(&self) -> Self {
let _weak = self.0.weak_counter.lock().unwrap();
self.0.strong_counter.fetch_add(1, Ordering::Relaxed);
Self(self.0.clone())
}
}

impl Drop for Session {
fn drop(&mut self) {
let weak = self.0.weak_counter.lock().unwrap();
if Arc::strong_count(&self.0) == *weak + /* the `Arc` currently dropped */ 1 {
drop(weak);
if self.0.strong_counter.fetch_sub(1, Ordering::Relaxed) == 1 {
if let Err(error) = self.close().wait() {
tracing::error!(error)
}
Expand All @@ -596,32 +595,17 @@ impl Drop for Session {
/// When all `Session` instance are dropped, [`Session::close`] is be called and cleans
/// the reference cycles, allowing the underlying `Arc` to be properly reclaimed.
///
/// The pseudo-weak algorithm relies on a counter wrapped in a mutex. It was indeed the simplest
/// to implement it, because atomic manipulations to achieve this semantic would not have been
/// trivial at all — what could happen if a pseudo-weak is cloned while the last session instance
/// is dropped? With a mutex, it's simple, and it works perfectly fine, as we don't care about the
/// performance penalty when it comes to session entities cloning/dropping.
///
/// (Although it was planed to be used initially, `Weak` was in fact causing errors in the session
/// closing, because the primitive implementation seemed to be used in the closing operation.)
#[derive(Clone)]
pub(crate) struct WeakSession(Arc<SessionInner>);

impl WeakSession {
fn new(session: &Arc<SessionInner>) -> Self {
let mut weak = session.weak_counter.lock().unwrap();
*weak += 1;
Self(session.clone())
}
}

impl Clone for WeakSession {
fn clone(&self) -> Self {
let mut weak = self.0.weak_counter.lock().unwrap();
*weak += 1;
Self(self.0.clone())
}
}

impl fmt::Debug for WeakSession {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.0.fmt(f)
Expand All @@ -636,13 +620,6 @@ impl Deref for WeakSession {
}
}

impl Drop for WeakSession {
fn drop(&mut self) {
let mut weak = self.0.weak_counter.lock().unwrap();
*weak -= 1;
}
}

/// Error indicating the operation cannot proceed because the session is closed.
///
/// It may be returned by operations like [`Session::get`] or [`Publisher::put`](crate::api::publisher::Publisher::put) when
Expand Down Expand Up @@ -677,7 +654,7 @@ impl Session {
publisher_qos.into(),
));
let session = Session(Arc::new(SessionInner {
weak_counter: Mutex::new(0),
strong_counter: AtomicUsize::new(1),
runtime: runtime.clone(),
state,
id: SESSION_ID_COUNTER.fetch_add(1, Ordering::SeqCst),
Expand Down

0 comments on commit 68f2458

Please sign in to comment.