From 19edd551c93aa7d0b3e7d88eaf5ef8e83b2bde3e Mon Sep 17 00:00:00 2001 From: carter Date: Mon, 23 Sep 2024 15:33:37 -0600 Subject: [PATCH] Add perf benchmark, update changelog --- CHANGELOG.md | 1 + Cargo.lock | 13 +++++ roslibrust/src/ros1/subscriber.rs | 31 +++++++++- roslibrust_test/Cargo.toml | 10 +++- roslibrust_test/src/performance_ramp.rs | 76 +++++++++++++++++++++++++ 5 files changed, 127 insertions(+), 4 deletions(-) create mode 100644 roslibrust_test/src/performance_ramp.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index fbddd6dd..459cb724 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - ROS1 Native Publishers correctly call unadvertise when dropped +- ROS1 Native Publishers no longer occasionally truncate very large messages (>5MB) ### Changed diff --git a/Cargo.lock b/Cargo.lock index 7cd6986e..0495fd16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1021,6 +1021,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + [[package]] name = "parking_lot_core" version = "0.9.10" @@ -1314,8 +1324,10 @@ dependencies = [ "diffy", "env_logger 0.10.2", "lazy_static", + "log", "roslibrust", "roslibrust_codegen", + "tokio", ] [[package]] @@ -1769,6 +1781,7 @@ dependencies = [ "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", diff --git a/roslibrust/src/ros1/subscriber.rs b/roslibrust/src/ros1/subscriber.rs index 1200dd9a..d17a2c8d 100644 --- a/roslibrust/src/ros1/subscriber.rs +++ b/roslibrust/src/ros1/subscriber.rs @@ -1,5 +1,6 @@ use crate::ros1::{names::Name, tcpros::ConnectionHeader}; use abort_on_drop::ChildTask; +use log::*; use roslibrust_codegen::{RosMessageType, ShapeShifter}; use std::{marker::PhantomData, sync::Arc}; use tokio::{ @@ -27,13 +28,29 @@ impl Subscriber { } pub async fn next(&mut self) -> Option> { + trace!("Subscriber of type {:?} awaiting recv()", T::ROS_TYPE_NAME); let data = match self.receiver.recv().await { - Ok(v) => v, + Ok(v) => { + trace!("Subscriber of type {:?} received data", T::ROS_TYPE_NAME); + v + } Err(RecvError::Closed) => return None, Err(RecvError::Lagged(n)) => return Some(Err(SubscriberError::Lagged(n))), }; + trace!( + "Subscriber of type {:?} deserializing data", + T::ROS_TYPE_NAME + ); + let tick = tokio::time::Instant::now(); match serde_rosmsg::from_slice::(&data[..]) { - Ok(p) => Some(Ok(p)), + Ok(p) => { + let duration = tick.elapsed(); + trace!( + "Subscriber of type {:?} deserialized data in {duration:?}", + T::ROS_TYPE_NAME + ); + Some(Ok(p)) + } Err(e) => Some(Err(e.into())), } } @@ -142,8 +159,18 @@ impl Subscription { publisher_list.write().await.push(publisher_uri.to_owned()); // Repeatedly read from the stream until its dry loop { + trace!( + "Subscription to {} receiving from {} is awaiting next body", + topic_name, + publisher_uri + ); match tcpros::receive_body(&mut stream).await { Ok(body) => { + trace!( + "Subscription to {} receiving from {} received body", + topic_name, + publisher_uri + ); let send_result = sender.send(body); if let Err(err) = send_result { log::error!("Unable to send message data due to dropped channel, closing connection: {err}"); diff --git a/roslibrust_test/Cargo.toml b/roslibrust_test/Cargo.toml index f16c63aa..b4338f13 100644 --- a/roslibrust_test/Cargo.toml +++ b/roslibrust_test/Cargo.toml @@ -5,9 +5,15 @@ edition = "2021" [dependencies] env_logger = "0.10" -roslibrust = { path = "../roslibrust" } +roslibrust = { path = "../roslibrust", features = ["ros1"] } roslibrust_codegen = { path = "../roslibrust_codegen" } lazy_static = "1.4" +tokio = { version = "1.20", features = ["full"] } +log = "0.4" [dev-dependencies] -diffy = "0.3.0" \ No newline at end of file +diffy = "0.3.0" + +[[bin]] +path = "src/performance_ramp.rs" +name = "ramp" diff --git a/roslibrust_test/src/performance_ramp.rs b/roslibrust_test/src/performance_ramp.rs new file mode 100644 index 00000000..494a9771 --- /dev/null +++ b/roslibrust_test/src/performance_ramp.rs @@ -0,0 +1,76 @@ +//! Goal of this executable is to asses the bandwidths and performance limits of roslibrust. +//! This may turn into a benchmark later. + +use log::*; +mod ros1; + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::builder() + .parse_env(env_logger::Env::default().default_filter_or("info")) + .format_timestamp_millis() + .init(); + // Goal is to send a large image payload at progressively higher rates until we sta`t to lag + // Requires running roscore + let client = + roslibrust::ros1::NodeHandle::new("http://localhost:11311", "performance_ramp").await?; + + let publisher = client + .advertise::("/image_topic", 1, false) + .await?; + let mut subscriber = client + .subscribe::("/image_topic", 1) + .await?; + // Startup delay to make sure pub and sub are connected + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Publisher task + tokio::spawn(async move { + for data_size_mb in (10..=100).step_by(10) { + // Creating a big vector here + let mut data = vec![0; data_size_mb * 1_000_000]; + *data.last_mut().unwrap() = 69; + let image = ros1::sensor_msgs::Image { + header: ros1::std_msgs::Header { + stamp: roslibrust_codegen::Time { secs: 0, nsecs: 0 }, + frame_id: "test".to_string(), + seq: data_size_mb as u32, + }, + height: 1080, + width: 1920, + encoding: "bgr8".to_string(), + is_bigendian: false as u8, + step: 5760, + data, + }; + publisher.publish(&image).await.unwrap(); + // Send at 10Hz + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + info!("Test complete"); + // Final bonus sleep to make sure last message sends before shutting down + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + std::process::exit(0); + }); + + // Subscriber task + loop { + if let Some(msg) = subscriber.next().await { + match msg { + Ok(msg) => { + info!("Got message @ {:?}", msg.header.seq); + } + Err(e) => { + error!("Error: {e}"); + break; + } + } + } else { + // Shutting down + error!("Channel dropped?"); + break; + } + } + + Ok(()) +}