Skip to content

Commit

Permalink
Roll back to async-std TX
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanYuYuan committed Oct 30, 2023
1 parent 4821577 commit 8319744
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 24 deletions.
3 changes: 2 additions & 1 deletion io/zenoh-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ unstable = []

[dependencies]
async-executor = { workspace = true }
async-global-executor = { workspace = true, features = ["tokio"] }
async-global-executor = { workspace = true }
# async-global-executor = { workspace = true, features = ["tokio"] }
async-std = { workspace = true }
async-trait = { workspace = true }
tokio = { workspace = true, features = ["sync", "rt", "fs", "time", "macros", "rt-multi-thread"] }
Expand Down
63 changes: 48 additions & 15 deletions io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,27 +316,60 @@ impl Default for TransportManagerBuilder {
}
}

#[derive(Clone)]
pub(crate) struct TransportExecutor {
pub runtime: tokio::runtime::Runtime,
executor: Arc<async_executor::Executor<'static>>,
sender: async_std::channel::Sender<()>,
}

impl TransportExecutor {
fn new(num_threads: usize) -> ZResult<Self> {
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::runtime::Builder;
let runtime = Builder::new_multi_thread()
.enable_time()
.worker_threads(num_threads)
.thread_name_fn(|| {
static ATOMIC_TX_THREAD_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_TX_THREAD_ID.fetch_add(1, Ordering::SeqCst);
format!("zenoh-tx-{}", id)
})
.build()?;
Ok(TransportExecutor { runtime })
fn new(num_threads: usize) -> Self {
let (sender, receiver) = async_std::channel::bounded(1);
let executor = Arc::new(async_executor::Executor::new());
for i in 0..num_threads {
let exec = executor.clone();
let recv = receiver.clone();
std::thread::Builder::new()
.name(format!("zenoh-tx-{}", i))
.spawn(move || async_std::task::block_on(exec.run(recv.recv())))
.unwrap();
}
Self { executor, sender }
}

async fn stop(&self) {
let _ = self.sender.send(()).await;
}

pub(crate) fn spawn<T: Send + 'static>(
&self,
future: impl core::future::Future<Output = T> + Send + 'static,
) -> async_executor::Task<T> {
self.executor.spawn(future)
}
}

// pub(crate) struct TransportExecutor {
// pub runtime: tokio::runtime::Runtime,
// }
//
// impl TransportExecutor {
// fn new(num_threads: usize) -> ZResult<Self> {
// use std::sync::atomic::{AtomicUsize, Ordering};
// use tokio::runtime::Builder;
// let runtime = Builder::new_multi_thread()
// .enable_time()
// .worker_threads(num_threads)
// .thread_name_fn(|| {
// static ATOMIC_TX_THREAD_ID: AtomicUsize = AtomicUsize::new(0);
// let id = ATOMIC_TX_THREAD_ID.fetch_add(1, Ordering::SeqCst);
// format!("zenoh-tx-{}", id)
// })
// .build()?;
// Ok(TransportExecutor { runtime })
// }
// }
//
#[derive(Clone)]
pub struct TransportManager {
pub config: Arc<TransportManagerConfig>,
Expand Down Expand Up @@ -368,7 +401,7 @@ impl TransportManager {
cipher: Arc::new(cipher),
locator_inspector: Default::default(),
new_unicast_link_sender,
tx_executor: Arc::new(TransportExecutor::new(tx_threads).unwrap()),
tx_executor: Arc::new(TransportExecutor::new(tx_threads)),
#[cfg(feature = "stats")]
stats: std::sync::Arc::new(crate::stats::TransportStats::default()),
};
Expand Down
8 changes: 5 additions & 3 deletions io/zenoh-transport/src/unicast/lowlatency/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ impl TransportUnicastLowlatency {
}

pub(super) fn start_keepalive(&self, executor: &TransportExecutor, keep_alive: Duration) {
let mut guard = async_global_executor::block_on(async { zasyncwrite!(self.handle_keepalive) });
// let mut guard = async_global_executor::block_on(async { zasyncwrite!(self.handle_keepalive) });
let mut guard = async_std::task::block_on(async { zasyncwrite!(self.handle_keepalive) });
let c_transport = self.clone();
let handle = executor.runtime.spawn(async move {
let handle = executor.spawn(async move {
let res = keepalive_task(
c_transport.link.clone(),
keep_alive,
Expand Down Expand Up @@ -125,7 +126,8 @@ impl TransportUnicastLowlatency {
drop(guard);

if let Some(handle) = handle {
handle.abort();
// handle.abort();
let _ = handle.cancel().await;
log::debug!("[{}] keepalive task stopped...", zid,);
}
}
Expand Down
3 changes: 2 additions & 1 deletion io/zenoh-transport/src/unicast/lowlatency/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ pub(crate) struct TransportUnicastLowlatency {
pub(super) stats: Arc<TransportStats>,

// The flags to stop TX/RX tasks
pub(crate) handle_keepalive: Arc<RwLock<Option<JoinHandle<()>>>>,
// pub(crate) handle_keepalive: Arc<RwLock<Option<JoinHandle<()>>>>,
pub(crate) handle_keepalive: Arc<RwLock<Option<async_executor::Task<()>>>>,
pub(crate) handle_rx: Arc<RwLock<Option<JoinHandle<()>>>>,
}

Expand Down
10 changes: 6 additions & 4 deletions io/zenoh-transport/src/unicast/universal/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ pub(super) struct TransportLinkUnicast {
// The transport this link is associated to
transport: TransportUnicastUniversal,
// The signals to stop TX/RX tasks
handle_tx: Option<Arc<tokio::task::JoinHandle<()>>>,
// handle_tx: Option<Arc<tokio::task::JoinHandle<()>>>,
handle_tx: Option<Arc<async_executor::Task<()>>>,
signal_rx: Signal,
handle_rx: Option<Arc<JoinHandle<()>>>,
}
Expand Down Expand Up @@ -115,7 +116,8 @@ impl TransportLinkUnicast {
// Spawn the TX task
let c_link = self.link.clone();
let c_transport = self.transport.clone();
let handle = executor.runtime.spawn(async move {
// let handle = executor.runtime.spawn(async move {
let handle = executor.spawn(async move {
let res = tx_task(
consumer,
c_link.clone(),
Expand Down Expand Up @@ -167,7 +169,7 @@ impl TransportLinkUnicast {
log::debug!("{}", e);
// Spawn a task to avoid a deadlock waiting for this same task
// to finish in the close() joining its handle
task::spawn(async move { c_transport.del_link(&c_link).await });
async_std::task::spawn(async move { c_transport.del_link(&c_link).await });
}
});
self.handle_rx = Some(Arc::new(handle));
Expand All @@ -191,7 +193,7 @@ impl TransportLinkUnicast {
if let Some(handle) = self.handle_tx.take() {
// SAFETY: it is safe to unwrap the Arc since we have the ownership of the whole link
let handle_tx = Arc::try_unwrap(handle).unwrap();
handle_tx.await?;
handle_tx.await;
}

self.link.close().await
Expand Down

0 comments on commit 8319744

Please sign in to comment.