diff --git a/CHANGELOG.md b/CHANGELOG.md index fbddd6d..459cb72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed - ROS1 Native Publishers correctly call unadvertise when dropped +- ROS1 Native Publishers no longer occasionally truncate very large messages (>5MB) ### Changed diff --git a/Cargo.lock b/Cargo.lock index 7cd6986..4a7356b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1314,8 +1314,10 @@ dependencies = [ "diffy", "env_logger 0.10.2", "lazy_static", + "log", "roslibrust", "roslibrust_codegen", + "tokio", ] [[package]] diff --git a/roslibrust/src/ros1/publisher.rs b/roslibrust/src/ros1/publisher.rs index 39b6a1a..105ef55 100644 --- a/roslibrust/src/ros1/publisher.rs +++ b/roslibrust/src/ros1/publisher.rs @@ -211,8 +211,9 @@ impl Publication { Some(msg_to_publish) => { let mut streams = subscriber_streams.write().await; let mut streams_to_remove = vec![]; + // TODO: we're awaiting in a for loop... Could parallelize here for (stream_idx, stream) in streams.iter_mut().enumerate() { - if let Err(err) = stream.write(&msg_to_publish[..]).await { + if let Err(err) = stream.write_all(&msg_to_publish[..]).await { // TODO: A single failure between nodes that cross host boundaries is probably normal, should make this more robust perhaps debug!("Failed to send data to subscriber: {err}, removing"); streams_to_remove.push(stream_idx); @@ -265,7 +266,6 @@ impl Publication { loop { if let Ok((mut stream, peer_addr)) = tcp_listener.accept().await { info!("Received connection from subscriber at {peer_addr} for topic {topic_name}"); - // Read the connection header: let connection_header = match tcpros::receive_header(&mut stream).await { Ok(header) => header, @@ -314,7 +314,7 @@ impl Publication { .to_bytes(false) .expect("Couldn't serialize connection header"); stream - .write(&response_header_bytes[..]) + .write_all(&response_header_bytes[..]) .await .expect("Unable to respond on tcpstream"); diff --git a/roslibrust/src/ros1/subscriber.rs b/roslibrust/src/ros1/subscriber.rs index 1200dd9..d17a2c8 100644 --- a/roslibrust/src/ros1/subscriber.rs +++ b/roslibrust/src/ros1/subscriber.rs @@ -1,5 +1,6 @@ use crate::ros1::{names::Name, tcpros::ConnectionHeader}; use abort_on_drop::ChildTask; +use log::*; use roslibrust_codegen::{RosMessageType, ShapeShifter}; use std::{marker::PhantomData, sync::Arc}; use tokio::{ @@ -27,13 +28,29 @@ impl Subscriber { } pub async fn next(&mut self) -> Option> { + trace!("Subscriber of type {:?} awaiting recv()", T::ROS_TYPE_NAME); let data = match self.receiver.recv().await { - Ok(v) => v, + Ok(v) => { + trace!("Subscriber of type {:?} received data", T::ROS_TYPE_NAME); + v + } Err(RecvError::Closed) => return None, Err(RecvError::Lagged(n)) => return Some(Err(SubscriberError::Lagged(n))), }; + trace!( + "Subscriber of type {:?} deserializing data", + T::ROS_TYPE_NAME + ); + let tick = tokio::time::Instant::now(); match serde_rosmsg::from_slice::(&data[..]) { - Ok(p) => Some(Ok(p)), + Ok(p) => { + let duration = tick.elapsed(); + trace!( + "Subscriber of type {:?} deserialized data in {duration:?}", + T::ROS_TYPE_NAME + ); + Some(Ok(p)) + } Err(e) => Some(Err(e.into())), } } @@ -142,8 +159,18 @@ impl Subscription { publisher_list.write().await.push(publisher_uri.to_owned()); // Repeatedly read from the stream until its dry loop { + trace!( + "Subscription to {} receiving from {} is awaiting next body", + topic_name, + publisher_uri + ); match tcpros::receive_body(&mut stream).await { Ok(body) => { + trace!( + "Subscription to {} receiving from {} received body", + topic_name, + publisher_uri + ); let send_result = sender.send(body); if let Err(err) = send_result { log::error!("Unable to send message data due to dropped channel, closing connection: {err}"); diff --git a/roslibrust/src/ros1/tcpros.rs b/roslibrust/src/ros1/tcpros.rs index 5041749..758d786 100644 --- a/roslibrust/src/ros1/tcpros.rs +++ b/roslibrust/src/ros1/tcpros.rs @@ -257,7 +257,7 @@ pub async fn receive_body(stream: &mut TcpStream) -> Result, std::io::Er let mut body = vec![0u8; body_len as usize + 4]; // Copy the length into the first four bytes body[..4].copy_from_slice(&body_len.to_le_bytes()); - // Read the body into the buffer + // Read the body into the buffer after the header stream.read_exact(&mut body[4..]).await?; // Return body diff --git a/roslibrust_test/Cargo.toml b/roslibrust_test/Cargo.toml index f16c63a..ca07925 100644 --- a/roslibrust_test/Cargo.toml +++ b/roslibrust_test/Cargo.toml @@ -5,9 +5,15 @@ edition = "2021" [dependencies] env_logger = "0.10" -roslibrust = { path = "../roslibrust" } +roslibrust = { path = "../roslibrust", features = ["ros1"] } roslibrust_codegen = { path = "../roslibrust_codegen" } lazy_static = "1.4" +tokio = { version = "1.20", features = ["net", "sync"] } +log = "0.4" [dev-dependencies] -diffy = "0.3.0" \ No newline at end of file +diffy = "0.3.0" + +[[bin]] +path = "src/performance_ramp.rs" +name = "ramp" diff --git a/roslibrust_test/src/performance_ramp.rs b/roslibrust_test/src/performance_ramp.rs new file mode 100644 index 0000000..494a977 --- /dev/null +++ b/roslibrust_test/src/performance_ramp.rs @@ -0,0 +1,76 @@ +//! Goal of this executable is to asses the bandwidths and performance limits of roslibrust. +//! This may turn into a benchmark later. + +use log::*; +mod ros1; + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::builder() + .parse_env(env_logger::Env::default().default_filter_or("info")) + .format_timestamp_millis() + .init(); + // Goal is to send a large image payload at progressively higher rates until we sta`t to lag + // Requires running roscore + let client = + roslibrust::ros1::NodeHandle::new("http://localhost:11311", "performance_ramp").await?; + + let publisher = client + .advertise::("/image_topic", 1, false) + .await?; + let mut subscriber = client + .subscribe::("/image_topic", 1) + .await?; + // Startup delay to make sure pub and sub are connected + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Publisher task + tokio::spawn(async move { + for data_size_mb in (10..=100).step_by(10) { + // Creating a big vector here + let mut data = vec![0; data_size_mb * 1_000_000]; + *data.last_mut().unwrap() = 69; + let image = ros1::sensor_msgs::Image { + header: ros1::std_msgs::Header { + stamp: roslibrust_codegen::Time { secs: 0, nsecs: 0 }, + frame_id: "test".to_string(), + seq: data_size_mb as u32, + }, + height: 1080, + width: 1920, + encoding: "bgr8".to_string(), + is_bigendian: false as u8, + step: 5760, + data, + }; + publisher.publish(&image).await.unwrap(); + // Send at 10Hz + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + info!("Test complete"); + // Final bonus sleep to make sure last message sends before shutting down + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + std::process::exit(0); + }); + + // Subscriber task + loop { + if let Some(msg) = subscriber.next().await { + match msg { + Ok(msg) => { + info!("Got message @ {:?}", msg.header.seq); + } + Err(e) => { + error!("Error: {e}"); + break; + } + } + } else { + // Shutting down + error!("Channel dropped?"); + break; + } + } + + Ok(()) +}