Skip to content

Commit

Permalink
terminate more tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiryukov91 committed Mar 19, 2024
1 parent 44dfef4 commit e12eedf
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 45 deletions.
13 changes: 12 additions & 1 deletion commons/zenoh-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl ZRuntimePool {
}

/// Force drops ZRUNTIME_POOL.
///
///
/// Rust does not drop static variables. They are always reported by valgrind for exampler.
/// This function can be used force drop ZRUNTIME_POOL, to prevent valgrind reporting memory leaks related to it.
#[doc(hidden)]
Expand All @@ -161,6 +161,17 @@ pub unsafe fn zruntime_pool_free() {
std::mem::drop(hm.read());
}

#[doc(hidden)]
pub struct ZRuntimeGuard;

impl Drop for ZRuntimeGuard {
fn drop(&mut self) {
unsafe {
zruntime_pool_free();
}
}
}

#[derive(Debug, Copy, Clone)]
pub struct ZRuntimeConfig {
pub application_threads: usize,
Expand Down
31 changes: 18 additions & 13 deletions commons/zenoh-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
//! [Click here for Zenoh's documentation](../zenoh/index.html)
use std::future::Future;
use zenoh_runtime::ZRuntime;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;

use zenoh_runtime::ZRuntime;

#[derive(Clone)]
pub struct TaskController {
Expand All @@ -42,7 +42,7 @@ impl Default for TaskController {
impl TaskController {
/// Spawns a task (similarly to task::spawn) that can be later terminated by call to terminate_all()
/// Task output is ignored
pub fn spawn<F, T>(&self, future: F)
pub fn spawn<F, T>(&self, future: F) -> JoinHandle<()>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
Expand All @@ -54,12 +54,12 @@ impl TaskController {
_ = future => {}
}
};
self.tracker.spawn(task);
self.tracker.spawn(task)
}

/// Spawns a task using a specified runtime (similarly to Runtime::spawn) that can be later terminated by call to terminate_all().
/// Task output is ignored.
pub fn spawn_with_rt<F, T>(&self, rt: ZRuntime, future: F)
pub fn spawn_with_rt<F, T>(&self, rt: ZRuntime, future: F) -> JoinHandle<()>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
Expand All @@ -71,7 +71,7 @@ impl TaskController {
_ = future => {}
}
};
self.tracker.spawn_on(task, &rt);
self.tracker.spawn_on(task, &rt)
}

pub fn get_cancellation_token(&self) -> CancellationToken {
Expand All @@ -80,31 +80,36 @@ impl TaskController {

/// Spawns a task that can be cancelled via cancelling a token obtained by get_cancellation_token().
/// It can be later terminated by call to terminate_all().
/// Task output is ignored.
pub fn spawn_cancellable<F, T>(&self, future: F)
pub fn spawn_cancellable<F, T>(&self, future: F) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
self.tracker.spawn(future);
self.tracker.spawn(future)
}

/// Spawns a task that can be cancelled via cancelling a token obtained by get_cancellation_token() using a specified runtime.
/// It can be later terminated by call to terminate_all().
/// Task output is ignored.
pub fn spawn_cancellable_with_rt<F, T>(&self, rt: ZRuntime, future: F)
pub fn spawn_cancellable_with_rt<F, T>(&self, rt: ZRuntime, future: F) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
self.tracker.spawn_on(future, &rt);
self.tracker.spawn_on(future, &rt)
}

/// Terminates all prevously spawned tasks
pub fn terminate_all(&self) {
self.tracker.close();
self.token.cancel();
let tracker = self.tracker.clone();
futures::executor::block_on( async move { tracker.wait().await } );
futures::executor::block_on(async move { tracker.wait().await });
}

/// Terminates all prevously spawned tasks
pub async fn terminate_all_async(&self) {
self.tracker.close();
self.token.cancel();
self.tracker.wait().await;
}
}
2 changes: 1 addition & 1 deletion io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ pub struct TransportManager {
pub(crate) new_unicast_link_sender: NewLinkChannelSender,
#[cfg(feature = "stats")]
pub(crate) stats: Arc<crate::stats::TransportStats>,
task_controller: TaskController,
pub(crate) task_controller: TaskController,
}

impl TransportManager {
Expand Down
15 changes: 7 additions & 8 deletions io/zenoh-transport/src/multicast/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use zenoh_protocol::{
transport::{close, Join},
};
use zenoh_result::{bail, ZResult};
use zenoh_task::TaskController;
// use zenoh_util::{Timed, TimedEvent, TimedHandle, Timer};

/*************************************/
Expand Down Expand Up @@ -82,8 +83,8 @@ pub(crate) struct TransportMulticastInner {
pub(super) link: Arc<RwLock<Option<TransportLinkMulticastUniversal>>>,
// The callback
pub(super) callback: Arc<RwLock<Option<Arc<dyn TransportMulticastEventHandler>>>>,
// token for safe cancellation
token: CancellationToken,
// Task controller for safe task cancellation
task_controller: TaskController,
// Transport statistics
#[cfg(feature = "stats")]
pub(super) stats: Arc<TransportStats>,
Expand Down Expand Up @@ -115,7 +116,7 @@ impl TransportMulticastInner {
locator: config.link.link.get_dst().to_owned(),
link: Arc::new(RwLock::new(None)),
callback: Arc::new(RwLock::new(None)),
token: CancellationToken::new(),
task_controller: TaskController::default(),
#[cfg(feature = "stats")]
stats,
};
Expand Down Expand Up @@ -183,8 +184,7 @@ impl TransportMulticastInner {
cb.closed();
}

// TODO(yuyuan): use CancellationToken to unify the termination with the above
self.token.cancel();
self.task_controller.terminate_all_async().await;

Ok(())
}
Expand Down Expand Up @@ -369,7 +369,7 @@ impl TransportMulticastInner {
// TODO(yuyuan): refine the clone behaviors
let is_active = Arc::new(AtomicBool::new(false));
let c_is_active = is_active.clone();
let token = self.token.child_token();
let token = self.task_controller.get_cancellation_token();
let c_token = token.clone();
let c_self = self.clone();
let c_locator = locator.clone();
Expand All @@ -389,8 +389,7 @@ impl TransportMulticastInner {
let _ = c_self.del_peer(&c_locator, close::reason::EXPIRED);
};

// TODO(yuyuan): Put it into TaskTracker or store as JoinHandle
zenoh_runtime::ZRuntime::Acceptor.spawn(task);
self.task_controller.spawn_cancellable_with_rt(zenoh_runtime::ZRuntime::Acceptor, task);

// TODO(yuyuan): Integrate the above async task into TransportMulticastPeer
// Store the new peer
Expand Down
23 changes: 12 additions & 11 deletions io/zenoh-transport/src/unicast/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,17 +744,18 @@ impl TransportManager {

// Spawn a task to accept the link
let c_manager = self.clone();
zenoh_runtime::ZRuntime::Acceptor.spawn(async move {
if let Err(e) = tokio::time::timeout(
c_manager.config.unicast.accept_timeout,
super::establishment::accept::accept_link(link, &c_manager),
)
.await
{
log::debug!("{}", e);
}
incoming_counter.fetch_sub(1, SeqCst);
});
self.task_controller
.spawn_with_rt(zenoh_runtime::ZRuntime::Acceptor, async move {
if let Err(e) = tokio::time::timeout(
c_manager.config.unicast.accept_timeout,
super::establishment::accept::accept_link(link, &c_manager),
)
.await
{
log::debug!("{}", e);
}
incoming_counter.fetch_sub(1, SeqCst);
});
}
}

Expand Down
18 changes: 7 additions & 11 deletions zenoh/src/net/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ use futures::Future;
use std::any::Any;
use std::sync::{Arc, Weak};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use uhlc::{HLCBuilder, HLC};
use zenoh_link::{EndPoint, Link};
use zenoh_plugin_trait::{PluginStartArgs, StructVersion};
use zenoh_protocol::core::{Locator, WhatAmI, ZenohId};
use zenoh_protocol::network::NetworkMessage;
use zenoh_result::{bail, ZResult};
use zenoh_sync::get_mut_unchecked;
use zenoh_task::TaskController;
use zenoh_transport::{
multicast::TransportMulticast, unicast::TransportUnicast, TransportEventHandler,
TransportManager, TransportMulticastEventHandler, TransportPeer, TransportPeerEventHandler,
Expand All @@ -54,7 +54,7 @@ struct RuntimeState {
transport_handlers: std::sync::RwLock<Vec<Arc<dyn TransportEventHandler>>>,
locators: std::sync::RwLock<Vec<Locator>>,
hlc: Option<Arc<HLC>>,
token: CancellationToken,
task_controller: TaskController,
}

pub struct WeakRuntime {
Expand Down Expand Up @@ -130,7 +130,7 @@ impl Runtime {
transport_handlers: std::sync::RwLock::new(vec![]),
locators: std::sync::RwLock::new(vec![]),
hlc,
token: CancellationToken::new(),
task_controller: TaskController::default(),
}),
};
*handler.runtime.write().unwrap() = Runtime::downgrade(&runtime);
Expand Down Expand Up @@ -166,7 +166,7 @@ impl Runtime {
pub async fn close(&self) -> ZResult<()> {
log::trace!("Runtime::close())");
// TODO: Check this whether is able to terminate all spawned task by Runtime::spawn
self.state.token.cancel();
self.state.task_controller.terminate_all();
self.manager().close().await;
// clean up to break cyclic reference of self.state to itself
self.state.transport_handlers.write().unwrap().clear();
Expand All @@ -186,13 +186,9 @@ impl Runtime {
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let token = self.state.token.clone();
zenoh_runtime::ZRuntime::Net.spawn(async move {
tokio::select! {
_ = token.cancelled() => {}
_ = future => {}
}
})
self.state
.task_controller
.spawn_with_rt(zenoh_runtime::ZRuntime::Net, future)
}

pub(crate) fn router(&self) -> Arc<Router> {
Expand Down

0 comments on commit e12eedf

Please sign in to comment.