diff --git a/.github/workflows/crates_check.sh b/.github/workflows/crates_check.sh index 7e9fca4c8c..a33db4c0de 100755 --- a/.github/workflows/crates_check.sh +++ b/.github/workflows/crates_check.sh @@ -11,6 +11,7 @@ cargo check -p zenoh-macros --manifest-path commons/zenoh-macros/Cargo.toml && cargo check -p zenoh-shm --manifest-path commons/zenoh-shm/Cargo.toml && cargo check -p zenoh-codec --manifest-path commons/zenoh-codec/Cargo.toml && cargo check -p zenoh-config --manifest-path commons/zenoh-config/Cargo.toml && +cargo check -p zenoh-task --manifest-path commons/zenoh-task/Cargo.toml && cargo check -p zenoh-link-commons --manifest-path io/zenoh-link-commons/Cargo.toml && cargo check -p zenoh-link-udp --manifest-path io/zenoh-links/zenoh-link-udp/Cargo.toml && cargo check -p zenoh-link-tcp --manifest-path io/zenoh-links/zenoh-link-tcp/Cargo.toml && diff --git a/.github/workflows/crates_publish.sh b/.github/workflows/crates_publish.sh index 3cd1243a61..ce16243a35 100755 --- a/.github/workflows/crates_publish.sh +++ b/.github/workflows/crates_publish.sh @@ -12,6 +12,7 @@ cargo login $1 (cd commons/zenoh-shm && cargo publish; cargo clean) (cd commons/zenoh-codec && cargo publish; cargo clean) (cd commons/zenoh-config && cargo publish; cargo clean) +(cd commons/zenoh-task && cargo publish; cargo clean) (cd io/zenoh-link-commons && cargo publish; cargo clean) (cd io/zenoh-links/zenoh-link-udp && cargo publish; cargo clean) (cd io/zenoh-links/zenoh-link-tcp && cargo publish; cargo clean) diff --git a/Cargo.lock b/Cargo.lock index a03e85fd79..ab95b226aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4603,6 +4603,7 @@ dependencies = [ "zenoh-result", "zenoh-shm", "zenoh-sync", + "zenoh-task", "zenoh-transport", "zenoh-util", ] @@ -5101,6 +5102,16 @@ dependencies = [ "zenoh-core", ] +[[package]] +name = "zenoh-task" +version = "0.11.0-dev" +dependencies = [ + "async-std", + "futures", + "tokio", + "uuid", +] + [[package]] name = "zenoh-transport" version = "0.11.0-dev" @@ -5131,6 +5142,7 @@ dependencies = [ "zenoh-result", "zenoh-shm", "zenoh-sync", + "zenoh-task", "zenoh-util", ] diff --git a/Cargo.toml b/Cargo.toml index 918c432073..a91d862267 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ members = [ "commons/zenoh-result", "commons/zenoh-shm", "commons/zenoh-sync", + "commons/zenoh-task", "commons/zenoh-util", "examples", "io/zenoh-link", @@ -172,6 +173,7 @@ zenoh-util = { version = "0.11.0-dev", path = "commons/zenoh-util" } zenoh-crypto = { version = "0.11.0-dev", path = "commons/zenoh-crypto" } zenoh-codec = { version = "0.11.0-dev", path = "commons/zenoh-codec" } zenoh-sync = { version = "0.11.0-dev", path = "commons/zenoh-sync" } +zenoh-task = { version = "0.11.0-dev", path = "commons/zenoh-task" } zenoh-collections = { version = "0.11.0-dev", path = "commons/zenoh-collections", default-features = false } zenoh-macros = { version = "0.11.0-dev", path = "commons/zenoh-macros" } zenoh-plugin-trait = { version = "0.11.0-dev", path = "plugins/zenoh-plugin-trait", default-features = false } diff --git a/commons/zenoh-task/Cargo.toml b/commons/zenoh-task/Cargo.toml new file mode 100644 index 0000000000..3270a003f2 --- /dev/null +++ b/commons/zenoh-task/Cargo.toml @@ -0,0 +1,36 @@ +# +# Copyright (c) 2023 ZettaScale Technology +# +# This program and the accompanying materials are made available under the +# terms of the Eclipse Public License 2.0 which is available at +# http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +# which is available at https://www.apache.org/licenses/LICENSE-2.0. +# +# SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +# +# Contributors: +# ZettaScale Zenoh Team, +# +[package] +rust-version = { workspace = true } +name = "zenoh-task" +version = { workspace = true } +repository = { workspace = true } +homepage = { workspace = true } +authors = [ + "Denis " +] +edition = { workspace = true } +license = { workspace = true } +categories = { workspace = true } +description = "Internal crate for zenoh." +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-std = { workspace = true, features = ["default", "unstable"] } +tokio = { workspace = true, features = ["default", "sync"] } +futures = { workspace = true } +uuid = { workspace = true } + +[dev-dependencies] +async-std = { workspace = true, features = ["default", "unstable", "attributes"] } diff --git a/commons/zenoh-task/README.md b/commons/zenoh-task/README.md new file mode 100644 index 0000000000..65a29107b9 --- /dev/null +++ b/commons/zenoh-task/README.md @@ -0,0 +1,8 @@ +# ⚠️ WARNING ⚠️ + +This crate is intended for Zenoh's internal use. + +- [Click here for Zenoh's main repository](https://github.com/eclipse-zenoh/zenoh) +- [Click here for Zenoh's documentation](https://zenoh.io) + + diff --git a/commons/zenoh-task/src/lib.rs b/commons/zenoh-task/src/lib.rs new file mode 100644 index 0000000000..405fcda9e4 --- /dev/null +++ b/commons/zenoh-task/src/lib.rs @@ -0,0 +1,88 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! ⚠️ WARNING ⚠️ +//! +//! This module is intended for Zenoh's internal use. +//! +//! [Click here for Zenoh's documentation](../zenoh/index.html) + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::future::Future; +use std::ops::DerefMut; +use std::sync::Arc; +use std::sync::Mutex; +use tokio::task::{self, JoinHandle}; +use uuid::Uuid; + +#[derive(Clone)] +pub struct TaskController { + running_task_id_to_handle: Arc>>>>, +} + +impl TaskController { + pub fn new() -> TaskController { + TaskController { + running_task_id_to_handle: Arc::new(Mutex::new( + HashMap::>>::new(), + )), + } + } + + /// Spawns a task (similarly to task::spawn) that can be later terminated by call to terminate_all() + /// Task output is ignored + pub fn spawn(&self, future: F) + where + F: Future + Send + 'static, + T: Send + 'static, + { + let mut tasks = self.running_task_id_to_handle.lock().unwrap(); + let id = TaskController::get_next_task_id(tasks.deref_mut()); + let tasks_mutex = self.running_task_id_to_handle.clone(); + let jh = task::spawn(futures::FutureExt::map(future, move |_| { + tasks_mutex.lock().unwrap().remove(&id); + () + })); + tasks.insert(id, Some(jh)); + } + + fn get_next_task_id(hm: &mut HashMap>>) -> Uuid { + loop { + let uuid = Uuid::new_v4(); + match hm.entry(uuid.clone()) { + Entry::Occupied(_) => { + continue; + } + Entry::Vacant(v) => { + v.insert(None); + return uuid; + } + } + } + } + + /// Terminates all prevously spawned tasks + pub fn terminate_all(&self) { + let tasks: Vec<(Uuid, Option>)> = self + .running_task_id_to_handle + .lock() + .unwrap() + .drain() + .collect(); + for (_id, jh) in tasks { + let _ = jh.unwrap().abort(); + } + } +} diff --git a/io/zenoh-transport/Cargo.toml b/io/zenoh-transport/Cargo.toml index 0921f4e1ee..fcbaa2b644 100644 --- a/io/zenoh-transport/Cargo.toml +++ b/io/zenoh-transport/Cargo.toml @@ -73,6 +73,7 @@ zenoh-protocol = { workspace = true } zenoh-result = { workspace = true } zenoh-shm = { workspace = true, optional = true } zenoh-sync = { workspace = true } +zenoh-task = { workspace = true } zenoh-util = { workspace = true } [dev-dependencies] diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index 3c225274aa..abf6f66cd4 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -19,10 +19,11 @@ use crate::multicast::manager::{ TransportManagerBuilderMulticast, TransportManagerConfigMulticast, TransportManagerStateMulticast, }; +use async_std::channel::RecvError; use async_std::{sync::Mutex as AsyncMutex, task}; use rand::{RngCore, SeedableRng}; use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use zenoh_config::{Config, LinkRxConf, QueueConf, QueueSizeConf}; use zenoh_crypto::{BlockCipher, PseudoRng}; @@ -33,6 +34,7 @@ use zenoh_protocol::{ VERSION, }; use zenoh_result::{bail, ZResult}; +use zenoh_task::TaskController; /// # Examples /// ``` @@ -320,25 +322,36 @@ impl Default for TransportManagerBuilder { pub(crate) struct TransportExecutor { executor: Arc>, sender: async_std::channel::Sender<()>, + exec_handles: Arc>>>>, } impl TransportExecutor { fn new(num_threads: usize) -> Self { let (sender, receiver) = async_std::channel::bounded(1); let executor = Arc::new(async_executor::Executor::new()); + let mut exec_handles = Vec::new(); for i in 0..num_threads { let exec = executor.clone(); let recv = receiver.clone(); - std::thread::Builder::new() + let h = std::thread::Builder::new() .name(format!("zenoh-tx-{}", i)) .spawn(move || async_std::task::block_on(exec.run(recv.recv()))) .unwrap(); + exec_handles.push(h); + } + Self { + executor, + sender, + exec_handles: Arc::new(Mutex::new(exec_handles)), } - Self { executor, sender } } async fn stop(&self) { let _ = self.sender.send(()).await; + let mut eh = self.exec_handles.lock().unwrap(); + for h in eh.drain(0..) { + let _ = h.join(); + } } pub(crate) fn spawn( @@ -360,6 +373,7 @@ pub struct TransportManager { pub(crate) tx_executor: TransportExecutor, #[cfg(feature = "stats")] pub(crate) stats: Arc, + pub(crate) task_controller: TaskController, } impl TransportManager { @@ -383,10 +397,11 @@ impl TransportManager { tx_executor: TransportExecutor::new(tx_threads), #[cfg(feature = "stats")] stats: std::sync::Arc::new(crate::stats::TransportStats::default()), + task_controller: TaskController::new(), }; // @TODO: this should be moved into the unicast module - async_std::task::spawn({ + this.task_controller.spawn({ let this = this.clone(); async move { while let Ok(link) = new_unicast_link_receiver.recv_async().await { @@ -412,6 +427,7 @@ impl TransportManager { } pub async fn close(&self) { + self.task_controller.terminate_all(); self.close_unicast().await; self.tx_executor.stop().await; } diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index 3896c5d57f..f8387111de 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -102,6 +102,7 @@ zenoh-protocol = { workspace = true, features = ["std"] } zenoh-result = { workspace = true } zenoh-shm = { workspace = true, optional = true } zenoh-sync = { workspace = true } +zenoh-task = { workspace = true } zenoh-transport = { workspace = true } zenoh-util = { workspace = true } diff --git a/zenoh/src/net/primitives/mux.rs b/zenoh/src/net/primitives/mux.rs index 17aad11311..f35d90593d 100644 --- a/zenoh/src/net/primitives/mux.rs +++ b/zenoh/src/net/primitives/mux.rs @@ -13,7 +13,7 @@ // use super::{EPrimitives, Primitives}; use crate::net::routing::{ - dispatcher::face::Face, + dispatcher::face::{Face, WeakFace}, interceptor::{InterceptorTrait, InterceptorsChain}, RoutingContext, }; @@ -25,7 +25,7 @@ use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast}; pub struct Mux { pub handler: TransportUnicast, - pub(crate) face: OnceLock, + pub(crate) face: OnceLock, pub(crate) interceptor: InterceptorsChain, } @@ -48,7 +48,7 @@ impl Primitives for Mux { }; if self.interceptor.interceptors.is_empty() { let _ = self.handler.schedule(msg); - } else if let Some(face) = self.face.get() { + } else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) { let ctx = RoutingContext::new_out(msg, face.clone()); if let Some(ctx) = self.interceptor.intercept(ctx) { let _ = self.handler.schedule(ctx.msg); @@ -66,7 +66,7 @@ impl Primitives for Mux { }; if self.interceptor.interceptors.is_empty() { let _ = self.handler.schedule(msg); - } else if let Some(face) = self.face.get() { + } else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) { let ctx = RoutingContext::new_out(msg, face.clone()); if let Some(ctx) = self.interceptor.intercept(ctx) { let _ = self.handler.schedule(ctx.msg); @@ -84,7 +84,7 @@ impl Primitives for Mux { }; if self.interceptor.interceptors.is_empty() { let _ = self.handler.schedule(msg); - } else if let Some(face) = self.face.get() { + } else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) { let ctx = RoutingContext::new_out(msg, face.clone()); if let Some(ctx) = self.interceptor.intercept(ctx) { let _ = self.handler.schedule(ctx.msg); @@ -102,7 +102,7 @@ impl Primitives for Mux { }; if self.interceptor.interceptors.is_empty() { let _ = self.handler.schedule(msg); - } else if let Some(face) = self.face.get() { + } else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) { let ctx = RoutingContext::new_out(msg, face.clone()); if let Some(ctx) = self.interceptor.intercept(ctx) { let _ = self.handler.schedule(ctx.msg); @@ -120,7 +120,7 @@ impl Primitives for Mux { }; if self.interceptor.interceptors.is_empty() { let _ = self.handler.schedule(msg); - } else if let Some(face) = self.face.get() { + } else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) { let ctx = RoutingContext::new_out(msg, face.clone()); if let Some(ctx) = self.interceptor.intercept(ctx) { let _ = self.handler.schedule(ctx.msg); @@ -161,7 +161,7 @@ impl EPrimitives for Mux { }; if self.interceptor.interceptors.is_empty() { let _ = self.handler.schedule(msg); - } else if let Some(face) = self.face.get() { + } else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) { let ctx = RoutingContext::new_out(msg, face.clone()); if let Some(ctx) = self.interceptor.intercept(ctx) { let _ = self.handler.schedule(ctx.msg); diff --git a/zenoh/src/net/routing/dispatcher/face.rs b/zenoh/src/net/routing/dispatcher/face.rs index fcf6d3c302..12525193e3 100644 --- a/zenoh/src/net/routing/dispatcher/face.rs +++ b/zenoh/src/net/routing/dispatcher/face.rs @@ -18,7 +18,7 @@ use crate::net::primitives::Primitives; use std::any::Any; use std::collections::HashMap; use std::fmt; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use zenoh_protocol::zenoh::RequestBody; use zenoh_protocol::{ core::{ExprId, WhatAmI, ZenohId}, @@ -108,12 +108,43 @@ impl fmt::Display for FaceState { } } +#[derive(Clone)] +pub struct WeakFace { + pub(crate) tables: Weak, + pub(crate) state: Weak, +} + +impl WeakFace { + pub fn new() -> WeakFace { + WeakFace { + tables: Weak::new(), + state: Weak::new() + } + } + + pub fn upgrade(&self) -> Option { + Some(Face { + tables: self.tables.upgrade()?, + state: self.state.upgrade()? + }) + } +} + #[derive(Clone)] pub struct Face { pub(crate) tables: Arc, pub(crate) state: Arc, } +impl Face { + pub fn downgrade(&self) -> WeakFace { + WeakFace { + tables: Arc::downgrade(&self.tables), + state: Arc::downgrade(&self.state) + } + } +} + impl Primitives for Face { fn send_declare(&self, msg: zenoh_protocol::network::Declare) { let ctrl_lock = zlock!(self.tables.ctrl_lock); diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index 7fc71c623d..9e9379fbb0 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -289,7 +289,8 @@ impl Resource { let mut resclone = res.clone(); let mutres = get_mut_unchecked(&mut resclone); if let Some(ref mut parent) = mutres.parent { - if Arc::strong_count(res) <= 3 && res.childs.is_empty() { + if Arc::strong_count(res) <= 3 && res.childs.is_empty() { + // consider only childless resource held by only one external object (+ 1 strong count for resclone, + 1 strong count for res.parent to a total of 3 ) log::debug!("Unregister resource {}", res.expr()); if let Some(context) = mutres.context.as_mut() { for match_ in &mut context.matches { @@ -303,6 +304,7 @@ impl Resource { } } } + mutres.nonwild_prefix.take(); { get_mut_unchecked(parent).childs.remove(&res.suffix); } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/network.rs b/zenoh/src/net/routing/hat/linkstate_peer/network.rs index ac610a808b..d9e869173e 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/network.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/network.rs @@ -15,6 +15,7 @@ use crate::net::codec::Zenoh080Routing; use crate::net::protocol::linkstate::{LinkState, LinkStateList}; use crate::net::routing::dispatcher::tables::NodeId; use crate::net::runtime::Runtime; +use crate::runtime::WeakRuntime; use async_std::task; use petgraph::graph::NodeIndex; use petgraph::visit::{VisitMap, Visitable}; @@ -116,7 +117,7 @@ pub(super) struct Network { pub(super) trees: Vec, pub(super) distances: Vec, pub(super) graph: petgraph::stable_graph::StableUnGraph, - pub(super) runtime: Runtime, + pub(super) runtime: WeakRuntime, } impl Network { @@ -156,7 +157,7 @@ impl Network { }], distances: vec![0.0], graph, - runtime, + runtime: Runtime::downgrade(&runtime), } } @@ -248,7 +249,7 @@ impl Network { whatami: self.graph[idx].whatami, locators: if details.locators { if idx == self.idx { - Some(self.runtime.get_locators()) + Some(self.runtime.upgrade().unwrap().get_locators()) } else { self.graph[idx].locators.clone() } @@ -337,6 +338,7 @@ impl Network { pub(super) fn link_states(&mut self, link_states: Vec, src: ZenohId) -> Changes { log::trace!("{} Received from {} raw: {:?}", self.name, src, link_states); + let strong_runtime = self.runtime.upgrade().unwrap(); let graph = &self.graph; let links = &mut self.links; @@ -487,13 +489,13 @@ impl Network { if !self.autoconnect.is_empty() { // Connect discovered peers - if task::block_on(self.runtime.manager().get_transport_unicast(&zid)) + if task::block_on(strong_runtime.manager().get_transport_unicast(&zid)) .is_none() && self.autoconnect.matches(whatami) { if let Some(locators) = locators { - let runtime = self.runtime.clone(); - self.runtime.spawn(async move { + let runtime = strong_runtime.clone(); + strong_runtime.spawn(async move { // random backoff async_std::task::sleep(std::time::Duration::from_millis( rand::random::() % 100, @@ -606,15 +608,15 @@ impl Network { for (_, idx, _) in &link_states { let node = &self.graph[*idx]; if let Some(whatami) = node.whatami { - if task::block_on(self.runtime.manager().get_transport_unicast(&node.zid)) + if task::block_on(strong_runtime.manager().get_transport_unicast(&node.zid)) .is_none() && self.autoconnect.matches(whatami) { if let Some(locators) = &node.locators { - let runtime = self.runtime.clone(); + let runtime = strong_runtime.clone(); let zid = node.zid; let locators = locators.clone(); - self.runtime.spawn(async move { + strong_runtime.spawn(async move { // random backoff async_std::task::sleep(std::time::Duration::from_millis( rand::random::() % 100, diff --git a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs index ae3fda51a7..aa992f3bf7 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/gossip.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/gossip.rs @@ -14,6 +14,7 @@ use crate::net::codec::Zenoh080Routing; use crate::net::protocol::linkstate::{LinkState, LinkStateList}; use crate::net::runtime::Runtime; +use crate::runtime::WeakRuntime; use async_std::task; use petgraph::graph::NodeIndex; use std::convert::TryInto; @@ -94,7 +95,7 @@ pub(super) struct Network { pub(super) idx: NodeIndex, pub(super) links: VecMap, pub(super) graph: petgraph::stable_graph::StableUnGraph, - pub(super) runtime: Runtime, + pub(super) runtime: WeakRuntime, } impl Network { @@ -125,7 +126,7 @@ impl Network { idx, links: VecMap::new(), graph, - runtime, + runtime: Runtime::downgrade(&runtime), } } @@ -192,7 +193,7 @@ impl Network { whatami: self.graph[idx].whatami, locators: if details.locators { if idx == self.idx { - Some(self.runtime.get_locators()) + Some(self.runtime.upgrade().unwrap().get_locators()) } else { self.graph[idx].locators.clone() } @@ -267,6 +268,7 @@ impl Network { pub(super) fn link_states(&mut self, link_states: Vec, src: ZenohId) { log::trace!("{} Received from {} raw: {:?}", self.name, src, link_states); + let strong_runtime = self.runtime.upgrade().unwrap(); let graph = &self.graph; let links = &mut self.links; @@ -407,13 +409,13 @@ impl Network { if !self.autoconnect.is_empty() { // Connect discovered peers - if task::block_on(self.runtime.manager().get_transport_unicast(&zid)) + if task::block_on(strong_runtime.manager().get_transport_unicast(&zid)) .is_none() && self.autoconnect.matches(whatami) { if let Some(locators) = locators { - let runtime = self.runtime.clone(); - self.runtime.spawn(async move { + let runtime = strong_runtime.clone(); + strong_runtime.spawn(async move { // random backoff async_std::task::sleep(std::time::Duration::from_millis( rand::random::() % 100, diff --git a/zenoh/src/net/routing/router.rs b/zenoh/src/net/routing/router.rs index 26c9d36185..47d66aeb4b 100644 --- a/zenoh/src/net/routing/router.rs +++ b/zenoh/src/net/routing/router.rs @@ -146,7 +146,7 @@ impl Router { state: newface, }; - let _ = mux.face.set(face.clone()); + let _ = mux.face.set(Face::downgrade(&face)); ctrl_lock.new_transport_unicast_face(&mut tables, &self.tables, &mut face, &transport)?; diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index ac125421f6..d1f06a351b 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -30,7 +30,8 @@ use async_std::task::JoinHandle; use futures::stream::StreamExt; use futures::Future; use std::any::Any; -use std::sync::Arc; +use std::sync::{Arc, Weak}; +use std::time::Duration; use stop_token::future::FutureExt; use stop_token::{StopSource, TimedOutError}; use uhlc::{HLCBuilder, HLC}; @@ -58,6 +59,16 @@ struct RuntimeState { stop_source: std::sync::RwLock>, } +pub struct WeakRuntime { + state: Weak, +} + +impl WeakRuntime { + pub fn upgrade(&self) -> Option { + self.state.upgrade().map(|state| Runtime { state }) + } +} + #[derive(Clone)] pub struct Runtime { state: Arc, @@ -98,7 +109,7 @@ impl Runtime { let router = Arc::new(Router::new(zid, whatami, hlc.clone(), &config)); let handler = Arc::new(RuntimeTransportEventHandler { - runtime: std::sync::RwLock::new(None), + runtime: std::sync::RwLock::new(WeakRuntime { state: Weak::new() }), }); let transport_manager = TransportManager::builder() @@ -124,7 +135,7 @@ impl Runtime { stop_source: std::sync::RwLock::new(Some(StopSource::new())), }), }; - *handler.runtime.write().unwrap() = Some(runtime.clone()); + *handler.runtime.write().unwrap() = Runtime::downgrade(&runtime); get_mut_unchecked(&mut runtime.state.router.clone()).init_link_state(runtime.clone()); let receiver = config.subscribe(); @@ -158,6 +169,8 @@ impl Runtime { log::trace!("Runtime::close())"); drop(self.state.stop_source.write().unwrap().take()); self.manager().close().await; + // clean up to break cyclic reference of self.state to itself + self.state.transport_handlers.write().unwrap().clear(); Ok(()) } @@ -201,10 +214,16 @@ impl Runtime { pub fn whatami(&self) -> WhatAmI { self.state.whatami } + + pub fn downgrade(this: &Runtime) -> WeakRuntime { + WeakRuntime { + state: Arc::downgrade(&this.state), + } + } } struct RuntimeTransportEventHandler { - runtime: std::sync::RwLock>, + runtime: std::sync::RwLock, } impl TransportEventHandler for RuntimeTransportEventHandler { @@ -213,7 +232,7 @@ impl TransportEventHandler for RuntimeTransportEventHandler { peer: TransportPeer, transport: TransportUnicast, ) -> ZResult> { - match zread!(self.runtime).as_ref() { + match zread!(self.runtime).upgrade().as_ref() { Some(runtime) => { let slave_handlers: Vec> = zread!(runtime.state.transport_handlers) @@ -241,7 +260,7 @@ impl TransportEventHandler for RuntimeTransportEventHandler { &self, transport: TransportMulticast, ) -> ZResult> { - match zread!(self.runtime).as_ref() { + match zread!(self.runtime).upgrade().as_ref() { Some(runtime) => { let slave_handlers: Vec> = zread!(runtime.state.transport_handlers) diff --git a/zenoh/src/session.rs b/zenoh/src/session.rs index 8989cc1cb7..c408ad62a9 100644 --- a/zenoh/src/session.rs +++ b/zenoh/src/session.rs @@ -81,6 +81,7 @@ use zenoh_protocol::{ }, }; use zenoh_result::ZResult; +use zenoh_task::TaskController; use zenoh_util::core::AsyncResolve; zconfigurable! { @@ -329,6 +330,7 @@ pub struct Session { pub(crate) state: Arc>, pub(crate) id: u16, pub(crate) alive: bool, + pub(crate) task_controller: TaskController, } static SESSION_ID_COUNTER: AtomicU16 = AtomicU16::new(0); @@ -349,6 +351,7 @@ impl Session { state: state.clone(), id: SESSION_ID_COUNTER.fetch_add(1, Ordering::SeqCst), alive: true, + task_controller: TaskController::new(), }; runtime.new_handler(Arc::new(admin::Handler::new(session.clone()))); @@ -452,10 +455,14 @@ impl Session { pub fn close(self) -> impl Resolve> { ResolveFuture::new(async move { trace!("close()"); + self.task_controller.terminate_all(); self.runtime.close().await?; - let primitives = zwrite!(self.state).primitives.as_ref().unwrap().clone(); - primitives.send_close(); + let mut state = zwrite!(self.state); + state.primitives.as_ref().unwrap().send_close(); + // clean up to break cyclic references from self.state to itself + state.primitives.take(); + state.queryables.clear(); Ok(()) }) @@ -840,6 +847,7 @@ impl Session { state: self.state.clone(), id: self.id, alive: false, + task_controller: self.task_controller.clone(), } } @@ -1537,7 +1545,7 @@ impl Session { for msub in state.matching_listeners.values() { if key_expr.intersects(&msub.key_expr) { // Cannot hold session lock when calling tables (matching_status()) - async_std::task::spawn({ + self.task_controller.spawn({ let session = self.clone(); let msub = msub.clone(); async move { @@ -1570,7 +1578,7 @@ impl Session { for msub in state.matching_listeners.values() { if key_expr.intersects(&msub.key_expr) { // Cannot hold session lock when calling tables (matching_status()) - async_std::task::spawn({ + self.task_controller.spawn({ let session = self.clone(); let msub = msub.clone(); async move { @@ -1788,7 +1796,7 @@ impl Session { Locality::Any => 2, _ => 1, }; - task::spawn({ + self.task_controller.spawn({ let state = self.state.clone(); let zid = self.runtime.zid(); async move {