diff --git a/ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs b/ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs index f3b1dd0efe..ad96d0b2b0 100644 --- a/ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs +++ b/ci/valgrind-check/src/pub_sub/bin/z_pub_sub.rs @@ -13,7 +13,7 @@ // use std::time::Duration; -use zenoh::{config::Config, key_expr::KeyExpr, prelude::*}; +use zenoh::{config::Config, key_expr::KeyExpr}; #[tokio::main] async fn main() { diff --git a/ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs b/ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs index 8ea7be201b..e82ecba477 100644 --- a/ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs +++ b/ci/valgrind-check/src/queryable_get/bin/z_queryable_get.rs @@ -16,7 +16,6 @@ use std::{convert::TryFrom, time::Duration}; use zenoh::{ config::Config, key_expr::KeyExpr, - prelude::*, query::{QueryTarget, Selector}, }; diff --git a/zenoh/src/api/admin.rs b/zenoh/src/api/admin.rs index 7452431977..060bb78c43 100644 --- a/zenoh/src/api/admin.rs +++ b/zenoh/src/api/admin.rs @@ -14,7 +14,7 @@ use std::{ collections::hash_map::DefaultHasher, hash::{Hash, Hasher}, - sync::{Arc, Weak}, + sync::Arc, }; use zenoh_core::{Result as ZResult, Wait}; @@ -34,7 +34,7 @@ use super::{ sample::{DataInfo, Locality, SampleKind}, subscriber::SubscriberKind, }; -use crate::api::session::SessionInner; +use crate::api::session::WeakSession; lazy_static::lazy_static!( static ref KE_STARSTAR: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("**") }; @@ -44,10 +44,10 @@ lazy_static::lazy_static!( static ref KE_LINK: &'static keyexpr = unsafe { keyexpr::from_str_unchecked("link") }; ); -pub(crate) fn init(session: &Arc) { +pub(crate) fn init(session: WeakSession) { if let Ok(own_zid) = keyexpr::new(&session.runtime.zid().to_string()) { let admin_key = KeyExpr::from(*KE_PREFIX / own_zid / *KE_SESSION / *KE_STARSTAR) - .to_wire(session) + .to_wire(&session) .to_owned(); let _admin_qabl = session.declare_queryable_inner( @@ -55,18 +55,14 @@ pub(crate) fn init(session: &Arc) { true, Locality::SessionLocal, Arc::new({ - let session = Arc::downgrade(session); - move |q| { - if let Some(session) = Weak::upgrade(&session) { - on_admin_query(&session, q) - } - } + let session = session.clone(); + move |q| on_admin_query(&session, q) }), ); } } -pub(crate) fn on_admin_query(session: &SessionInner, query: Query) { +pub(crate) fn on_admin_query(session: &WeakSession, query: Query) { fn reply_peer(own_zid: &keyexpr, query: &Query, peer: TransportPeer) { let zid = peer.zid.to_string(); if let Ok(zid) = keyexpr::new(&zid) { @@ -128,11 +124,11 @@ pub(crate) fn on_admin_query(session: &SessionInner, query: Query) { #[derive(Clone)] pub(crate) struct Handler { - pub(crate) session: Weak, + pub(crate) session: WeakSession, } impl Handler { - pub(crate) fn new(session: Weak) -> Self { + pub(crate) fn new(session: WeakSession) -> Self { Self { session } } } @@ -159,10 +155,7 @@ impl TransportMulticastEventHandler for Handler { &self, peer: zenoh_transport::TransportPeer, ) -> ZResult> { - let Some(session) = Weak::upgrade(&self.session) else { - bail!("session closed"); - }; - if let Ok(own_zid) = keyexpr::new(&session.runtime.zid().to_string()) { + if let Ok(own_zid) = keyexpr::new(&self.session.runtime.zid().to_string()) { if let Ok(zid) = keyexpr::new(&peer.zid.to_string()) { let expr = WireExpr::from( &(*KE_PREFIX / own_zid / *KE_SESSION / *KE_TRANSPORT_UNICAST / zid), @@ -172,7 +165,7 @@ impl TransportMulticastEventHandler for Handler { encoding: Some(Encoding::APPLICATION_JSON), ..Default::default() }; - session.execute_subscriber_callbacks( + self.session.execute_subscriber_callbacks( true, &expr, Some(info), @@ -205,7 +198,7 @@ impl TransportMulticastEventHandler for Handler { pub(crate) struct PeerHandler { pub(crate) expr: WireExpr<'static>, - pub(crate) session: Weak, + pub(crate) session: WeakSession, } impl TransportPeerEventHandler for PeerHandler { @@ -214,16 +207,13 @@ impl TransportPeerEventHandler for PeerHandler { } fn new_link(&self, link: zenoh_link::Link) { - let Some(session) = Weak::upgrade(&self.session) else { - return; - }; let mut s = DefaultHasher::new(); link.hash(&mut s); let info = DataInfo { encoding: Some(Encoding::APPLICATION_JSON), ..Default::default() }; - session.execute_subscriber_callbacks( + self.session.execute_subscriber_callbacks( true, &self .expr @@ -239,16 +229,13 @@ impl TransportPeerEventHandler for PeerHandler { } fn del_link(&self, link: zenoh_link::Link) { - let Some(session) = Weak::upgrade(&self.session) else { - return; - }; let mut s = DefaultHasher::new(); link.hash(&mut s); let info = DataInfo { kind: SampleKind::Delete, ..Default::default() }; - session.execute_subscriber_callbacks( + self.session.execute_subscriber_callbacks( true, &self .expr @@ -266,14 +253,11 @@ impl TransportPeerEventHandler for PeerHandler { fn closing(&self) {} fn closed(&self) { - let Some(session) = Weak::upgrade(&self.session) else { - return; - }; let info = DataInfo { kind: SampleKind::Delete, ..Default::default() }; - session.execute_subscriber_callbacks( + self.session.execute_subscriber_callbacks( true, &self.expr, Some(info), diff --git a/zenoh/src/api/builders/publisher.rs b/zenoh/src/api/builders/publisher.rs index f41b46dfbb..53c32c8a7d 100644 --- a/zenoh/src/api/builders/publisher.rs +++ b/zenoh/src/api/builders/publisher.rs @@ -11,10 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // -use std::{ - future::{IntoFuture, Ready}, - sync::Arc, -}; +use std::future::{IntoFuture, Ready}; use zenoh_core::{Resolvable, Result as ZResult, Wait}; #[cfg(feature = "unstable")] @@ -333,7 +330,7 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> { Ok(Publisher { #[cfg(feature = "unstable")] session_id: self.session.0.runtime.zid(), - session: Arc::downgrade(&self.session.0), + session: self.session.downgrade(), id: 0, // This is a one shot Publisher key_expr: self.key_expr?, encoding: self.encoding, @@ -394,7 +391,7 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> { Ok(Publisher { #[cfg(feature = "unstable")] session_id: self.session.0.runtime.zid(), - session: Arc::downgrade(&self.session.0), + session: self.session.downgrade(), id, key_expr, encoding: self.encoding, @@ -404,7 +401,8 @@ impl<'a, 'b> Wait for PublisherBuilder<'a, 'b> { destination: self.destination, #[cfg(feature = "unstable")] reliability: self.reliability, - #[cfg(feature = "unstable")]matching_listeners: Default::default(), + #[cfg(feature = "unstable")] + matching_listeners: Default::default(), undeclare_on_drop: true, }) } diff --git a/zenoh/src/api/liveliness.rs b/zenoh/src/api/liveliness.rs index 50bcf9da67..ce6a60ca35 100644 --- a/zenoh/src/api/liveliness.rs +++ b/zenoh/src/api/liveliness.rs @@ -16,7 +16,7 @@ use std::{ convert::TryInto, future::{IntoFuture, Ready}, mem::size_of, - sync::{Arc, Weak}, + sync::Arc, time::Duration, }; @@ -33,7 +33,7 @@ use super::{ subscriber::{Subscriber, SubscriberInner}, Id, }; -use crate::api::session::SessionInner; +use crate::api::session::WeakSession; /// A structure with functions to declare a /// [`LivelinessToken`](LivelinessToken), query @@ -258,7 +258,7 @@ impl Wait for LivelinessTokenBuilder<'_, '_> { .0 .declare_liveliness_inner(&key_expr) .map(|tok_state| LivelinessToken { - session: Arc::downgrade(&self.session.0), + session: self.session.downgrade(), state: tok_state, undeclare_on_drop: true, }) @@ -311,7 +311,7 @@ pub(crate) struct LivelinessTokenState { #[zenoh_macros::unstable] #[derive(Debug)] pub struct LivelinessToken { - session: Weak, + session: WeakSession, state: Arc, undeclare_on_drop: bool, } @@ -388,10 +388,7 @@ impl LivelinessToken { fn undeclare_impl(&mut self) -> ZResult<()> { // set the flag first to avoid double panic if this function panic self.undeclare_on_drop = false; - match self.session.upgrade() { - Some(session) => session.undeclare_liveliness(self.state.id), - None => Ok(()), - } + self.session.undeclare_liveliness(self.state.id) } } @@ -581,7 +578,7 @@ where inner: SubscriberInner { #[cfg(feature = "unstable")] session_id: session.zid(), - session: Arc::downgrade(&self.session.0), + session: self.session.downgrade(), state: sub_state, kind: SubscriberKind::LivelinessSubscriber, // `size_of::() == 0` means callback-only subscriber diff --git a/zenoh/src/api/publisher.rs b/zenoh/src/api/publisher.rs index 03e78be52d..0ebcc326ae 100644 --- a/zenoh/src/api/publisher.rs +++ b/zenoh/src/api/publisher.rs @@ -17,7 +17,6 @@ use std::{ fmt, future::{IntoFuture, Ready}, pin::Pin, - sync::{Arc, Weak}, task::{Context, Poll}, }; @@ -36,7 +35,7 @@ use { handlers::{Callback, DefaultHandler, IntoHandler}, sample::SourceInfo, }, - std::{collections::HashSet, sync::Mutex}, + std::{collections::HashSet, sync::Arc, sync::Mutex}, zenoh_config::wrappers::EntityGlobalId, zenoh_config::ZenohId, zenoh_protocol::core::EntityGlobalIdProto, @@ -54,7 +53,7 @@ use super::{ session::UndeclarableSealed, }; use crate::{ - api::{session::SessionInner, subscriber::SubscriberKind, Id}, + api::{session::WeakSession, subscriber::SubscriberKind, Id}, net::primitives::Primitives, }; @@ -109,7 +108,7 @@ impl fmt::Debug for PublisherState { pub struct Publisher<'a> { #[cfg(feature = "unstable")] pub(crate) session_id: ZenohId, - pub(crate) session: Weak, + pub(crate) session: WeakSession, pub(crate) id: Id, pub(crate) key_expr: KeyExpr<'a>, pub(crate) encoding: Encoding, @@ -125,12 +124,6 @@ pub struct Publisher<'a> { } impl<'a> Publisher<'a> { - fn session(&self) -> ZResult> { - self.session - .upgrade() - .ok_or_else(|| zerror!("session closed").into()) - } - /// Returns the [`EntityGlobalId`] of this Publisher. /// /// # Examples @@ -256,7 +249,7 @@ impl<'a> Publisher<'a> { #[zenoh_macros::unstable] pub fn matching_status(&self) -> impl Resolve> + '_ { zenoh_core::ResolveFuture::new(async move { - self.session()? + self.session .matching_status(self.key_expr(), self.destination) }) } @@ -312,17 +305,14 @@ impl<'a> Publisher<'a> { fn undeclare_impl(&mut self) -> ZResult<()> { // set the flag first to avoid double panic if this function panic self.undeclare_on_drop = false; - let Ok(session) = self.session() else { - return Ok(()); - }; #[cfg(feature = "unstable")] { let ids: Vec = zlock!(self.matching_listeners).drain().collect(); for id in ids { - session.undeclare_matches_listener_inner(id)? + self.session.undeclare_matches_listener_inner(id)? } } - session.undeclare_publisher_inner(self.id) + self.session.undeclare_publisher_inner(self.id) } } @@ -430,17 +420,16 @@ impl Publisher<'_> { attachment: Option, ) -> ZResult<()> { tracing::trace!("write({:?}, [...])", &self.key_expr); - let session = self.session()?; - let primitives = zread!(session.state).primitives()?; + let primitives = zread!(self.session.state).primitives()?; let timestamp = if timestamp.is_none() { - session.runtime.new_timestamp() + self.session.runtime.new_timestamp() } else { timestamp }; if self.destination != Locality::SessionLocal { primitives.send_push( Push { - wire_expr: self.key_expr.to_wire(&session).to_owned(), + wire_expr: self.key_expr.to_wire(&self.session).to_owned(), ext_qos: ext::QoSType::new( self.priority.into(), self.congestion_control, @@ -493,9 +482,9 @@ impl Publisher<'_> { )), }; - session.execute_subscriber_callbacks( + self.session.execute_subscriber_callbacks( true, - &self.key_expr.to_wire(&session), + &self.key_expr.to_wire(&self.session), Some(data_info), payload.into(), SubscriberKind::Subscriber, @@ -779,7 +768,7 @@ where let (callback, receiver) = self.handler.into_handler(); let state = self .publisher - .session()? + .session .declare_matches_listener_inner(self.publisher, callback)?; zlock!(self.publisher.matching_listeners).insert(state.id); Ok(MatchingListener { @@ -941,7 +930,7 @@ impl Wait for MatchingListenerUndeclaration<'_> { zlock!(self.subscriber.publisher.matching_listeners).remove(&self.subscriber.state.id); self.subscriber .publisher - .session()? + .session .undeclare_matches_listener_inner(self.subscriber.state.id) } } diff --git a/zenoh/src/api/queryable.rs b/zenoh/src/api/queryable.rs index 590f864218..0904fa138e 100644 --- a/zenoh/src/api/queryable.rs +++ b/zenoh/src/api/queryable.rs @@ -16,7 +16,7 @@ use std::{ future::{IntoFuture, Ready}, mem::size_of, ops::{Deref, DerefMut}, - sync::{Arc, Weak}, + sync::Arc, }; use tracing::error; @@ -50,7 +50,7 @@ use crate::{ publisher::Priority, sample::{Locality, QoSBuilder, Sample, SampleKind}, selector::Selector, - session::{SessionInner, UndeclarableSealed}, + session::{UndeclarableSealed, WeakSession}, value::Value, Id, }, @@ -543,7 +543,7 @@ impl fmt::Debug for QueryableState { pub(crate) struct QueryableInner { #[cfg(feature = "unstable")] pub(crate) session_id: ZenohId, - pub(crate) session: Weak, + pub(crate) session: WeakSession, pub(crate) state: Arc, // Queryable is undeclared on drop unless its handler is a ZST, i.e. it is callback-only pub(crate) undeclare_on_drop: bool, @@ -863,10 +863,7 @@ impl Queryable { fn undeclare_impl(&mut self) -> ZResult<()> { // set the flag first to avoid double panic if this function panic self.inner.undeclare_on_drop = false; - let Some(session) = self.inner.session.upgrade() else { - return Ok(()); - }; - session.close_queryable(self.inner.state.id) + self.inner.session.close_queryable(self.inner.state.id) } } @@ -930,7 +927,7 @@ where inner: QueryableInner { #[cfg(feature = "unstable")] session_id: session.zid(), - session: Arc::downgrade(&self.session.0), + session: self.session.downgrade(), state: qable_state, // `size_of::() == 0` means callback-only queryable undeclare_on_drop: size_of::() > 0, diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index bdade37b84..e2c4beeb30 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -16,9 +16,10 @@ use std::{ convert::TryInto, fmt, future::{IntoFuture, Ready}, + ops::Deref, sync::{ atomic::{AtomicU16, Ordering}, - Arc, RwLock, Weak, + Arc, Mutex, RwLock, }, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -392,6 +393,7 @@ pub trait Undeclarable: UndeclarableSealed {} impl Undeclarable for T where T: UndeclarableSealed {} pub(crate) struct SessionInner { + weak_counter: Mutex, pub(crate) runtime: Runtime, pub(crate) state: RwLock, pub(crate) id: u16, @@ -409,9 +411,78 @@ impl fmt::Debug for SessionInner { /// A zenoh session. /// -#[derive(Clone)] pub struct Session(pub(crate) Arc); +impl Session { + pub(crate) fn downgrade(&self) -> WeakSession { + WeakSession::new(&self.0) + } +} + +impl fmt::Debug for Session { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.0.fmt(f) + } +} + +impl Clone for Session { + fn clone(&self) -> Self { + let _weak = self.0.weak_counter.lock().unwrap(); + Self(self.0.clone()) + } +} + +impl Drop for Session { + fn drop(&mut self) { + let weak = self.0.weak_counter.lock().unwrap(); + if Arc::strong_count(&self.0) == *weak + 1 { + drop(weak); + if let Err(error) = self.close().wait() { + tracing::error!(error) + } + } + } +} + +pub(crate) struct WeakSession(Arc); + +impl WeakSession { + fn new(session: &Arc) -> Self { + let mut weak = session.weak_counter.lock().unwrap(); + *weak += 1; + Self(session.clone()) + } +} + +impl Clone for WeakSession { + fn clone(&self) -> Self { + let mut weak = self.0.weak_counter.lock().unwrap(); + *weak += 1; + Self(self.0.clone()) + } +} + +impl fmt::Debug for WeakSession { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.0.fmt(f) + } +} + +impl Deref for WeakSession { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Drop for WeakSession { + fn drop(&mut self) { + let mut weak = self.0.weak_counter.lock().unwrap(); + *weak -= 1; + } +} + static SESSION_ID_COUNTER: AtomicU16 = AtomicU16::new(0); impl Session { pub(crate) fn init( @@ -426,22 +497,23 @@ impl Session { aggregated_subscribers, aggregated_publishers, )); - let session = Arc::new(SessionInner { + let session = Session(Arc::new(SessionInner { + weak_counter: Mutex::new(0), runtime: runtime.clone(), state, id: SESSION_ID_COUNTER.fetch_add(1, Ordering::SeqCst), owns_runtime, task_controller: TaskController::default(), - }); + })); - runtime.new_handler(Arc::new(admin::Handler::new(Arc::downgrade(&session)))); + runtime.new_handler(Arc::new(admin::Handler::new(session.downgrade()))); - let primitives = Some(router.new_primitives(Arc::new(Arc::downgrade(&session)))); - zwrite!(session.state).primitives = primitives; + let primitives = Some(router.new_primitives(Arc::new(session.downgrade()))); + zwrite!(session.0.state).primitives = primitives; - admin::init(&session); + admin::init(session.downgrade()); - Session(session) + session }) } @@ -935,6 +1007,7 @@ impl SessionInner { if self.owns_runtime { self.runtime.close().await?; } + zwrite!(self.state).queryables.clear(); primitives.send_close(); Ok(()) }) @@ -1560,7 +1633,7 @@ impl SessionInner { // TODO: check which ZRuntime should be used self.task_controller .spawn_with_rt(zenoh_runtime::ZRuntime::Net, { - let session = self.clone(); + let session = WeakSession::new(self); let msub = msub.clone(); async move { match msub.current.lock() { @@ -1598,7 +1671,7 @@ impl SessionInner { // TODO: check which ZRuntime should be used self.task_controller .spawn_with_rt(zenoh_runtime::ZRuntime::Net, { - let session = self.clone(); + let session = WeakSession::new(self); let msub = msub.clone(); async move { match msub.current.lock() { @@ -1754,7 +1827,7 @@ impl SessionInner { let token = self.task_controller.get_cancellation_token(); self.task_controller .spawn_with_rt(zenoh_runtime::ZRuntime::Net, { - let session = self.clone(); + let session = WeakSession::new(self); #[cfg(feature = "unstable")] let zid = self.runtime.zid(); async move { @@ -1860,7 +1933,7 @@ impl SessionInner { let token = self.task_controller.get_cancellation_token(); self.task_controller .spawn_with_rt(zenoh_runtime::ZRuntime::Net, { - let session = self.clone(); + let session = WeakSession::new(self); let zid = self.runtime.zid(); async move { tokio::select! { @@ -1962,7 +2035,7 @@ impl SessionInner { qid, zid: zid.into(), primitives: if local { - Arc::new(Arc::downgrade(self)) + Arc::new(WeakSession::new(self)) } else { primitives }, @@ -1981,21 +2054,15 @@ impl SessionInner { } } -impl Primitives for Weak { +impl Primitives for WeakSession { fn send_interest(&self, msg: zenoh_protocol::network::Interest) { - if self.upgrade().is_none() { - return; - } trace!("recv Interest {} {:?}", msg.id, msg.wire_expr); } fn send_declare(&self, msg: zenoh_protocol::network::Declare) { - let Some(session) = self.upgrade() else { - return; - }; match msg.body { zenoh_protocol::network::DeclareBody::DeclareKeyExpr(m) => { trace!("recv DeclareKeyExpr {} {:?}", m.id, m.wire_expr); - let state = &mut zwrite!(session.state); + let state = &mut zwrite!(self.state); match state.remote_key_to_expr(&m.wire_expr) { Ok(key_expr) => { let mut res_node = ResourceNode::new(key_expr.clone().into()); @@ -2027,14 +2094,14 @@ impl Primitives for Weak { trace!("recv DeclareSubscriber {} {:?}", m.id, m.wire_expr); #[cfg(feature = "unstable")] { - let mut state = zwrite!(session.state); + let mut state = zwrite!(self.state); match state .wireexpr_to_keyexpr(&m.wire_expr, false) .map(|e| e.into_owned()) { Ok(expr) => { state.remote_subscribers.insert(m.id, expr.clone()); - session.update_status_up(&state, &expr); + self.update_status_up(&state, &expr); } Err(err) => { tracing::error!( @@ -2049,9 +2116,9 @@ impl Primitives for Weak { trace!("recv UndeclareSubscriber {:?}", m.id); #[cfg(feature = "unstable")] { - let mut state = zwrite!(session.state); + let mut state = zwrite!(self.state); if let Some(expr) = state.remote_subscribers.remove(&m.id) { - session.update_status_down(&state, &expr); + self.update_status_down(&state, &expr); } else { tracing::error!("Received Undeclare Subscriber for unknown id: {}", m.id); } @@ -2067,7 +2134,7 @@ impl Primitives for Weak { trace!("recv DeclareToken {:?}", m.id); #[cfg(feature = "unstable")] { - let mut state = zwrite!(session.state); + let mut state = zwrite!(self.state); match state .wireexpr_to_keyexpr(&m.wire_expr, false) .map(|e| e.into_owned()) @@ -2101,7 +2168,7 @@ impl Primitives for Weak { drop(state); - session.execute_subscriber_callbacks( + self.execute_subscriber_callbacks( false, &m.wire_expr, None, @@ -2124,7 +2191,7 @@ impl Primitives for Weak { trace!("recv UndeclareToken {:?}", m.id); #[cfg(feature = "unstable")] { - let mut state = zwrite!(session.state); + let mut state = zwrite!(self.state); if let Some(key_expr) = state.remote_tokens.remove(&m.id) { drop(state); @@ -2133,9 +2200,9 @@ impl Primitives for Weak { ..Default::default() }; - session.execute_subscriber_callbacks( + self.execute_subscriber_callbacks( false, - &key_expr.to_wire(&session), + &key_expr.to_wire(self), Some(data_info), ZBuf::default(), SubscriberKind::LivelinessSubscriber, @@ -2157,9 +2224,9 @@ impl Primitives for Weak { ..Default::default() }; - session.execute_subscriber_callbacks( + self.execute_subscriber_callbacks( false, - &key_expr.to_wire(&session), + &key_expr.to_wire(self), Some(data_info), ZBuf::default(), SubscriberKind::LivelinessSubscriber, @@ -2184,7 +2251,7 @@ impl Primitives for Weak { #[cfg(feature = "unstable")] if let Some(interest_id) = msg.interest_id { - let mut state = zwrite!(session.state); + let mut state = zwrite!(self.state); let _ = state.liveliness_queries.remove(&interest_id); } } @@ -2192,9 +2259,6 @@ impl Primitives for Weak { } fn send_push(&self, msg: Push, _reliability: Reliability) { - let Some(session) = self.upgrade() else { - return; - }; trace!("recv Push {:?}", msg); match msg.payload { PushBody::Put(m) => { @@ -2206,7 +2270,7 @@ impl Primitives for Weak { source_id: m.ext_sinfo.as_ref().map(|i| i.id.into()), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; - session.execute_subscriber_callbacks( + self.execute_subscriber_callbacks( false, &msg.wire_expr, Some(info), @@ -2226,7 +2290,7 @@ impl Primitives for Weak { source_id: m.ext_sinfo.as_ref().map(|i| i.id.into()), source_sn: m.ext_sinfo.as_ref().map(|i| i.sn as u64), }; - session.execute_subscriber_callbacks( + self.execute_subscriber_callbacks( false, &msg.wire_expr, Some(info), @@ -2241,12 +2305,9 @@ impl Primitives for Weak { } fn send_request(&self, msg: Request) { - let Some(session) = self.upgrade() else { - return; - }; trace!("recv Request {:?}", msg); match msg.payload { - RequestBody::Query(m) => session.handle_query( + RequestBody::Query(m) => self.handle_query( false, &msg.wire_expr, &m.parameters, @@ -2260,13 +2321,10 @@ impl Primitives for Weak { } fn send_response(&self, msg: Response) { - let Some(session) = self.upgrade() else { - return; - }; trace!("recv Response {:?}", msg); match msg.payload { ResponseBody::Err(e) => { - let mut state = zwrite!(session.state); + let mut state = zwrite!(self.state); match state.queries.get_mut(&msg.rid) { Some(query) => { let callback = query.callback.clone(); @@ -2288,7 +2346,7 @@ impl Primitives for Weak { } } ResponseBody::Reply(m) => { - let mut state = zwrite!(session.state); + let mut state = zwrite!(self.state); let key_expr = match state.remote_key_to_expr(&msg.wire_expr) { Ok(key) => key.into_owned(), Err(e) => { @@ -2462,11 +2520,8 @@ impl Primitives for Weak { } fn send_response_final(&self, msg: ResponseFinal) { - let Some(session) = self.upgrade() else { - return; - }; trace!("recv ResponseFinal {:?}", msg); - let mut state = zwrite!(session.state); + let mut state = zwrite!(self.state); match state.queries.get_mut(&msg.rid) { Some(query) => { query.nb_final -= 1; @@ -2488,28 +2543,11 @@ impl Primitives for Weak { } fn send_close(&self) { - if self.upgrade().is_none() { - return; - } trace!("recv Close"); } } -impl Drop for SessionInner { - fn drop(&mut self) { - if let Err(error) = self.close().wait() { - tracing::error!(error); - } - } -} - -impl fmt::Debug for Session { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - self.0.fmt(f) - } -} - -impl crate::net::primitives::EPrimitives for Weak { +impl crate::net::primitives::EPrimitives for WeakSession { #[inline] fn send_interest(&self, ctx: crate::net::routing::RoutingContext) { (self as &dyn Primitives).send_interest(ctx.msg) diff --git a/zenoh/src/api/subscriber.rs b/zenoh/src/api/subscriber.rs index 8aafb7a8f5..4dd1caba38 100644 --- a/zenoh/src/api/subscriber.rs +++ b/zenoh/src/api/subscriber.rs @@ -17,7 +17,7 @@ use std::{ future::{IntoFuture, Ready}, mem::size_of, ops::{Deref, DerefMut}, - sync::{Arc, Weak}, + sync::Arc, }; use tracing::error; @@ -37,7 +37,7 @@ use crate::{ handlers::{locked, Callback, DefaultHandler, IntoHandler}, key_expr::KeyExpr, sample::{Locality, Sample}, - session::{SessionInner, UndeclarableSealed}, + session::{UndeclarableSealed, WeakSession}, Id, }, Session, @@ -64,7 +64,7 @@ impl fmt::Debug for SubscriberState { pub(crate) struct SubscriberInner { #[cfg(feature = "unstable")] pub(crate) session_id: ZenohId, - pub(crate) session: Weak, + pub(crate) session: WeakSession, pub(crate) state: Arc, pub(crate) kind: SubscriberKind, // Subscriber is undeclared on drop unless its handler is a ZST, i.e. it is callback-only @@ -336,7 +336,7 @@ where inner: SubscriberInner { #[cfg(feature = "unstable")] session_id: session.zid(), - session: Arc::downgrade(&session.0), + session: session.downgrade(), state: sub_state, kind: SubscriberKind::Subscriber, // `size_of::() == 0` means callback-only subscriber @@ -484,10 +484,9 @@ impl Subscriber { fn undeclare_impl(&mut self) -> ZResult<()> { // set the flag first to avoid double panic if this function panic self.inner.undeclare_on_drop = false; - let Some(session) = self.inner.session.upgrade() else { - return Ok(()); - }; - session.undeclare_subscriber_inner(self.inner.state.id, self.inner.kind) + self.inner + .session + .undeclare_subscriber_inner(self.inner.state.id, self.inner.kind) } }