Commit
Finished the porting of the pipeline. Note the usage of spawn_blocking
YuanYuYuan committed Oct 24, 2023
1 parent 98b9f94 commit 4821577
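
Note on the spawn_blocking usage called out in the message (a minimal sketch under assumed names, not code from this diff): under tokio, a synchronous, blocking producer such as the test's schedule loop must not run inside an async task, so it is handed to the runtime's dedicated blocking thread pool.

    use std::time::Duration;

    // Hypothetical stand-in for the blocking producer loop used in the tests.
    fn schedule() {
        std::thread::sleep(Duration::from_millis(10)); // blocking work
    }

    #[tokio::main]
    async fn main() {
        // `tokio::task::spawn` would park this blocking closure on an async
        // worker thread and stall the runtime; `spawn_blocking` runs it on
        // tokio's dedicated blocking pool and returns an awaitable JoinHandle.
        let handle = tokio::task::spawn_blocking(schedule);
        handle.await.unwrap();
    }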
Showing 6 changed files with 157 additions and 162 deletions.
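
One async-std to tokio mapping worth flagging before the diff: async-std's JoinHandle::cancel() is an async method that requests cancellation and awaits the task, while tokio's JoinHandle::abort() only requests it. That is why stop_listening in unicast.rs below turns from an async fn into a plain fn. A minimal sketch of the tokio side (assumed setup, not from this commit):

    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        let handle = tokio::task::spawn(async {
            loop {
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        });

        // Fire-and-forget: the task is cancelled at its next .await point.
        handle.abort();

        // Optionally observe the cancellation: awaiting an aborted task
        // yields a JoinError with is_cancelled() == true.
        assert!(handle.await.unwrap_err().is_cancelled());
    }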
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

29 changes: 15 additions & 14 deletions io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs
@@ -15,8 +15,8 @@ use crate::config;
 #[cfg(not(target_os = "macos"))]
 use advisory_lock::{AdvisoryFileLock, FileLockMode};
 use async_io::Async;
-use async_std::fs::remove_file;
-use async_std::task::JoinHandle;
+use tokio::fs::remove_file;
+use tokio::task::JoinHandle;
 use async_trait::async_trait;
 use filepath::FilePath;
 use nix::unistd::unlink;
@@ -29,6 +29,7 @@ use std::io::{Read, Write};
 use std::sync::Arc;
 use zenoh_core::{zasyncread, zasyncwrite};
 use zenoh_protocol::core::{EndPoint, Locator};
+use std::io::ErrorKind;

 use unix_named_pipe::{create, open_read, open_write};

@@ -111,7 +112,7 @@ impl PipeR {
         let result = self
             .pipe
             .read_with_mut(|pipe| match pipe.read(&mut buf[..]) {
-                Ok(0) => Err(async_std::io::ErrorKind::WouldBlock.into()),
+                Ok(0) => Err(ErrorKind::WouldBlock.into()),
                 Ok(val) => Ok(val),
                 Err(e) => Err(e),
             })
@@ -123,13 +124,13 @@
         let mut r: usize = 0;
         self.pipe
             .read_with_mut(|pipe| match pipe.read(&mut buf[r..]) {
-                Ok(0) => Err(async_std::io::ErrorKind::WouldBlock.into()),
+                Ok(0) => Err(ErrorKind::WouldBlock.into()),
                 Ok(val) => {
                     r += val;
                     if r == buf.len() {
                         return Ok(());
                     }
-                    Err(async_std::io::ErrorKind::WouldBlock.into())
+                    Err(ErrorKind::WouldBlock.into())
                 }
                 Err(e) => Err(e),
             })
@@ -184,7 +185,7 @@ impl PipeW {
         let result = self
             .pipe
             .write_with_mut(|pipe| match pipe.write(buf) {
-                Ok(0) => Err(async_std::io::ErrorKind::WouldBlock.into()),
+                Ok(0) => Err(ErrorKind::WouldBlock.into()),
                 Ok(val) => Ok(val),
                 Err(e) => Err(e),
             })
@@ -196,13 +197,13 @@
         let mut r: usize = 0;
         self.pipe
             .write_with_mut(|pipe| match pipe.write(&buf[r..]) {
-                Ok(0) => Err(async_std::io::ErrorKind::WouldBlock.into()),
+                Ok(0) => Err(ErrorKind::WouldBlock.into()),
                 Ok(val) => {
                     r += val;
                     if r == buf.len() {
                         return Ok(());
                     }
-                    Err(async_std::io::ErrorKind::WouldBlock.into())
+                    Err(ErrorKind::WouldBlock.into())
                 }
                 Err(e) => Err(e),
             })
@@ -289,7 +290,7 @@ impl UnicastPipeListener {
         let mut request_channel = PipeR::new(&path_uplink, access_mode).await?;

         // create listening task
-        let listening_task_handle = async_std::task::spawn(async move {
+        let listening_task_handle = tokio::task::spawn(async move {
             loop {
                 let _ = handle_incoming_connections(
                     &endpoint,
@@ -309,8 +310,8 @@
         })
     }

-    async fn stop_listening(self) {
-        self.listening_task_handle.cancel().await;
+    fn stop_listening(self) {
+        self.listening_task_handle.abort();
     }
 }

@@ -549,22 +550,22 @@ impl LinkManagerUnicastTrait for LinkManagerUnicastPipe {
         let removed = zasyncwrite!(self.listeners).remove(endpoint);
         match removed {
             Some(val) => {
-                val.stop_listening().await;
+                val.stop_listening();
                 Ok(())
             }
             None => bail!("No listener found for endpoint {}", endpoint),
         }
     }

     fn get_listeners(&self) -> Vec<EndPoint> {
-        async_std::task::block_on(async { zasyncread!(self.listeners) })
+        tokio::runtime::Handle::current().block_on(async { zasyncread!(self.listeners) })
             .keys()
             .cloned()
             .collect()
     }

     fn get_locators(&self) -> Vec<Locator> {
-        async_std::task::block_on(async { zasyncread!(self.listeners) })
+        tokio::runtime::Handle::current().block_on(async { zasyncread!(self.listeners) })
             .values()
             .map(|v| v.uplink_locator.clone())
             .collect()
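
Since LinkManagerUnicastTrait keeps get_listeners and get_locators synchronous, the port bridges into async code with tokio::runtime::Handle::current().block_on. A sketch of that bridge under assumed names (not from this diff); note that Handle::block_on panics when invoked from within an async execution context, so these methods must be called from outside the runtime's worker threads:

    use std::time::Duration;

    // Hypothetical sync accessor over async state, mirroring the
    // get_listeners/get_locators pattern above.
    fn get_value(handle: &tokio::runtime::Handle) -> u32 {
        handle.block_on(async {
            tokio::time::sleep(Duration::from_millis(1)).await;
            42
        })
    }

    fn main() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        // Called from a plain thread, not a runtime worker: block_on is safe here.
        assert_eq!(get_value(rt.handle()), 42);
    }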
3 changes: 2 additions & 1 deletion io/zenoh-transport/Cargo.toml
@@ -52,7 +52,7 @@ async-executor = { workspace = true }
 async-global-executor = { workspace = true, features = ["tokio"] }
 async-std = { workspace = true }
 async-trait = { workspace = true }
-tokio = { workspace = true, features = ["sync", "rt", "fs"] }
+tokio = { workspace = true, features = ["sync", "rt", "fs", "time", "macros", "rt-multi-thread"] }
 flume = { workspace = true }
 log = { workspace = true }
 lz4_flex = { workspace = true }
@@ -76,6 +76,7 @@ zenoh-sync = { workspace = true }
 zenoh-util = { workspace = true }

 [dev-dependencies]
+futures-util = { workspace = true }
 env_logger = { workspace = true }
 panic-message = { workspace = true }
 zenoh-protocol = { workspace = true, features = ["test"] }
135 changes: 74 additions & 61 deletions io/zenoh-transport/src/common/pipeline.rs
@@ -57,7 +57,8 @@ impl StageInRefill {
     }

     fn wait(&self) -> bool {
-        self.n_ref_r.recv().is_ok()
+        let res = self.n_ref_r.recv().is_ok();
+        res
     }
 }

@@ -170,7 +171,9 @@ impl StageIn {
         // Attempt the serialization on the current batch
         let e = match batch.encode(&*msg) {
             Ok(_) => zretok!(batch),
-            Err(e) => e,
+            Err(e) => {
+                e
+            }
         };

         // Lock the channel. We are the only one that will be writing on it.
@@ -604,7 +607,8 @@ impl TransmissionPipelineProducer {
         };
         // Lock the channel. We are the only one that will be writing on it.
         let mut queue = zlock!(self.stage_in[idx]);
-        queue.push_network_message(&mut msg, priority)
+        let res = queue.push_network_message(&mut msg, priority);
+        res
     }

     #[inline]
@@ -657,7 +661,9 @@ impl TransmissionPipelineConsumer {
                         bo = b;
                     }
                 }
-                Pull::None => {}
+                Pull::None => {
+
+                }
             }
         }

@@ -701,7 +707,6 @@ impl TransmissionPipelineConsumer {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use async_std::{prelude::FutureExt, task};
     use std::{
         convert::TryFrom,
         sync::{
@@ -710,6 +715,8 @@
         },
         time::{Duration, Instant},
     };
+    use tokio::task;
+    use tokio::time::timeout;
     use zenoh_buffers::{
         reader::{DidntRead, HasReader},
         ZBuf,
@@ -721,9 +728,10 @@
         transport::{BatchSize, Fragment, Frame, TransportBody, TransportSn},
         zenoh::{PushBody, Put},
     };
+    use zenoh_result::ZResult;

     const SLEEP: Duration = Duration::from_millis(100);
-    const TIMEOUT: Duration = Duration::from_secs(60);
+    const TIMEOUT: Duration = Duration::from_secs(10);

     const CONFIG: TransmissionPipelineConf = TransmissionPipelineConf {
         is_streamed: true,
@@ -732,8 +740,8 @@
         backoff: Duration::from_micros(1),
     };

-    #[test]
-    fn tx_pipeline_flow() {
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    async fn tx_pipeline_flow() -> ZResult<()> {
         fn schedule(queue: TransmissionPipelineProducer, num_msg: usize, payload_size: usize) {
             // Send reliable messages
             let key = "test".into();
@@ -766,6 +774,7 @@
                 );
                 queue.push_network_message(message.clone());
             }
+            println!("schedule is done.");
         }

         async fn consume(mut queue: TransmissionPipelineConsumer, num_msg: usize) {
@@ -804,7 +813,9 @@
                     }
                     println!("Pipeline Flow [<<<]: Pulled {} msgs", msgs + 1);
                 }
-                Err(_) => break,
+                Err(_) => {
+                    break;
+                }
             }
         }
         println!("Pipeline Flow [+++]: Refill {} msgs", msgs + 1);
@@ -818,7 +829,7 @@
         }

         // Pipeline priorities
-        let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX)).unwrap();
+        let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX))?;
         let priorities = vec![tct];

         // Total amount of bytes to send in each test
@@ -827,37 +838,37 @@
         // Payload size of the messages
         let payload_sizes = [8, 64, 512, 4_096, 8_192, 32_768, 262_144, 2_097_152];

-        task::block_on(async {
-            for ps in payload_sizes.iter() {
-                if u64::try_from(*ps).is_err() {
-                    break;
-                }
+        for ps in payload_sizes.iter() {
+            if u64::try_from(*ps).is_err() {
+                break;
+            }

-                // Compute the number of messages to send
-                let num_msg = max_msgs.min(bytes / ps);
+            // Compute the number of messages to send
+            let num_msg = max_msgs.min(bytes / ps);

-                let (producer, consumer) = TransmissionPipeline::make(
-                    TransmissionPipelineConf::default(),
-                    priorities.as_slice(),
-                );
+            let (producer, consumer) = TransmissionPipeline::make(
+                TransmissionPipelineConf::default(),
+                priorities.as_slice(),
+            );

-                let t_c = task::spawn(async move {
-                    consume(consumer, num_msg).await;
-                });
+            let t_c = task::spawn(async move {
+                consume(consumer, num_msg).await;
+            });

-                let c_ps = *ps;
-                let t_s = task::spawn(async move {
-                    schedule(producer, num_msg, c_ps);
-                });
+            let c_ps = *ps;
+            let t_s = task::spawn_blocking(move || {
+                schedule(producer, num_msg, c_ps);
+            });

-                let res = t_c.join(t_s).timeout(TIMEOUT).await;
-                assert!(res.is_ok());
-            }
-        });
+            let res = timeout(TIMEOUT, futures_util::future::join(t_c, t_s)).await;
+            assert!(res.is_ok());
+        }
+
+        Ok(())
     }

-    #[test]
-    fn tx_pipeline_blocking() {
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    async fn tx_pipeline_blocking() -> ZResult<()> {
         fn schedule(queue: TransmissionPipelineProducer, counter: Arc<AtomicUsize>, id: usize) {
             // Make sure to put only one message per batch: set the payload size
             // to half of the batch in such a way the serialized zenoh message
@@ -903,7 +914,7 @@
         }

         // Pipeline
-        let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX)).unwrap();
+        let tct = TransportPriorityTx::make(Bits::from(TransportSn::MAX))?;
         let priorities = vec![tct];
         let (producer, mut consumer) =
             TransmissionPipeline::make(TransmissionPipelineConf::default(), priorities.as_slice());
@@ -921,34 +932,36 @@
             schedule(producer, c_counter, 2);
         });

-        task::block_on(async {
-            // Wait to have sent enough messages and to have blocked
-            println!(
-                "Pipeline Blocking [---]: waiting to have {} messages being scheduled",
-                CONFIG.queue_size[Priority::MAX as usize]
-            );
-            let check = async {
-                while counter.load(Ordering::Acquire) < CONFIG.queue_size[Priority::MAX as usize] {
-                    task::sleep(SLEEP).await;
-                }
-            };
-            check.timeout(TIMEOUT).await.unwrap();
+        // Wait to have sent enough messages and to have blocked
+        println!(
+            "Pipeline Blocking [---]: waiting to have {} messages being scheduled",
+            CONFIG.queue_size[Priority::MAX as usize]
+        );
+        let check = async {
+            while counter.load(Ordering::Acquire) < CONFIG.queue_size[Priority::MAX as usize] {
+                tokio::time::sleep(SLEEP).await;
+            }
+        };
+
+        timeout(TIMEOUT, check).await?;

-            // Disable and drain the queue
-            task::spawn_blocking(move || {
-                println!("Pipeline Blocking [---]: draining the queue");
-                let _ = consumer.drain();
-            })
-            .timeout(TIMEOUT)
-            .await
-            .unwrap();
+        // Disable and drain the queue
+        timeout(
+            TIMEOUT,
+            task::spawn_blocking(move || {
+                println!("Pipeline Blocking [---]: draining the queue");
+                let _ = consumer.drain();
+            }),
+        )
+        .await??;

-            // Make sure that the tasks scheduling have been unblocked
-            println!("Pipeline Blocking [---]: waiting for schedule (1) to be unblocked");
-            h1.timeout(TIMEOUT).await.unwrap();
-            println!("Pipeline Blocking [---]: waiting for schedule (2) to be unblocked");
-            h2.timeout(TIMEOUT).await.unwrap();
-        });
+        // Make sure that the tasks scheduling have been unblocked
+        println!("Pipeline Blocking [---]: waiting for schedule (1) to be unblocked");
+        timeout(TIMEOUT, h1).await??;
+        println!("Pipeline Blocking [---]: waiting for schedule (2) to be unblocked");
+        timeout(TIMEOUT, h2).await??;
+
+        Ok(())
     }

#[test]
@@ -1014,7 +1027,7 @@
             }
         });

-        task::block_on(async {
+        tokio::runtime::Handle::current().block_on(async {
             let mut prev_size: usize = usize::MAX;
             loop {
                 let received = count.swap(0, Ordering::AcqRel);
@@ -1024,7 +1037,7 @@
                     println!("{} bytes: {:.6} Gbps", current, 2.0 * thr);
                 }
                 prev_size = current;
-                task::sleep(Duration::from_millis(500)).await;
+                tokio::time::sleep(Duration::from_millis(500)).await;
             }
         });
     }
