From e12eedfd362da535151ae49c3a62887afa0980cc Mon Sep 17 00:00:00 2001 From: Denis Biryukov Date: Tue, 19 Mar 2024 14:50:01 +0000 Subject: [PATCH] terminate more tasks --- commons/zenoh-runtime/src/lib.rs | 13 +++++++- commons/zenoh-task/src/lib.rs | 31 +++++++++++-------- io/zenoh-transport/src/manager.rs | 2 +- io/zenoh-transport/src/multicast/transport.rs | 15 +++++---- io/zenoh-transport/src/unicast/manager.rs | 23 +++++++------- zenoh/src/net/runtime/mod.rs | 18 +++++------ 6 files changed, 57 insertions(+), 45 deletions(-) diff --git a/commons/zenoh-runtime/src/lib.rs b/commons/zenoh-runtime/src/lib.rs index 114b3d2ac4..8f11935bbe 100644 --- a/commons/zenoh-runtime/src/lib.rs +++ b/commons/zenoh-runtime/src/lib.rs @@ -143,7 +143,7 @@ impl ZRuntimePool { } /// Force drops ZRUNTIME_POOL. -/// +/// /// Rust does not drop static variables. They are always reported by valgrind for exampler. /// This function can be used force drop ZRUNTIME_POOL, to prevent valgrind reporting memory leaks related to it. #[doc(hidden)] @@ -161,6 +161,17 @@ pub unsafe fn zruntime_pool_free() { std::mem::drop(hm.read()); } +#[doc(hidden)] +pub struct ZRuntimeGuard; + +impl Drop for ZRuntimeGuard { + fn drop(&mut self) { + unsafe { + zruntime_pool_free(); + } + } +} + #[derive(Debug, Copy, Clone)] pub struct ZRuntimeConfig { pub application_threads: usize, diff --git a/commons/zenoh-task/src/lib.rs b/commons/zenoh-task/src/lib.rs index fcb1e1452a..3d665f3035 100644 --- a/commons/zenoh-task/src/lib.rs +++ b/commons/zenoh-task/src/lib.rs @@ -19,10 +19,10 @@ //! [Click here for Zenoh's documentation](../zenoh/index.html) use std::future::Future; -use zenoh_runtime::ZRuntime; +use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; - +use zenoh_runtime::ZRuntime; #[derive(Clone)] pub struct TaskController { @@ -42,7 +42,7 @@ impl Default for TaskController { impl TaskController { /// Spawns a task (similarly to task::spawn) that can be later terminated by call to terminate_all() /// Task output is ignored - pub fn spawn(&self, future: F) + pub fn spawn(&self, future: F) -> JoinHandle<()> where F: Future + Send + 'static, T: Send + 'static, @@ -54,12 +54,12 @@ impl TaskController { _ = future => {} } }; - self.tracker.spawn(task); + self.tracker.spawn(task) } /// Spawns a task using a specified runtime (similarly to Runtime::spawn) that can be later terminated by call to terminate_all(). /// Task output is ignored. - pub fn spawn_with_rt(&self, rt: ZRuntime, future: F) + pub fn spawn_with_rt(&self, rt: ZRuntime, future: F) -> JoinHandle<()> where F: Future + Send + 'static, T: Send + 'static, @@ -71,7 +71,7 @@ impl TaskController { _ = future => {} } }; - self.tracker.spawn_on(task, &rt); + self.tracker.spawn_on(task, &rt) } pub fn get_cancellation_token(&self) -> CancellationToken { @@ -80,24 +80,22 @@ impl TaskController { /// Spawns a task that can be cancelled via cancelling a token obtained by get_cancellation_token(). /// It can be later terminated by call to terminate_all(). - /// Task output is ignored. - pub fn spawn_cancellable(&self, future: F) + pub fn spawn_cancellable(&self, future: F) -> JoinHandle where F: Future + Send + 'static, T: Send + 'static, { - self.tracker.spawn(future); + self.tracker.spawn(future) } /// Spawns a task that can be cancelled via cancelling a token obtained by get_cancellation_token() using a specified runtime. /// It can be later terminated by call to terminate_all(). - /// Task output is ignored. - pub fn spawn_cancellable_with_rt(&self, rt: ZRuntime, future: F) + pub fn spawn_cancellable_with_rt(&self, rt: ZRuntime, future: F) -> JoinHandle where F: Future + Send + 'static, T: Send + 'static, { - self.tracker.spawn_on(future, &rt); + self.tracker.spawn_on(future, &rt) } /// Terminates all prevously spawned tasks @@ -105,6 +103,13 @@ impl TaskController { self.tracker.close(); self.token.cancel(); let tracker = self.tracker.clone(); - futures::executor::block_on( async move { tracker.wait().await } ); + futures::executor::block_on(async move { tracker.wait().await }); + } + + /// Terminates all prevously spawned tasks + pub async fn terminate_all_async(&self) { + self.tracker.close(); + self.token.cancel(); + self.tracker.wait().await; } } diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index f28509fc27..b66eeac501 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -321,7 +321,7 @@ pub struct TransportManager { pub(crate) new_unicast_link_sender: NewLinkChannelSender, #[cfg(feature = "stats")] pub(crate) stats: Arc, - task_controller: TaskController, + pub(crate) task_controller: TaskController, } impl TransportManager { diff --git a/io/zenoh-transport/src/multicast/transport.rs b/io/zenoh-transport/src/multicast/transport.rs index c647730390..0cc7efc1d6 100644 --- a/io/zenoh-transport/src/multicast/transport.rs +++ b/io/zenoh-transport/src/multicast/transport.rs @@ -39,6 +39,7 @@ use zenoh_protocol::{ transport::{close, Join}, }; use zenoh_result::{bail, ZResult}; +use zenoh_task::TaskController; // use zenoh_util::{Timed, TimedEvent, TimedHandle, Timer}; /*************************************/ @@ -82,8 +83,8 @@ pub(crate) struct TransportMulticastInner { pub(super) link: Arc>>, // The callback pub(super) callback: Arc>>>, - // token for safe cancellation - token: CancellationToken, + // Task controller for safe task cancellation + task_controller: TaskController, // Transport statistics #[cfg(feature = "stats")] pub(super) stats: Arc, @@ -115,7 +116,7 @@ impl TransportMulticastInner { locator: config.link.link.get_dst().to_owned(), link: Arc::new(RwLock::new(None)), callback: Arc::new(RwLock::new(None)), - token: CancellationToken::new(), + task_controller: TaskController::default(), #[cfg(feature = "stats")] stats, }; @@ -183,8 +184,7 @@ impl TransportMulticastInner { cb.closed(); } - // TODO(yuyuan): use CancellationToken to unify the termination with the above - self.token.cancel(); + self.task_controller.terminate_all_async().await; Ok(()) } @@ -369,7 +369,7 @@ impl TransportMulticastInner { // TODO(yuyuan): refine the clone behaviors let is_active = Arc::new(AtomicBool::new(false)); let c_is_active = is_active.clone(); - let token = self.token.child_token(); + let token = self.task_controller.get_cancellation_token(); let c_token = token.clone(); let c_self = self.clone(); let c_locator = locator.clone(); @@ -389,8 +389,7 @@ impl TransportMulticastInner { let _ = c_self.del_peer(&c_locator, close::reason::EXPIRED); }; - // TODO(yuyuan): Put it into TaskTracker or store as JoinHandle - zenoh_runtime::ZRuntime::Acceptor.spawn(task); + self.task_controller.spawn_cancellable_with_rt(zenoh_runtime::ZRuntime::Acceptor, task); // TODO(yuyuan): Integrate the above async task into TransportMulticastPeer // Store the new peer diff --git a/io/zenoh-transport/src/unicast/manager.rs b/io/zenoh-transport/src/unicast/manager.rs index eaf25cd2a3..8a63f4f630 100644 --- a/io/zenoh-transport/src/unicast/manager.rs +++ b/io/zenoh-transport/src/unicast/manager.rs @@ -744,17 +744,18 @@ impl TransportManager { // Spawn a task to accept the link let c_manager = self.clone(); - zenoh_runtime::ZRuntime::Acceptor.spawn(async move { - if let Err(e) = tokio::time::timeout( - c_manager.config.unicast.accept_timeout, - super::establishment::accept::accept_link(link, &c_manager), - ) - .await - { - log::debug!("{}", e); - } - incoming_counter.fetch_sub(1, SeqCst); - }); + self.task_controller + .spawn_with_rt(zenoh_runtime::ZRuntime::Acceptor, async move { + if let Err(e) = tokio::time::timeout( + c_manager.config.unicast.accept_timeout, + super::establishment::accept::accept_link(link, &c_manager), + ) + .await + { + log::debug!("{}", e); + } + incoming_counter.fetch_sub(1, SeqCst); + }); } } diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index 8351c8f352..78b30c5b12 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -31,7 +31,6 @@ use futures::Future; use std::any::Any; use std::sync::{Arc, Weak}; use tokio::task::JoinHandle; -use tokio_util::sync::CancellationToken; use uhlc::{HLCBuilder, HLC}; use zenoh_link::{EndPoint, Link}; use zenoh_plugin_trait::{PluginStartArgs, StructVersion}; @@ -39,6 +38,7 @@ use zenoh_protocol::core::{Locator, WhatAmI, ZenohId}; use zenoh_protocol::network::NetworkMessage; use zenoh_result::{bail, ZResult}; use zenoh_sync::get_mut_unchecked; +use zenoh_task::TaskController; use zenoh_transport::{ multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler, TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler, @@ -54,7 +54,7 @@ struct RuntimeState { transport_handlers: std::sync::RwLock>>, locators: std::sync::RwLock>, hlc: Option>, - token: CancellationToken, + task_controller: TaskController, } pub struct WeakRuntime { @@ -130,7 +130,7 @@ impl Runtime { transport_handlers: std::sync::RwLock::new(vec![]), locators: std::sync::RwLock::new(vec![]), hlc, - token: CancellationToken::new(), + task_controller: TaskController::default(), }), }; *handler.runtime.write().unwrap() = Runtime::downgrade(&runtime); @@ -166,7 +166,7 @@ impl Runtime { pub async fn close(&self) -> ZResult<()> { log::trace!("Runtime::close())"); // TODO: Check this whether is able to terminate all spawned task by Runtime::spawn - self.state.token.cancel(); + self.state.task_controller.terminate_all(); self.manager().close().await; // clean up to break cyclic reference of self.state to itself self.state.transport_handlers.write().unwrap().clear(); @@ -186,13 +186,9 @@ impl Runtime { F: Future + Send + 'static, T: Send + 'static, { - let token = self.state.token.clone(); - zenoh_runtime::ZRuntime::Net.spawn(async move { - tokio::select! { - _ = token.cancelled() => {} - _ = future => {} - } - }) + self.state + .task_controller + .spawn_with_rt(zenoh_runtime::ZRuntime::Net, future) } pub(crate) fn router(&self) -> Arc {