Skip to content

Commit

Permalink
- rename spawn -> spawn_abortable, spawn_cancellable -> spawn, for mo…
Browse files Browse the repository at this point in the history
…re clarity;

- Some tasks now terminate in more graceful way
  • Loading branch information
DenisBiryukov91 committed Mar 25, 2024
1 parent 918b1f8 commit 1b4622f
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 59 deletions.
1 change: 1 addition & 0 deletions commons/zenoh-task/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ description = "Internal crate for zenoh."
[dependencies]
tokio = { workspace = true, features = ["default", "sync"] }
futures = { workspace = true }
log = { workspace = true }
zenoh-runtime = { workspace = true }
tokio-util = { workspace = true, features = ["rt"] }
45 changes: 31 additions & 14 deletions commons/zenoh-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
//! [Click here for Zenoh's documentation](../zenoh/index.html)
use std::future::Future;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use zenoh_runtime::ZRuntime;
use futures::future::FutureExt;

#[derive(Clone)]
pub struct TaskController {
Expand All @@ -42,7 +44,7 @@ impl Default for TaskController {
impl TaskController {
/// Spawns a task that can be later terminated by call to [`TaskController::terminate_all()`].
/// Task output is ignored.
pub fn spawn<F, T>(&self, future: F) -> JoinHandle<()>
pub fn spawn_abortable<F, T>(&self, future: F) -> JoinHandle<()>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
Expand All @@ -59,7 +61,7 @@ impl TaskController {

/// Spawns a task using a specified runtime that can be later terminated by call to [`TaskController::terminate_all()`].
/// Task output is ignored.
pub fn spawn_with_rt<F, T>(&self, rt: ZRuntime, future: F) -> JoinHandle<()>
pub fn spawn_abortable_with_rt<F, T>(&self, rt: ZRuntime, future: F) -> JoinHandle<()>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
Expand All @@ -78,44 +80,59 @@ impl TaskController {
self.token.child_token()
}

/// Spawns a task that can be cancelled via cancelling a token obtained by [`TaskController::get_cancellation_token()`],
/// Spawns a task that can be cancelled via cancellation of a token obtained by [`TaskController::get_cancellation_token()`],
/// or that can run to completion in finite amount of time.
/// It can be later terminated by call to [`TaskController::terminate_all()`].
pub fn spawn_cancellable<F, T>(&self, future: F) -> JoinHandle<T>
pub fn spawn<F, T>(&self, future: F) -> JoinHandle<()>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
self.tracker.spawn(future)
self.tracker.spawn(future.map(|_f| ()))
}

/// Spawns a task that can be cancelled via cancelling a token obtained by [`TaskController::get_cancellation_token()`],
/// Spawns a task that can be cancelled via cancellation of a token obtained by [`TaskController::get_cancellation_token()`],
/// or that can run to completion in finite amount of time, using a specified runtime.
/// It can be later aborted by call to [`TaskController::terminate_all()`].
pub fn spawn_cancellable_with_rt<F, T>(&self, rt: ZRuntime, future: F) -> JoinHandle<T>
pub fn spawn_with_rt<F, T>(&self, rt: ZRuntime, future: F) -> JoinHandle<()>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
self.tracker.spawn_on(future, &rt)
self.tracker.spawn_on(future.map(|_f| ()), &rt)
}

/// Terminates all prevously spawned tasks
/// The caller must ensure that all tasks spawned with [`TaskController::spawn_cancellable()`]
/// or [`TaskController::spawn_cancellable_with_rt()`] can yield in finite amount of time either because they will run to completion
/// The caller must ensure that all tasks spawned with [`TaskController::spawn()`]
/// or [`TaskController::spawn_with_rt()`] can yield in finite amount of time either because they will run to completion
/// or due to cancellation of token acquired via [`TaskController::get_cancellation_token()`].
/// Tasks spawned with [`TaskController::spawn()`] or [`TaskController::spawn_with_rt()`] will be aborted (i.e. terminated upon next await call).
/// Tasks spawned with [`TaskController::spawn_abortable()`] or [`TaskController::spawn_abortable_with_rt()`] will be aborted (i.e. terminated upon next await call).
pub fn terminate_all(&self) {
self.tracker.close();
self.token.cancel();
let tracker = self.tracker.clone();
futures::executor::block_on(async move { tracker.wait().await });
let task = async move {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(10)) => {
log::error!("Failed to terminate {} tasks", self.tracker.len());
}
_ = self.tracker.wait() => {}
}
};
let _ = futures::executor::block_on(task);
}

/// Async version of [`TaskController::terminate_all()`].
pub async fn terminate_all_async(&self) {
self.tracker.close();
self.token.cancel();
self.tracker.wait().await;
let task = async move {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(10)) => {
log::error!("Failed to terminate {} tasks", self.tracker.len());
}
_ = self.tracker.wait() => {}
}
};
task.await;
}
}
12 changes: 9 additions & 3 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,14 +347,20 @@ impl TransportManager {
};

// @TODO: this should be moved into the unicast module
let cancellation_token = this.task_controller.get_cancellation_token();
this.task_controller
.spawn_with_rt(zenoh_runtime::ZRuntime::Net, {
let this = this.clone();
async move {
loop {
if let Ok(link) = new_unicast_link_receiver.recv_async().await {
this.handle_new_link_unicast(link).await;
}
tokio::select! {
res = new_unicast_link_receiver.recv_async() => {
if let Ok(link) = res {
this.handle_new_link_unicast(link).await;
}
}
_ = cancellation_token.cancelled() => { break; }
}
}
}
});
Expand Down
2 changes: 1 addition & 1 deletion io/zenoh-transport/src/multicast/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ impl TransportMulticastInner {
};

self.task_controller
.spawn_cancellable_with_rt(zenoh_runtime::ZRuntime::Acceptor, task);
.spawn_with_rt(zenoh_runtime::ZRuntime::Acceptor, task);

// TODO(yuyuan): Integrate the above async task into TransportMulticastPeer
// Store the new peer
Expand Down
40 changes: 34 additions & 6 deletions zenoh/src/net/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::GIT_VERSION;
pub use adminspace::AdminSpace;
use futures::stream::StreamExt;
use futures::Future;
use tokio_util::sync::CancellationToken;
use std::any::Any;
use std::sync::{Arc, Weak};
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -137,15 +138,26 @@ impl Runtime {
get_mut_unchecked(&mut runtime.state.router.clone()).init_link_state(runtime.clone());

let receiver = config.subscribe();
let token = runtime.get_cancellation_token();
runtime.spawn({
let runtime2 = runtime.clone();
async move {
let mut stream = receiver.into_stream();
while let Some(event) = stream.next().await {
if &*event == "connect/endpoints" {
if let Err(e) = runtime2.update_peers().await {
log::error!("Error updating peers: {}", e);
loop {
tokio::select! {
res = stream.next() => {
match res {
Some(event) => {
if &*event == "connect/endpoints" {
if let Err(e) = runtime2.update_peers().await {
log::error!("Error updating peers: {}", e);
}
}
},
None => { break; }
}
}
_ = token.cancelled() => { break; }
}
}
}
Expand Down Expand Up @@ -174,7 +186,7 @@ impl Runtime {
// due to not freed resource Arc, that apparently happens because
// the task responsible for resource clean up was aborted earlier than expected.
// This should be resolved by identfying correspodning task, and placing
// cancelaltion token manually inside it.
// cancellation token manually inside it.
self.router()
.tables
.tables
Expand All @@ -194,7 +206,7 @@ impl Runtime {
}

/// Spawns a task within runtime.
/// Upon runtime close the task will be automatically aborted.
/// Upon close runtime will block until this task completes
pub(crate) fn spawn<F, T>(&self, future: F) -> JoinHandle<()>
where
F: Future<Output = T> + Send + 'static,
Expand All @@ -205,6 +217,18 @@ impl Runtime {
.spawn_with_rt(zenoh_runtime::ZRuntime::Net, future)
}

/// Spawns a task within runtime.
/// Upon runtime close the task will be automatically aborted.
pub(crate) fn spawn_abortable<F, T>(&self, future: F) -> JoinHandle<()>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
self.state
.task_controller
.spawn_abortable_with_rt(zenoh_runtime::ZRuntime::Net, future)
}

pub(crate) fn router(&self) -> Arc<Router> {
self.state.router.clone()
}
Expand All @@ -230,6 +254,10 @@ impl Runtime {
state: Arc::downgrade(&this.state),
}
}

pub fn get_cancellation_token(&self) -> CancellationToken {
self.state.task_controller.get_cancellation_token()
}
}

struct RuntimeTransportEventHandler {
Expand Down
75 changes: 40 additions & 35 deletions zenoh/src/net/runtime/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,20 +220,20 @@ impl Runtime {
let this = self.clone();
match (listen, autoconnect.is_empty()) {
(true, false) => {
self.spawn(async move {
self.spawn_abortable(async move {
tokio::select! {
_ = this.responder(&mcast_socket, &sockets) => {},
_ = this.connect_all(&sockets, autoconnect, &addr) => {},
}
});
}
(true, true) => {
self.spawn(async move {
self.spawn_abortable(async move {
this.responder(&mcast_socket, &sockets).await;
});
}
(false, false) => {
self.spawn(
self.spawn_abortable(
async move { this.connect_all(&sockets, autoconnect, &addr).await },
);
}
Expand Down Expand Up @@ -479,43 +479,44 @@ impl Runtime {

async fn peer_connector(&self, peer: EndPoint) {
let mut delay = CONNECTION_RETRY_INITIAL_PERIOD;
let cancellation_token = self.get_cancellation_token();
loop {
log::trace!("Trying to connect to configured peer {}", peer);
let endpoint = peer.clone();
match tokio::time::timeout(
CONNECTION_TIMEOUT,
self.manager().open_transport_unicast(endpoint),
)
.await
{
Ok(Ok(transport)) => {
log::debug!("Successfully connected to configured peer {}", peer);
if let Ok(Some(orch_transport)) = transport.get_callback() {
if let Some(orch_transport) = orch_transport
.as_any()
.downcast_ref::<super::RuntimeSession>()
{
*zwrite!(orch_transport.endpoint) = Some(peer);
tokio::select! {
res = tokio::time::timeout(CONNECTION_TIMEOUT, self.manager().open_transport_unicast(endpoint)) => {
match res {
Ok(Ok(transport)) => {
log::debug!("Successfully connected to configured peer {}", peer);
if let Ok(Some(orch_transport)) = transport.get_callback() {
if let Some(orch_transport) = orch_transport
.as_any()
.downcast_ref::<super::RuntimeSession>()
{
*zwrite!(orch_transport.endpoint) = Some(peer);
}
}
break;
}
Ok(Err(e)) => {
log::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
delay
);
}
Err(e) => {
log::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
delay
);
}
}
break;
}
Ok(Err(e)) => {
log::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
delay
);
}
Err(e) => {
log::debug!(
"Unable to connect to configured peer {}! {}. Retry in {:?}.",
peer,
e,
delay
);
}
_ = cancellation_token.cancelled() => { break; }
}
tokio::time::sleep(delay).await;
delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR;
Expand Down Expand Up @@ -842,10 +843,14 @@ impl Runtime {
match session.runtime.whatami() {
WhatAmI::Client => {
let runtime = session.runtime.clone();
let cancellation_token = runtime.get_cancellation_token();
session.runtime.spawn(async move {
let mut delay = CONNECTION_RETRY_INITIAL_PERIOD;
while runtime.start_client().await.is_err() {
tokio::time::sleep(delay).await;
tokio::select! {
_ = tokio::time::sleep(delay) => {}
_ = cancellation_token.cancelled() => { break; }
}
delay *= CONNECTION_RETRY_PERIOD_INCREASE_FACTOR;
if delay > CONNECTION_RETRY_MAX_PERIOD {
delay = CONNECTION_RETRY_MAX_PERIOD;
Expand Down

0 comments on commit 1b4622f

Please sign in to comment.