Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change to using write_all #198

Merged
merged 3 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- ROS1 Native Publishers correctly call unadvertise when dropped
- ROS1 Native Publishers no longer occasionally truncate very large messages (>5MB)

### Changed

Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions roslibrust/src/ros1/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,9 @@ impl Publication {
Some(msg_to_publish) => {
let mut streams = subscriber_streams.write().await;
let mut streams_to_remove = vec![];
// TODO: we're awaiting in a for loop... Could parallelize here
for (stream_idx, stream) in streams.iter_mut().enumerate() {
if let Err(err) = stream.write(&msg_to_publish[..]).await {
if let Err(err) = stream.write_all(&msg_to_publish[..]).await {
// TODO: A single failure between nodes that cross host boundaries is probably normal, should make this more robust perhaps
debug!("Failed to send data to subscriber: {err}, removing");
streams_to_remove.push(stream_idx);
Expand Down Expand Up @@ -265,7 +266,6 @@ impl Publication {
loop {
if let Ok((mut stream, peer_addr)) = tcp_listener.accept().await {
info!("Received connection from subscriber at {peer_addr} for topic {topic_name}");

// Read the connection header:
let connection_header = match tcpros::receive_header(&mut stream).await {
Ok(header) => header,
Expand Down Expand Up @@ -314,7 +314,7 @@ impl Publication {
.to_bytes(false)
.expect("Couldn't serialize connection header");
stream
.write(&response_header_bytes[..])
.write_all(&response_header_bytes[..])
.await
.expect("Unable to respond on tcpstream");

Expand Down
31 changes: 29 additions & 2 deletions roslibrust/src/ros1/subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::ros1::{names::Name, tcpros::ConnectionHeader};
use abort_on_drop::ChildTask;
use log::*;
use roslibrust_codegen::{RosMessageType, ShapeShifter};
use std::{marker::PhantomData, sync::Arc};
use tokio::{
Expand Down Expand Up @@ -27,13 +28,29 @@ impl<T: RosMessageType> Subscriber<T> {
}

pub async fn next(&mut self) -> Option<Result<T, SubscriberError>> {
trace!("Subscriber of type {:?} awaiting recv()", T::ROS_TYPE_NAME);
let data = match self.receiver.recv().await {
Ok(v) => v,
Ok(v) => {
trace!("Subscriber of type {:?} received data", T::ROS_TYPE_NAME);
v
}
Err(RecvError::Closed) => return None,
Err(RecvError::Lagged(n)) => return Some(Err(SubscriberError::Lagged(n))),
};
trace!(
"Subscriber of type {:?} deserializing data",
T::ROS_TYPE_NAME
);
let tick = tokio::time::Instant::now();
match serde_rosmsg::from_slice::<T>(&data[..]) {
Ok(p) => Some(Ok(p)),
Ok(p) => {
let duration = tick.elapsed();
trace!(
"Subscriber of type {:?} deserialized data in {duration:?}",
T::ROS_TYPE_NAME
);
Some(Ok(p))
}
Err(e) => Some(Err(e.into())),
}
}
Expand Down Expand Up @@ -142,8 +159,18 @@ impl Subscription {
publisher_list.write().await.push(publisher_uri.to_owned());
// Repeatedly read from the stream until its dry
loop {
trace!(
"Subscription to {} receiving from {} is awaiting next body",
topic_name,
publisher_uri
);
match tcpros::receive_body(&mut stream).await {
Ok(body) => {
trace!(
"Subscription to {} receiving from {} received body",
topic_name,
publisher_uri
);
let send_result = sender.send(body);
if let Err(err) = send_result {
log::error!("Unable to send message data due to dropped channel, closing connection: {err}");
Expand Down
2 changes: 1 addition & 1 deletion roslibrust/src/ros1/tcpros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ pub async fn receive_body(stream: &mut TcpStream) -> Result<Vec<u8>, std::io::Er
let mut body = vec![0u8; body_len as usize + 4];
// Copy the length into the first four bytes
body[..4].copy_from_slice(&body_len.to_le_bytes());
// Read the body into the buffer
// Read the body into the buffer after the header
stream.read_exact(&mut body[4..]).await?;

// Return body
Expand Down
10 changes: 8 additions & 2 deletions roslibrust_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@ edition = "2021"

[dependencies]
env_logger = "0.10"
roslibrust = { path = "../roslibrust" }
roslibrust = { path = "../roslibrust", features = ["ros1"] }
roslibrust_codegen = { path = "../roslibrust_codegen" }
lazy_static = "1.4"
tokio = { version = "1.20", features = ["net", "sync"] }
log = "0.4"

[dev-dependencies]
diffy = "0.3.0"
diffy = "0.3.0"

[[bin]]
path = "src/performance_ramp.rs"
name = "ramp"
76 changes: 76 additions & 0 deletions roslibrust_test/src/performance_ramp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
//! Goal of this executable is to asses the bandwidths and performance limits of roslibrust.
//! This may turn into a benchmark later.

use log::*;
mod ros1;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::builder()
.parse_env(env_logger::Env::default().default_filter_or("info"))
.format_timestamp_millis()
.init();
// Goal is to send a large image payload at progressively higher rates until we sta`t to lag
// Requires running roscore
let client =
roslibrust::ros1::NodeHandle::new("http://localhost:11311", "performance_ramp").await?;

let publisher = client
.advertise::<ros1::sensor_msgs::Image>("/image_topic", 1, false)
.await?;
let mut subscriber = client
.subscribe::<ros1::sensor_msgs::Image>("/image_topic", 1)
.await?;
// Startup delay to make sure pub and sub are connected
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// Publisher task
tokio::spawn(async move {
for data_size_mb in (10..=100).step_by(10) {
// Creating a big vector here
let mut data = vec![0; data_size_mb * 1_000_000];
*data.last_mut().unwrap() = 69;
let image = ros1::sensor_msgs::Image {
header: ros1::std_msgs::Header {
stamp: roslibrust_codegen::Time { secs: 0, nsecs: 0 },
frame_id: "test".to_string(),
seq: data_size_mb as u32,
},
height: 1080,
width: 1920,
encoding: "bgr8".to_string(),
is_bigendian: false as u8,
step: 5760,
data,
};
publisher.publish(&image).await.unwrap();
// Send at 10Hz
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
info!("Test complete");
// Final bonus sleep to make sure last message sends before shutting down
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
std::process::exit(0);
});

// Subscriber task
loop {
if let Some(msg) = subscriber.next().await {
match msg {
Ok(msg) => {
info!("Got message @ {:?}", msg.header.seq);
}
Err(e) => {
error!("Error: {e}");
break;
}
}
} else {
// Shutting down
error!("Channel dropped?");
break;
}
}

Ok(())
}
Loading