diff --git a/Cargo.lock b/Cargo.lock
index 8370dfd1c1..cc986c519d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4990,6 +4990,7 @@ dependencies = [
  "async-trait",
  "env_logger",
  "flume",
+ "futures-util",
  "log",
  "lz4_flex",
  "panic-message",
diff --git a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs
index 441864976d..1b1baea1be 100644
--- a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs
+++ b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs
@@ -15,8 +15,8 @@ use crate::config;
 #[cfg(not(target_os = "macos"))]
 use advisory_lock::{AdvisoryFileLock, FileLockMode};
 use async_io::Async;
-use async_std::fs::remove_file;
-use async_std::task::JoinHandle;
+use tokio::fs::remove_file;
+use tokio::task::JoinHandle;
 use async_trait::async_trait;
 use filepath::FilePath;
 use nix::unistd::unlink;
@@ -29,6 +29,7 @@ use std::io::{Read, Write};
 use std::sync::Arc;
 use zenoh_core::{zasyncread, zasyncwrite};
 use zenoh_protocol::core::{EndPoint, Locator};
+use std::io::ErrorKind;
 use unix_named_pipe::{create, open_read, open_write};
@@ -111,7 +112,7 @@ impl PipeR {
         let result = self
             .pipe
             .read_with_mut(|pipe| match pipe.read(&mut buf[..]) {
-                Ok(0) => Err(async_std::io::ErrorKind::WouldBlock.into()),
+                Ok(0) => Err(ErrorKind::WouldBlock.into()),
                 Ok(val) => Ok(val),
                 Err(e) => Err(e),
             })
@@ -123,13 +124,13 @@
         let mut r: usize = 0;
         self.pipe
             .read_with_mut(|pipe| match pipe.read(&mut buf[r..]) {
-                Ok(0) => Err(async_std::io::ErrorKind::WouldBlock.into()),
+                Ok(0) => Err(ErrorKind::WouldBlock.into()),
                 Ok(val) => {
                     r += val;
                     if r == buf.len() {
                         return Ok(());
                     }
-                    Err(async_std::io::ErrorKind::WouldBlock.into())
+                    Err(ErrorKind::WouldBlock.into())
                 }
                 Err(e) => Err(e),
             })
@@ -184,7 +185,7 @@ impl PipeW {
         let result = self
             .pipe
             .write_with_mut(|pipe| match pipe.write(buf) {
-                Ok(0) => Err(async_std::io::ErrorKind::WouldBlock.into()),
+                Ok(0) => Err(ErrorKind::WouldBlock.into()),
                 Ok(val) => Ok(val),
                 Err(e) => Err(e),
             })
@@ -196,13 +197,13 @@
         let mut r: usize = 0;
         self.pipe
             .write_with_mut(|pipe| match pipe.write(&buf[r..]) {
-                Ok(0) => Err(async_std::io::ErrorKind::WouldBlock.into()),
+                Ok(0) => Err(ErrorKind::WouldBlock.into()),
                 Ok(val) => {
                     r += val;
                     if r == buf.len() {
                         return Ok(());
                     }
-                    Err(async_std::io::ErrorKind::WouldBlock.into())
+                    Err(ErrorKind::WouldBlock.into())
                 }
                 Err(e) => Err(e),
             })
@@ -289,7 +290,7 @@ impl UnicastPipeListener {
         let mut request_channel = PipeR::new(&path_uplink, access_mode).await?;
         // create listening task
-        let listening_task_handle = async_std::task::spawn(async move {
+        let listening_task_handle = tokio::task::spawn(async move {
             loop {
                 let _ = handle_incoming_connections(
                     &endpoint,
@@ -309,8 +310,8 @@
         })
     }
-    async fn stop_listening(self) {
-        self.listening_task_handle.cancel().await;
+    fn stop_listening(self) {
+        self.listening_task_handle.abort();
     }
 }
@@ -549,7 +550,7 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastPipe {
         let removed = zasyncwrite!(self.listeners).remove(endpoint);
         match removed {
             Some(val) => {
-                val.stop_listening().await;
+                val.stop_listening();
                 Ok(())
             }
             None => bail!("No listener found for endpoint {}", endpoint),
@@ -557,14 +558,14 @@
     }
     fn get_listeners(&self) -> Vec<EndPoint> {
-        async_std::task::block_on(async { zasyncread!(self.listeners) })
+        tokio::runtime::Handle::current().block_on(async { zasyncread!(self.listeners) })
             .keys()
             .cloned()
             .collect()
     }
     fn get_locators(&self) -> Vec<Locator> {
-        async_std::task::block_on(async { zasyncread!(self.listeners) })
+        tokio::runtime::Handle::current().block_on(async { zasyncread!(self.listeners) })
             .values()
             .map(|v| v.uplink_locator.clone())
             .collect()
diff --git a/io/zenoh-transport/Cargo.toml b/io/zenoh-transport/Cargo.toml
index a259a714ee..300575fbf7 100644
--- a/io/zenoh-transport/Cargo.toml
+++ b/io/zenoh-transport/Cargo.toml
@@ -52,7 +52,7 @@ async-executor = { workspace = true }
 async-global-executor = { workspace = true, features = ["tokio"] }
 async-std = { workspace = true }
 async-trait = { workspace = true }
-tokio = { workspace = true, features = ["sync", "rt", "fs"] }
+tokio = { workspace = true, features = ["sync", "rt", "fs", "time", "macros", "rt-multi-thread"] }
 flume = { workspace = true }
 log = { workspace = true }
 lz4_flex = { workspace = true }
@@ -76,6 +76,7 @@ zenoh-sync = { workspace = true }
 zenoh-util = { workspace = true }
 [dev-dependencies]
+futures-util = { workspace = true }
 env_logger = { workspace = true }
 panic-message = { workspace = true }
 zenoh-protocol = { workspace = true, features = ["test"] }
diff --git a/io/zenoh-transport/src/common/pipeline.rs b/io/zenoh-transport/src/common/pipeline.rs
index e295b5bb6e..f523e7b02d 100644
--- a/io/zenoh-transport/src/common/pipeline.rs
+++ b/io/zenoh-transport/src/common/pipeline.rs
@@ -57,7 +57,8 @@ impl StageInRefill {
     }
     fn wait(&self) -> bool {
-        self.n_ref_r.recv().is_ok()
+        let res = self.n_ref_r.recv().is_ok();
+        res
     }
 }
@@ -170,7 +171,9 @@ impl StageIn {
         // Attempt the serialization on the current batch
         let e = match batch.encode(&*msg) {
             Ok(_) => zretok!(batch),
-            Err(e) => e,
+            Err(e) => {
+                e
+            }
         };
         // Lock the channel. We are the only one that will be writing on it.
@@ -604,7 +607,8 @@
         };
         // Lock the channel. We are the only one that will be writing on it.
         let mut queue = zlock!(self.stage_in[idx]);
-        queue.push_network_message(&mut msg, priority)
+        let res = queue.push_network_message(&mut msg, priority);
+        res
     }
     #[inline]
@@ -657,7 +661,9 @@
                         bo = b;
                     }
                 }
-                Pull::None => {}
+                Pull::None => {
+
+                }
             }
         }
@@ -701,7 +707,6 @@
 #[cfg(test)]
 mod tests {
     use super::*;
-    use async_std::{prelude::FutureExt, task};
     use std::{
         convert::TryFrom,
         sync::{
@@ -710,6 +715,8 @@
         },
         time::{Duration, Instant},
     };
+    use tokio::task;
+    use tokio::time::timeout;
     use zenoh_buffers::{
         reader::{DidntRead, HasReader},
         ZBuf,
@@ -721,9 +728,10 @@
         transport::{BatchSize, Fragment, Frame, TransportBody, TransportSn},
         zenoh::{PushBody, Put},
     };
+    use zenoh_result::ZResult;
     const SLEEP: Duration = Duration::from_millis(100);
-    const TIMEOUT: Duration = Duration::from_secs(60);
+    const TIMEOUT: Duration = Duration::from_secs(10);
     const CONFIG: TransmissionPipelineConf = TransmissionPipelineConf {
         is_streamed: true,
@@ -732,8 +740,8 @@
         backoff: Duration::from_micros(1),
     };
-    #[test]
-    fn tx_pipeline_flow() {
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    async fn tx_pipeline_flow() -> ZResult<()> {
         fn schedule(queue: TransmissionPipelineProducer, num_msg: usize, payload_size: usize) {
             // Send reliable messages
             let key = "test".into();
@@ -766,6 +774,7 @@
                 );
                 queue.push_network_message(message.clone());
             }
+            println!("schedule is done.");
         }
         async fn consume(mut queue: TransmissionPipelineConsumer, num_msg: usize) {
@@ -804,7 +813,9 @@
                     }
                     println!("Pipeline Flow [<<<]: Pulled {} msgs", msgs + 1);
                 }
-                Err(_) => break,
+                Err(_) => {
+                    break;
+                }
             }
         }
         println!("Pipeline Flow [+++]: Refill {} msgs", msgs + 1);
@@ -818,7 +829,7 @@
         }
         // Pipeline priorities
-        let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX)).unwrap();
+        let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX))?;
         let priorities = vec![tct];
         // Total amount of bytes to send in each test
@@ -827,37 +838,37 @@
         // Payload size of the messages
         let payload_sizes = [8, 64, 512, 4_096, 8_192, 32_768, 262_144, 2_097_152];
-        task::block_on(async {
-            for ps in payload_sizes.iter() {
-                if u64::try_from(*ps).is_err() {
-                    break;
-                }
+        for ps in payload_sizes.iter() {
+            if u64::try_from(*ps).is_err() {
+                break;
+            }
-                // Compute the number of messages to send
-                let num_msg = max_msgs.min(bytes / ps);
+            // Compute the number of messages to send
+            let num_msg = max_msgs.min(bytes / ps);
-                let (producer, consumer) = TransmissionPipeline::make(
-                    TransmissionPipelineConf::default(),
-                    priorities.as_slice(),
-                );
+            let (producer, consumer) = TransmissionPipeline::make(
+                TransmissionPipelineConf::default(),
+                priorities.as_slice(),
+            );
-                let t_c = task::spawn(async move {
-                    consume(consumer, num_msg).await;
-                });
+            let t_c = task::spawn(async move {
+                consume(consumer, num_msg).await;
+            });
-                let c_ps = *ps;
-                let t_s = task::spawn(async move {
-                    schedule(producer, num_msg, c_ps);
-                });
+            let c_ps = *ps;
+            let t_s = task::spawn_blocking(move || {
+                schedule(producer, num_msg, c_ps);
+            });
-                let res = t_c.join(t_s).timeout(TIMEOUT).await;
-                assert!(res.is_ok());
-            }
-        });
+            let res = timeout(TIMEOUT, futures_util::future::join(t_c, t_s)).await;
+            assert!(res.is_ok());
+        }
+
+        Ok(())
     }
-    #[test]
-    fn tx_pipeline_blocking() {
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    async fn tx_pipeline_blocking() -> ZResult<()> {
         fn schedule(queue: TransmissionPipelineProducer, counter: Arc<AtomicUsize>, id: usize) {
             // Make sure to put only one message per batch: set the payload size
             // to half of the batch in such a way the serialized zenoh message
@@ -903,7 +914,7 @@
         }
         // Pipeline
-        let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX)).unwrap();
+        let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX))?;
         let priorities = vec![tct];
         let (producer, mut consumer) =
             TransmissionPipeline::make(TransmissionPipelineConf::default(), priorities.as_slice());
@@ -921,34 +932,36 @@
             schedule(producer, c_counter, 2);
         });
-        task::block_on(async {
-            // Wait to have sent enough messages and to have blocked
-            println!(
-                "Pipeline Blocking [---]: waiting to have {} messages being scheduled",
-                CONFIG.queue_size[Priority::MAX as usize]
-            );
-            let check = async {
-                while counter.load(Ordering::Acquire) < CONFIG.queue_size[Priority::MAX as usize] {
-                    task::sleep(SLEEP).await;
-                }
-            };
-            check.timeout(TIMEOUT).await.unwrap();
+        // Wait to have sent enough messages and to have blocked
+        println!(
+            "Pipeline Blocking [---]: waiting to have {} messages being scheduled",
+            CONFIG.queue_size[Priority::MAX as usize]
+        );
+        let check = async {
+            while counter.load(Ordering::Acquire) < CONFIG.queue_size[Priority::MAX as usize] {
+                tokio::time::sleep(SLEEP).await;
+            }
+        };
+
+        timeout(TIMEOUT, check).await?;
-            // Disable and drain the queue
+        // Disable and drain the queue
+        timeout(
+            TIMEOUT,
             task::spawn_blocking(move || {
                 println!("Pipeline Blocking [---]: draining the queue");
                 let _ = consumer.drain();
-            })
-            .timeout(TIMEOUT)
-            .await
-            .unwrap();
-
-            // Make sure that the tasks scheduling have been unblocked
-            println!("Pipeline Blocking [---]: waiting for schedule (1) to be unblocked");
-            h1.timeout(TIMEOUT).await.unwrap();
-            println!("Pipeline Blocking [---]: waiting for schedule (2) to be unblocked");
-            h2.timeout(TIMEOUT).await.unwrap();
-        });
+            }),
+        )
+        .await??;
+
+        // Make sure that the tasks scheduling have been unblocked
+        println!("Pipeline Blocking [---]: waiting for schedule (1) to be unblocked");
+        timeout(TIMEOUT, h1).await??;
+        println!("Pipeline Blocking [---]: waiting for schedule (2) to be unblocked");
+        timeout(TIMEOUT, h2).await??;
+
+        Ok(())
     }
     #[test]
@@ -1014,7 +1027,7 @@
             }
         });
-        task::block_on(async {
+        tokio::runtime::Handle::current().block_on(async {
             let mut prev_size: usize = usize::MAX;
             loop {
                 let received = count.swap(0, Ordering::AcqRel);
@@ -1024,7 +1037,7 @@
                     println!("{} bytes: {:.6} Gbps", current, 2.0 * thr);
                 }
                 prev_size = current;
-                task::sleep(Duration::from_millis(500)).await;
+                tokio::time::sleep(Duration::from_millis(500)).await;
             }
         });
     }
diff --git a/io/zenoh-transport/src/multicast/link.rs b/io/zenoh-transport/src/multicast/link.rs
index 35bb075734..6ad091beff 100644
--- a/io/zenoh-transport/src/multicast/link.rs
+++ b/io/zenoh-transport/src/multicast/link.rs
@@ -13,18 +13,16 @@
 //
 use super::common::{pipeline::TransmissionPipeline, priority::TransportPriorityTx};
 use super::transport::TransportMulticastInner;
-use crate::common::batch::WBatch;
 use crate::common::pipeline::{
     TransmissionPipelineConf, TransmissionPipelineConsumer, TransmissionPipelineProducer,
 };
 #[cfg(feature = "stats")]
 use crate::stats::TransportStats;
-use async_std::prelude::FutureExt;
 use std::convert::TryInto;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
-use tokio::task;
 use tokio::task::JoinHandle;
+use tokio::{select, task};
 use zenoh_buffers::ZSlice;
 use zenoh_core::zlock;
 use zenoh_link::{LinkMulticast, Locator};
@@ -208,55 +206,68 @@ async fn tx_task(
     mut last_sns: Vec<PrioritySn>,
     #[cfg(feature = "stats")] stats: Arc<TransportStats>,
 ) -> ZResult<()> {
-    enum Action {
-        Pull((WBatch, usize)),
-        Join,
-        Stop,
-    }
-
-    async fn pull(pipeline: &mut TransmissionPipelineConsumer) -> Action {
-        match pipeline.pull().await {
-            Some(sb) => Action::Pull(sb),
-            None => Action::Stop,
-        }
-    }
-    async fn join(last_join: Instant, join_interval: Duration) -> Action {
+    async fn join(last_join: Instant, join_interval: Duration) {
         let now = Instant::now();
         let target = last_join + join_interval;
         if now < target {
             let left = target - now;
             tokio::time::sleep(left).await;
         }
-        Action::Join
     }
     let mut last_join = Instant::now().checked_sub(config.join_interval).unwrap();
     loop {
-        match pull(&mut pipeline)
-            .race(join(last_join, config.join_interval))
-            .await
-        {
-            Action::Pull((batch, priority)) => {
-                // Send the buffer on the link
-                let bytes = batch.as_bytes();
-                link.write_all(bytes).await?;
-                // Keep track of next SNs
-                if let Some(sn) = batch.latest_sn.reliable {
-                    last_sns[priority].reliable = sn;
-                }
-                if let Some(sn) = batch.latest_sn.best_effort {
-                    last_sns[priority].best_effort = sn;
-                }
-                #[cfg(feature = "stats")]
-                {
-                    stats.inc_tx_t_msgs(batch.stats.t_msgs);
-                    stats.inc_tx_bytes(bytes.len());
+        select! {
+            res = pipeline.pull() => {
+                match res {
+                    Some((batch, priority)) => {
+                        // Send the buffer on the link
+                        let bytes = batch.as_bytes();
+                        link.write_all(bytes).await?;
+                        // Keep track of next SNs
+                        if let Some(sn) = batch.latest_sn.reliable {
+                            last_sns[priority].reliable = sn;
+                        }
+                        if let Some(sn) = batch.latest_sn.best_effort {
+                            last_sns[priority].best_effort = sn;
+                        }
+                        #[cfg(feature = "stats")]
+                        {
+                            stats.inc_tx_t_msgs(batch.stats.t_msgs);
+                            stats.inc_tx_bytes(bytes.len());
+                        }
+                        // Reinsert the batch into the queue
+                        pipeline.refill(batch, priority);
+
+                    }
+                    None => {
+                        // Drain the transmission pipeline and write remaining bytes on the wire
+                        let mut batches = pipeline.drain();
+                        for (b, _) in batches.drain(..) {
+                            tokio::time::timeout(config.join_interval, link.write_all(b.as_bytes()))
+                                .await
+                                .map_err(|_| {
+                                    zerror!(
+                                        "{}: flush failed after {} ms",
+                                        link,
+                                        config.join_interval.as_millis()
+                                    )
+                                })??;
+
+                            #[cfg(feature = "stats")]
+                            {
+                                stats.inc_tx_t_msgs(b.stats.t_msgs);
+                                stats.inc_tx_bytes(b.len() as usize);
+                            }
+                        }
+                        break;
+                    }
+                }
-                // Reinsert the batch into the queue
-                pipeline.refill(batch, priority);
             }
-            Action::Join => {
+
+            _ = join(last_join, config.join_interval) => {
                 let next_sns = last_sns
                     .iter()
                     .map(|c| PrioritySn {
@@ -293,29 +304,7 @@ async fn tx_task(
                 }
                 last_join = Instant::now();
-            }
-            Action::Stop => {
-                // Drain the transmission pipeline and write remaining bytes on the wire
-                let mut batches = pipeline.drain();
-                for (b, _) in batches.drain(..) {
-                    link.write_all(b.as_bytes())
-                        .timeout(config.join_interval)
-                        .await
-                        .map_err(|_| {
-                            zerror!(
-                                "{}: flush failed after {} ms",
-                                link,
-                                config.join_interval.as_millis()
-                            )
-                        })??;
-                    #[cfg(feature = "stats")]
-                    {
-                        stats.inc_tx_t_msgs(b.stats.t_msgs);
-                        stats.inc_tx_bytes(b.len() as usize);
-                    }
-                }
-                break;
             }
         }
     }
@@ -330,19 +319,9 @@ async fn rx_task(
     rx_buffer_size: usize,
     batch_size: BatchSize,
 ) -> ZResult<()> {
-    enum Action {
-        Read((usize, Locator)),
-        Stop,
-    }
-
-    async fn read(link: &LinkMulticast, buffer: &mut [u8]) -> ZResult<Action> {
+    async fn read(link: &LinkMulticast, buffer: &mut [u8]) -> ZResult<(usize, Locator)> {
         let (n, loc) = link.read(buffer).await?;
-        Ok(Action::Read((n, loc.into_owned())))
-    }
-
-    async fn stop(signal: Signal) -> ZResult<Action> {
-        signal.wait().await;
-        Ok(Action::Stop)
+        Ok((n, loc.into_owned()))
     }
     // The pool of buffers
@@ -352,13 +331,15 @@
         n += 1;
     }
     let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice());
-    while !signal.is_triggered() {
+
+    loop {
         // Retrieve one buffer
         let mut buffer = pool.try_take().unwrap_or_else(|| pool.alloc());
-        // Async read from the underlying link
-        let action = read(&link, &mut buffer).race(stop(signal.clone())).await?;
-        match action {
-            Action::Read((n, loc)) => {
+
+        select! {
+            _ = signal.wait() => break,
+            res = read(&link, &mut buffer) => {
+                let (n, loc) = res?;
                 if n == 0 {
                     // Reading 0 bytes means error
                     bail!("{}: zero bytes reading", link);
@@ -379,7 +360,6 @@
                     &transport,
                 )?;
             }
-            Action::Stop => break,
         }
     }
     Ok(())
diff --git a/io/zenoh-transport/src/unicast/lowlatency/link.rs b/io/zenoh-transport/src/unicast/lowlatency/link.rs
index ef61a7517a..315583b677 100644
--- a/io/zenoh-transport/src/unicast/lowlatency/link.rs
+++ b/io/zenoh-transport/src/unicast/lowlatency/link.rs
@@ -15,7 +15,6 @@ use super::transport::TransportUnicastLowlatency;
 #[cfg(feature = "stats")]
 use crate::stats::TransportStats;
 use crate::TransportExecutor;
-use async_std::prelude::FutureExt;
 use tokio::{sync::RwLock, task};
 use zenoh_codec::*;
 use zenoh_core::{zasyncread, zasyncwrite};
@@ -76,7 +75,7 @@ pub(crate) async fn send_with_link(
 impl TransportUnicastLowlatency {
     pub(super) fn send(&self, msg: TransportMessageLowLatency) -> ZResult<()> {
-        tokio::runtime::Handle::current().block_on(self.send_async(msg))
+        async_global_executor::block_on(self.send_async(msg))
     }
     pub(super) async fn send_async(&self, msg: TransportMessageLowLatency) -> ZResult<()> {
@@ -91,7 +90,7 @@
     }
     pub(super) fn start_keepalive(&self, executor: &TransportExecutor, keep_alive: Duration) {
-        let mut guard = tokio::runtime::Handle::current().block_on(async { zasyncwrite!(self.handle_keepalive) });
+        let mut guard = async_global_executor::block_on(async { zasyncwrite!(self.handle_keepalive) });
         let c_transport = self.clone();
         let handle = executor.runtime.spawn(async move {
             let res = keepalive_task(
@@ -132,7 +131,7 @@
     }
     pub(super) fn internal_start_rx(&self, lease: Duration, batch_size: u16) {
-        let mut guard = tokio::runtime::Handle::current().block_on(async { zasyncwrite!(self.handle_rx) });
+        let mut guard = async_global_executor::block_on(async { zasyncwrite!(self.handle_rx) });
         let c_transport = self.clone();
         let handle = task::spawn(async move {
             let guard = zasyncread!(c_transport.link);
@@ -230,8 +229,7 @@
         let mut buffer = pool.try_take().unwrap_or_else(|| pool.alloc());
         // Async read from the underlying link
-        let bytes = read(&link, &mut buffer)
-            .timeout(lease)
+        let bytes = tokio::time::timeout(lease, read(&link, &mut buffer))
             .await
             .map_err(|_| zerror!("{}: expired after {} milliseconds", link, lease.as_millis()))??;
         #[cfg(feature = "stats")]
@@ -264,7 +262,8 @@
         // Async read from the underlying link
         let bytes =
-            link.read(&mut buffer).timeout(lease).await.map_err(|_| {
+            tokio::time::timeout(lease, link.read(&mut buffer))
+                .await.map_err(|_| {
                 zerror!("{}: expired after {} milliseconds", link, lease.as_millis())
             })??;