Skip to content

Commit

Permalink
Add perf benchmark, update changelog
Browse files Browse the repository at this point in the history
  • Loading branch information
carter committed Sep 23, 2024
1 parent 4a43d80 commit 19edd55
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- ROS1 Native Publishers correctly call unadvertise when dropped
- ROS1 Native Publishers no longer occasionally truncate very large messages (>5MB)

### Changed

Expand Down
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 29 additions & 2 deletions roslibrust/src/ros1/subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::ros1::{names::Name, tcpros::ConnectionHeader};
use abort_on_drop::ChildTask;
use log::*;
use roslibrust_codegen::{RosMessageType, ShapeShifter};
use std::{marker::PhantomData, sync::Arc};
use tokio::{
Expand Down Expand Up @@ -27,13 +28,29 @@ impl<T: RosMessageType> Subscriber<T> {
}

pub async fn next(&mut self) -> Option<Result<T, SubscriberError>> {
trace!("Subscriber of type {:?} awaiting recv()", T::ROS_TYPE_NAME);
let data = match self.receiver.recv().await {
Ok(v) => v,
Ok(v) => {
trace!("Subscriber of type {:?} received data", T::ROS_TYPE_NAME);
v
}
Err(RecvError::Closed) => return None,
Err(RecvError::Lagged(n)) => return Some(Err(SubscriberError::Lagged(n))),
};
trace!(
"Subscriber of type {:?} deserializing data",
T::ROS_TYPE_NAME
);
let tick = tokio::time::Instant::now();
match serde_rosmsg::from_slice::<T>(&data[..]) {
Ok(p) => Some(Ok(p)),
Ok(p) => {
let duration = tick.elapsed();
trace!(
"Subscriber of type {:?} deserialized data in {duration:?}",
T::ROS_TYPE_NAME
);
Some(Ok(p))
}
Err(e) => Some(Err(e.into())),
}
}
Expand Down Expand Up @@ -142,8 +159,18 @@ impl Subscription {
publisher_list.write().await.push(publisher_uri.to_owned());
// Repeatedly read from the stream until its dry
loop {
trace!(
"Subscription to {} receiving from {} is awaiting next body",
topic_name,
publisher_uri
);
match tcpros::receive_body(&mut stream).await {
Ok(body) => {
trace!(
"Subscription to {} receiving from {} received body",
topic_name,
publisher_uri
);
let send_result = sender.send(body);
if let Err(err) = send_result {
log::error!("Unable to send message data due to dropped channel, closing connection: {err}");
Expand Down
10 changes: 8 additions & 2 deletions roslibrust_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ edition = "2021"

[dependencies]
env_logger = "0.10"
roslibrust = { path = "../roslibrust" }
roslibrust = { path = "../roslibrust", features = ["ros1"] }
roslibrust_codegen = { path = "../roslibrust_codegen" }
lazy_static = "1.4"
tokio = { version = "1.20", features = ["full"] }
log = "0.4"

[dev-dependencies]
diffy = "0.3.0"
diffy = "0.3.0"

[[bin]]
path = "src/performance_ramp.rs"
name = "ramp"
76 changes: 76 additions & 0 deletions roslibrust_test/src/performance_ramp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
//! Goal of this executable is to asses the bandwidths and performance limits of roslibrust.
//! This may turn into a benchmark later.
use log::*;
mod ros1;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::builder()
.parse_env(env_logger::Env::default().default_filter_or("info"))
.format_timestamp_millis()
.init();
// Goal is to send a large image payload at progressively higher rates until we sta`t to lag
// Requires running roscore
let client =
roslibrust::ros1::NodeHandle::new("http://localhost:11311", "performance_ramp").await?;

let publisher = client
.advertise::<ros1::sensor_msgs::Image>("/image_topic", 1, false)
.await?;
let mut subscriber = client
.subscribe::<ros1::sensor_msgs::Image>("/image_topic", 1)
.await?;
// Startup delay to make sure pub and sub are connected
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// Publisher task
tokio::spawn(async move {
for data_size_mb in (10..=100).step_by(10) {
// Creating a big vector here
let mut data = vec![0; data_size_mb * 1_000_000];
*data.last_mut().unwrap() = 69;
let image = ros1::sensor_msgs::Image {
header: ros1::std_msgs::Header {
stamp: roslibrust_codegen::Time { secs: 0, nsecs: 0 },
frame_id: "test".to_string(),
seq: data_size_mb as u32,
},
height: 1080,
width: 1920,
encoding: "bgr8".to_string(),
is_bigendian: false as u8,
step: 5760,
data,
};
publisher.publish(&image).await.unwrap();
// Send at 10Hz
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
info!("Test complete");
// Final bonus sleep to make sure last message sends before shutting down
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
std::process::exit(0);
});

// Subscriber task
loop {
if let Some(msg) = subscriber.next().await {
match msg {
Ok(msg) => {
info!("Got message @ {:?}", msg.header.seq);
}
Err(e) => {
error!("Error: {e}");
break;
}
}
} else {
// Shutting down
error!("Channel dropped?");
break;
}
}

Ok(())
}

0 comments on commit 19edd55

Please sign in to comment.