Skip to content

Commit

Permalink
Merge pull request #194 from lucasw/publisher_any
Browse files Browse the repository at this point in the history
publish a vector of bytes given a topic type and message definition
  • Loading branch information
Carter12s authored Sep 23, 2024
2 parents f9ccf04 + bfe8962 commit 569d954
Show file tree
Hide file tree
Showing 9 changed files with 960 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- The XML RPC client for interacting directly with the rosmaster server has been exposed as a public API
- Experimental: Initial support for writing generic clients that can be compile time specialized for rosbridge or ros1
- Can subscribe to any topic and get raw bytes instead of a deserialized message of known type
- Can publish to any topic and send raw bytes instead of a deserialized message

### Fixed

Expand Down
45 changes: 45 additions & 0 deletions roslibrust/examples/ros1_publish_any.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
roslibrust_codegen_macro::find_and_generate_ros_messages!("assets/ros1_common_interfaces");

/// This example demonstrates ths usage of the .advertise_any() function
///
/// The intent of the API is to support use cases like play back data from a bag file.
/// Most users are encourage to not use this API and instead rely on generated message types.
/// See ros1_talker.rs for a "normal" example.
#[cfg(feature = "ros1")]
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
// Note: this example needs a ros master running to work
let node =
roslibrust::ros1::NodeHandle::new("http://localhost:11311", "/ros1_publish_any").await?;

// Definition from: https://docs.ros.org/en/noetic/api/std_msgs/html/msg/String.html
let msg_definition = r#"string data"#;

let publisher = node
.advertise_any("/chatter", "std_msgs/String", &msg_definition, 100, false)
.await?;

// Data taken from example in:
// https://wiki.ros.org/ROS/Connection%20Header
// Note: publish expects the body length field to be present, as well as length of each field
// - First four bytes are Body length = 9 bytes
// - Next four bytes are length of data field = 5 bytes
// - Lass five bytes are the data field as ascii "hello"
// Note: Byte order!
let data: Vec<u8> = vec![
0x09, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f,
];

// This will publish "hello" in a loop at 1Hz
// `rostopic echo /chatter` will show the message being published
loop {
publisher.publish(&data).await?;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}

#[cfg(not(feature = "ros1"))]
fn main() {
eprintln!("This example does nothing without compiling with the feature 'ros1'");
}
1 change: 1 addition & 0 deletions roslibrust/src/ros1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub use node::*;

mod publisher;
pub use publisher::Publisher;
pub use publisher::PublisherAny;
mod service_client;
pub use service_client::ServiceClient;
mod subscriber;
Expand Down
43 changes: 43 additions & 0 deletions roslibrust/src/ros1/node/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,49 @@ impl NodeServerHandle {
})?)
}

/// Registers a publisher with the underlying node server
/// Returns a channel that the raw bytes of a publish can be shoved into to queue the publish
pub(crate) async fn register_publisher_any(
&self,
topic: &str,
topic_type: &str,
msg_definition: &str,
queue_size: usize,
latching: bool,
) -> Result<mpsc::Sender<Vec<u8>>, NodeError> {
let (sender, receiver) = oneshot::channel();

let md5sum;
let md5sum_res =
roslibrust_codegen::message_definition_to_md5sum(topic_type, msg_definition);
match md5sum_res {
// TODO(lucasw) make a new error type for this?
Err(err) => {
log::error!("{:?}", err);
return Err(NodeError::IoError(io::Error::from(
io::ErrorKind::ConnectionAborted,
)));
}
Ok(md5sum_rv) => {
md5sum = md5sum_rv;
}
}

self.node_server_sender.send(NodeMsg::RegisterPublisher {
reply: sender,
topic: topic.to_owned(),
topic_type: topic_type.to_owned(),
queue_size,
msg_definition: msg_definition.to_owned(),
md5sum,
latching,
})?;
let received = receiver.await?;
Ok(received.map_err(|_err| {
NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted))
})?)
}

pub(crate) async fn unregister_publisher(&self, topic: &str) -> Result<(), NodeError> {
let (sender, receiver) = oneshot::channel();
self.node_server_sender.send(NodeMsg::UnregisterPublisher {
Expand Down
30 changes: 26 additions & 4 deletions roslibrust/src/ros1/node/handle.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use super::actor::{Node, NodeServerHandle};
use crate::{
ros1::{
names::Name, publisher::Publisher, service_client::ServiceClient, subscriber::Subscriber,
subscriber::SubscriberAny, NodeError, ServiceServer,
names::Name, publisher::Publisher, publisher::PublisherAny, service_client::ServiceClient,
subscriber::Subscriber, subscriber::SubscriberAny, NodeError, ServiceServer,
},
ServiceFn,
};

/// Represents a handle to an underlying [Node]. NodeHandle's can be freely cloned, moved, copied, etc.
/// Represents a handle to an underlying Node. NodeHandle's can be freely cloned, moved, copied, etc.
/// This class provides the user facing API for interacting with ROS.
#[derive(Clone)]
pub struct NodeHandle {
Expand Down Expand Up @@ -55,12 +55,34 @@ impl NodeHandle {
self.inner.get_client_uri().await
}

/// Create a new publisher any arbitrary message type.
///
/// This function is intended to be used when a message definition was not available at compile time,
/// such as when playing back data from a bag file.
/// This function requires the text of the expanded message definition as would be produced by `gendeps --cat`.
/// See <https://wiki.ros.org/roslib/gentools> for what this should like.
/// Messages autogenerated with roslibrust_codegen will include this information in their DEFINITION constant.
pub async fn advertise_any(
&self,
topic_name: &str,
topic_type: &str,
msg_definition: &str,
queue_size: usize,
latching: bool,
) -> Result<PublisherAny, NodeError> {
let sender = self
.inner
.register_publisher_any(topic_name, topic_type, msg_definition, queue_size, latching)
.await?;
Ok(PublisherAny::new(topic_name, sender))
}

/// Create a new publisher for the given type.
///
/// This function can be called multiple times to create multiple publishers for the same topic,
/// however the FIRST call will establish the queue size and latching behavior for the topic.
/// Subsequent calls will simply be given additional handles to the underlying publication.
/// This behavior was chosen to mirror ROS1's API, however it is reccomended to .clone() the returend publisher
/// This behavior was chosen to mirror ROS1's API, however it is recommended to .clone() the returned publisher
/// instead of calling this function multiple times.
pub async fn advertise<T: roslibrust_codegen::RosMessageType>(
&self,
Expand Down
42 changes: 42 additions & 0 deletions roslibrust/src/ros1/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use tokio::{
sync::{mpsc, RwLock},
};

/// The regular Publisher representation returned by calling advertise on a [crate::ros1::NodeHandle].
pub struct Publisher<T> {
topic_name: String,
sender: mpsc::Sender<Vec<u8>>,
Expand Down Expand Up @@ -49,6 +50,45 @@ impl<T: RosMessageType> Publisher<T> {
}
}

/// A specialty publisher used when message type is not known at compile time.
///
/// Relies on user to provide serialized data. Typically used with playback from bag files.
pub struct PublisherAny {
topic_name: String,
sender: mpsc::Sender<Vec<u8>>,
phantom: PhantomData<Vec<u8>>,
}

impl PublisherAny {
pub(crate) fn new(topic_name: &str, sender: mpsc::Sender<Vec<u8>>) -> Self {
Self {
topic_name: topic_name.to_owned(),
sender,
phantom: PhantomData,
}
}

/// Queues a message to be send on the related topic.
/// Returns when the data has been queued not when data is actually sent.
///
/// This expects the data to be the raw bytes of the message body as they would appear going over the wire.
/// See ros1_publish_any.rs example for more details.
/// Body length should be included as first four bytes.
pub async fn publish(&self, data: &Vec<u8>) -> Result<(), PublisherError> {
// TODO this is a pretty dumb...
// because of the internal channel used for re-direction this future doesn't
// actually complete when the data is sent, but merely when it is queued to be sent
// This function could probably be non-async
// Or we should do some significant re-work to have it only yield when the data is sent.
self.sender
.send(data.to_vec())
.await
.map_err(|_| PublisherError::StreamClosed)?;
debug!("Publishing data on topic {}", self.topic_name);
Ok(())
}
}

pub(crate) struct Publication {
topic_type: String,
listener_port: u16,
Expand Down Expand Up @@ -250,6 +290,8 @@ impl Publication {
if let Some(connection_md5sum) = connection_header.md5sum {
if connection_md5sum != "*" {
if let Some(local_md5sum) = &responding_conn_header.md5sum {
// TODO(lucasw) is it ok to match any with "*"?
// if local_md5sum != "*" && connection_md5sum != *local_md5sum {
if connection_md5sum != *local_md5sum {
warn!(
"Got subscribe request for {}, but md5sums do not match. Expected {:?}, received {:?}",
Expand Down
1 change: 1 addition & 0 deletions roslibrust/src/ros1/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ async fn establish_publisher_connection(

if let Ok(responded_header) = tcpros::receive_header(&mut stream).await {
if conn_header.md5sum == Some("*".to_string())
|| responded_header.md5sum == Some("*".to_string())
|| conn_header.md5sum == responded_header.md5sum
{
log::debug!(
Expand Down
32 changes: 32 additions & 0 deletions roslibrust/tests/ros1_native_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,38 @@ mod tests {
"assets/ros1_common_interfaces"
);

#[test_log::test(tokio::test)]
async fn test_publish_any() {
// publish a single message in raw bytes and test the received message is as expected
let nh = NodeHandle::new("http://localhost:11311", "test_publish_any")
.await
.unwrap();

let publisher = nh
.advertise_any(
"/test_publish_any",
"std_msgs/String",
"string data\n",
1,
true,
)
.await
.unwrap();

let mut subscriber = nh
.subscribe::<std_msgs::String>("/test_publish_any", 1)
.await
.unwrap();

let msg_raw: Vec<u8> = vec![8, 0, 0, 0, 4, 0, 0, 0, 116, 101, 115, 116].to_vec();
publisher.publish(&msg_raw).await.unwrap();

let res =
tokio::time::timeout(tokio::time::Duration::from_millis(250), subscriber.next()).await;
let msg = res.unwrap().unwrap().unwrap();
assert_eq!(msg.data, "test");
}

#[test_log::test(tokio::test)]
async fn test_subscribe_any() {
// get a single message in raw bytes and test the bytes are as expected
Expand Down
Loading

0 comments on commit 569d954

Please sign in to comment.