Skip to content

Commit

Permalink
publish a vector of bytes given a topic type and message definition
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasw committed Aug 17, 2024
1 parent f9ccf04 commit dbc1535
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- The XML RPC client for interacting directly with the rosmaster server has been exposed as a public API
- Experimental: Initial support for writing generic clients that can be compile time specialized for rosbridge or ros1
- Can subscribe to any topic and get raw bytes instead of a deserialized message of known type
- Can publish to any topic and send raw bytes instead of a deserialized message

### Fixed

Expand Down
26 changes: 26 additions & 0 deletions roslibrust/src/ros1/node/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,32 @@ impl NodeServerHandle {
})?)
}

/// Registers a publisher with the underlying node server
/// Returns a channel that the raw bytes of a publish can be shoved into to queue the publish
pub(crate) async fn register_publisher_any(
&self,
topic: &str,
topic_type: &str,
msg_definition: &str,
queue_size: usize,
latching: bool,
) -> Result<mpsc::Sender<Vec<u8>>, NodeError> {
let (sender, receiver) = oneshot::channel();
self.node_server_sender.send(NodeMsg::RegisterPublisher {
reply: sender,
topic: topic.to_owned(),
topic_type: topic_type.to_owned(),
queue_size,
msg_definition: msg_definition.to_owned(),
md5sum: "*".to_string(),
latching,
})?;
let received = receiver.await?;
Ok(received.map_err(|_err| {
NodeError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted))
})?)
}

pub(crate) async fn unregister_publisher(&self, topic: &str) -> Result<(), NodeError> {
let (sender, receiver) = oneshot::channel();
self.node_server_sender.send(NodeMsg::UnregisterPublisher {
Expand Down
19 changes: 17 additions & 2 deletions roslibrust/src/ros1/node/handle.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use super::actor::{Node, NodeServerHandle};
use crate::{
ros1::{
names::Name, publisher::Publisher, service_client::ServiceClient, subscriber::Subscriber,
subscriber::SubscriberAny, NodeError, ServiceServer,
names::Name, publisher::Publisher, publisher::PublisherAny, service_client::ServiceClient,
subscriber::Subscriber, subscriber::SubscriberAny, NodeError, ServiceServer,
},
ServiceFn,
};
Expand Down Expand Up @@ -62,6 +62,21 @@ impl NodeHandle {
/// Subsequent calls will simply be given additional handles to the underlying publication.
/// This behavior was chosen to mirror ROS1's API, however it is reccomended to .clone() the returend publisher
/// instead of calling this function multiple times.
pub async fn advertise_any(
&self,
topic_name: &str,
topic_type: &str,
msg_definition: &str,
queue_size: usize,
latching: bool,
) -> Result<PublisherAny, NodeError> {
let sender = self
.inner
.register_publisher_any(topic_name, topic_type, msg_definition, queue_size, latching)
.await?;
Ok(PublisherAny::new(topic_name, sender))
}

pub async fn advertise<T: roslibrust_codegen::RosMessageType>(
&self,
topic_name: &str,
Expand Down
35 changes: 34 additions & 1 deletion roslibrust/src/ros1/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,38 @@ impl<T: RosMessageType> Publisher<T> {
}
}

pub struct PublisherAny {
topic_name: String,
sender: mpsc::Sender<Vec<u8>>,
phantom: PhantomData<Vec<u8>>,
}

impl PublisherAny {
pub(crate) fn new(topic_name: &str, sender: mpsc::Sender<Vec<u8>>) -> Self {
Self {
topic_name: topic_name.to_owned(),
sender,
phantom: PhantomData,
}
}

/// Queues a message to be send on the related topic.
/// Returns when the data has been queued not when data is actually sent.
pub async fn publish(&self, data: &Vec<u8>) -> Result<(), PublisherError> {
// TODO this is a pretty dumb...
// because of the internal channel used for re-direction this future doesn't
// actually complete when the data is sent, but merely when it is queued to be sent
// This function could probably be non-async
// Or we should do some significant re-work to have it only yield when the data is sent.
self.sender
.send(data.to_vec())
.await
.map_err(|_| PublisherError::StreamClosed)?;
debug!("Publishing data on topic {}", self.topic_name);
Ok(())
}
}

pub(crate) struct Publication {
topic_type: String,
listener_port: u16,
Expand Down Expand Up @@ -250,7 +282,8 @@ impl Publication {
if let Some(connection_md5sum) = connection_header.md5sum {
if connection_md5sum != "*" {
if let Some(local_md5sum) = &responding_conn_header.md5sum {
if connection_md5sum != *local_md5sum {
// TODO(lucasw) is it ok to match any with "*"?
if local_md5sum != "*" && connection_md5sum != *local_md5sum {
warn!(
"Got subscribe request for {}, but md5sums do not match. Expected {:?}, received {:?}",
topic_name,
Expand Down
1 change: 1 addition & 0 deletions roslibrust/src/ros1/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ async fn establish_publisher_connection(

if let Ok(responded_header) = tcpros::receive_header(&mut stream).await {
if conn_header.md5sum == Some("*".to_string())
|| responded_header.md5sum == Some("*".to_string())
|| conn_header.md5sum == responded_header.md5sum
{
log::debug!(
Expand Down
32 changes: 32 additions & 0 deletions roslibrust/tests/ros1_native_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,38 @@ mod tests {
"assets/ros1_common_interfaces"
);

#[test_log::test(tokio::test)]
async fn test_publish_any() {
// publish a single message in raw bytes and test the received message is as expected
let nh = NodeHandle::new("http://localhost:11311", "test_publish_any")
.await
.unwrap();

let publisher = nh
.advertise_any(
"/test_publish_any",
"std_msgs/String",
"string data\n",
1,
true,
)
.await
.unwrap();

let mut subscriber = nh
.subscribe::<std_msgs::String>("/test_publish_any", 1)
.await
.unwrap();

let msg_raw: Vec<u8> = vec![8, 0, 0, 0, 4, 0, 0, 0, 116, 101, 115, 116].to_vec();
publisher.publish(&msg_raw).await.unwrap();

let res =
tokio::time::timeout(tokio::time::Duration::from_millis(250), subscriber.next()).await;
let msg = res.unwrap().unwrap().unwrap();
assert_eq!(msg.data, "test");
}

#[test_log::test(tokio::test)]
async fn test_subscribe_any() {
// get a single message in raw bytes and test the bytes are as expected
Expand Down

0 comments on commit dbc1535

Please sign in to comment.