From 83197440b63b1a6071f60c61baa4822785fac8b7 Mon Sep 17 00:00:00 2001 From: yuanyuyuan Date: Mon, 30 Oct 2023 16:10:34 +0800 Subject: [PATCH] Roll back to async-std TX --- io/zenoh-transport/Cargo.toml | 3 +- io/zenoh-transport/src/manager.rs | 63 ++++++++++++++----- .../src/unicast/lowlatency/link.rs | 8 ++- .../src/unicast/lowlatency/transport.rs | 3 +- .../src/unicast/universal/link.rs | 10 +-- 5 files changed, 63 insertions(+), 24 deletions(-) diff --git a/io/zenoh-transport/Cargo.toml b/io/zenoh-transport/Cargo.toml index 300575fbf7..82636243f5 100644 --- a/io/zenoh-transport/Cargo.toml +++ b/io/zenoh-transport/Cargo.toml @@ -49,7 +49,8 @@ unstable = [] [dependencies] async-executor = { workspace = true } -async-global-executor = { workspace = true, features = ["tokio"] } +async-global-executor = { workspace = true } +# async-global-executor = { workspace = true, features = ["tokio"] } async-std = { workspace = true } async-trait = { workspace = true } tokio = { workspace = true, features = ["sync", "rt", "fs", "time", "macros", "rt-multi-thread"] } diff --git a/io/zenoh-transport/src/manager.rs b/io/zenoh-transport/src/manager.rs index ccc7407ca0..62d4d24111 100644 --- a/io/zenoh-transport/src/manager.rs +++ b/io/zenoh-transport/src/manager.rs @@ -316,27 +316,60 @@ impl Default for TransportManagerBuilder { } } +#[derive(Clone)] pub(crate) struct TransportExecutor { - pub runtime: tokio::runtime::Runtime, + executor: Arc>, + sender: async_std::channel::Sender<()>, } impl TransportExecutor { - fn new(num_threads: usize) -> ZResult { - use std::sync::atomic::{AtomicUsize, Ordering}; - use tokio::runtime::Builder; - let runtime = Builder::new_multi_thread() - .enable_time() - .worker_threads(num_threads) - .thread_name_fn(|| { - static ATOMIC_TX_THREAD_ID: AtomicUsize = AtomicUsize::new(0); - let id = ATOMIC_TX_THREAD_ID.fetch_add(1, Ordering::SeqCst); - format!("zenoh-tx-{}", id) - }) - .build()?; - Ok(TransportExecutor { runtime }) + fn new(num_threads: usize) -> Self { + let (sender, receiver) = async_std::channel::bounded(1); + let executor = Arc::new(async_executor::Executor::new()); + for i in 0..num_threads { + let exec = executor.clone(); + let recv = receiver.clone(); + std::thread::Builder::new() + .name(format!("zenoh-tx-{}", i)) + .spawn(move || async_std::task::block_on(exec.run(recv.recv()))) + .unwrap(); + } + Self { executor, sender } + } + + async fn stop(&self) { + let _ = self.sender.send(()).await; + } + + pub(crate) fn spawn( + &self, + future: impl core::future::Future + Send + 'static, + ) -> async_executor::Task { + self.executor.spawn(future) } } +// pub(crate) struct TransportExecutor { +// pub runtime: tokio::runtime::Runtime, +// } +// +// impl TransportExecutor { +// fn new(num_threads: usize) -> ZResult { +// use std::sync::atomic::{AtomicUsize, Ordering}; +// use tokio::runtime::Builder; +// let runtime = Builder::new_multi_thread() +// .enable_time() +// .worker_threads(num_threads) +// .thread_name_fn(|| { +// static ATOMIC_TX_THREAD_ID: AtomicUsize = AtomicUsize::new(0); +// let id = ATOMIC_TX_THREAD_ID.fetch_add(1, Ordering::SeqCst); +// format!("zenoh-tx-{}", id) +// }) +// .build()?; +// Ok(TransportExecutor { runtime }) +// } +// } +// #[derive(Clone)] pub struct TransportManager { pub config: Arc, @@ -368,7 +401,7 @@ impl TransportManager { cipher: Arc::new(cipher), locator_inspector: Default::default(), new_unicast_link_sender, - tx_executor: Arc::new(TransportExecutor::new(tx_threads).unwrap()), + tx_executor: Arc::new(TransportExecutor::new(tx_threads)), #[cfg(feature = "stats")] stats: std::sync::Arc::new(crate::stats::TransportStats::default()), }; diff --git a/io/zenoh-transport/src/unicast/lowlatency/link.rs b/io/zenoh-transport/src/unicast/lowlatency/link.rs index 315583b677..41c0d0520b 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/link.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/link.rs @@ -90,9 +90,10 @@ impl TransportUnicastLowlatency { } pub(super) fn start_keepalive(&self, executor: &TransportExecutor, keep_alive: Duration) { - let mut guard = async_global_executor::block_on(async { zasyncwrite!(self.handle_keepalive) }); + // let mut guard = async_global_executor::block_on(async { zasyncwrite!(self.handle_keepalive) }); + let mut guard = async_std::task::block_on(async { zasyncwrite!(self.handle_keepalive) }); let c_transport = self.clone(); - let handle = executor.runtime.spawn(async move { + let handle = executor.spawn(async move { let res = keepalive_task( c_transport.link.clone(), keep_alive, @@ -125,7 +126,8 @@ impl TransportUnicastLowlatency { drop(guard); if let Some(handle) = handle { - handle.abort(); + // handle.abort(); + let _ = handle.cancel().await; log::debug!("[{}] keepalive task stopped...", zid,); } } diff --git a/io/zenoh-transport/src/unicast/lowlatency/transport.rs b/io/zenoh-transport/src/unicast/lowlatency/transport.rs index 485ac9a91f..c711859573 100644 --- a/io/zenoh-transport/src/unicast/lowlatency/transport.rs +++ b/io/zenoh-transport/src/unicast/lowlatency/transport.rs @@ -63,7 +63,8 @@ pub(crate) struct TransportUnicastLowlatency { pub(super) stats: Arc, // The flags to stop TX/RX tasks - pub(crate) handle_keepalive: Arc>>>, + // pub(crate) handle_keepalive: Arc>>>, + pub(crate) handle_keepalive: Arc>>>, pub(crate) handle_rx: Arc>>>, } diff --git a/io/zenoh-transport/src/unicast/universal/link.rs b/io/zenoh-transport/src/unicast/universal/link.rs index f71c814ed6..a8cdcc19bd 100644 --- a/io/zenoh-transport/src/unicast/universal/link.rs +++ b/io/zenoh-transport/src/unicast/universal/link.rs @@ -66,7 +66,8 @@ pub(super) struct TransportLinkUnicast { // The transport this link is associated to transport: TransportUnicastUniversal, // The signals to stop TX/RX tasks - handle_tx: Option>>, + // handle_tx: Option>>, + handle_tx: Option>>, signal_rx: Signal, handle_rx: Option>>, } @@ -115,7 +116,8 @@ impl TransportLinkUnicast { // Spawn the TX task let c_link = self.link.clone(); let c_transport = self.transport.clone(); - let handle = executor.runtime.spawn(async move { + // let handle = executor.runtime.spawn(async move { + let handle = executor.spawn(async move { let res = tx_task( consumer, c_link.clone(), @@ -167,7 +169,7 @@ impl TransportLinkUnicast { log::debug!("{}", e); // Spawn a task to avoid a deadlock waiting for this same task // to finish in the close() joining its handle - task::spawn(async move { c_transport.del_link(&c_link).await }); + async_std::task::spawn(async move { c_transport.del_link(&c_link).await }); } }); self.handle_rx = Some(Arc::new(handle)); @@ -191,7 +193,7 @@ impl TransportLinkUnicast { if let Some(handle) = self.handle_tx.take() { // SAFETY: it is safe to unwrap the Arc since we have the ownership of the whole link let handle_tx = Arc::try_unwrap(handle).unwrap(); - handle_tx.await?; + handle_tx.await; } self.link.close().await