diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c39fd5..9215f28 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - ROS1 Native Publishers now support latching behavior - The XML RPC client for interacting directly with the rosmaster server has been exposed as a public API - Experimental: Initial support for writing generic clients that can be compile time specialized for rosbridge or ros1 +- Can subscribe to any topic and get raw bytes instead of a deserialized message of known type ### Fixed diff --git a/roslibrust/src/ros1/node/handle.rs b/roslibrust/src/ros1/node/handle.rs index 33d02ef..3ef3e6e 100644 --- a/roslibrust/src/ros1/node/handle.rs +++ b/roslibrust/src/ros1/node/handle.rs @@ -2,7 +2,7 @@ use super::actor::{Node, NodeServerHandle}; use crate::{ ros1::{ names::Name, publisher::Publisher, service_client::ServiceClient, subscriber::Subscriber, - NodeError, ServiceServer, + subscriber::SubscriberAny, NodeError, ServiceServer, }, ServiceFn, }; @@ -75,6 +75,18 @@ impl NodeHandle { Ok(Publisher::new(topic_name, sender)) } + pub async fn subscribe_any( + &self, + topic_name: &str, + queue_size: usize, + ) -> Result { + let receiver = self + .inner + .register_subscriber::(topic_name, queue_size) + .await?; + Ok(SubscriberAny::new(receiver)) + } + pub async fn subscribe( &self, topic_name: &str, diff --git a/roslibrust/src/ros1/subscriber.rs b/roslibrust/src/ros1/subscriber.rs index a19058d..900453b 100644 --- a/roslibrust/src/ros1/subscriber.rs +++ b/roslibrust/src/ros1/subscriber.rs @@ -1,6 +1,6 @@ use crate::ros1::{names::Name, tcpros::ConnectionHeader}; use abort_on_drop::ChildTask; -use roslibrust_codegen::RosMessageType; +use roslibrust_codegen::{RosMessageType, ShapeShifter}; use std::{marker::PhantomData, sync::Arc}; use tokio::{ io::AsyncWriteExt, @@ -39,6 +39,30 @@ impl Subscriber { } } +pub struct SubscriberAny { + receiver: broadcast::Receiver>, + _phantom: PhantomData, +} + +impl SubscriberAny { + pub(crate) fn new(receiver: broadcast::Receiver>) -> Self { + Self { + receiver, + _phantom: PhantomData, + } + } + + // pub async fn next(&mut self) -> Option> { + pub async fn next(&mut self) -> Option, SubscriberError>> { + let data = match self.receiver.recv().await { + Ok(v) => v, + Err(RecvError::Closed) => return None, + Err(RecvError::Lagged(n)) => return Some(Err(SubscriberError::Lagged(n))), + }; + Some(Ok(data)) + } +} + pub struct Subscription { subscription_tasks: Vec>, _msg_receiver: broadcast::Receiver>, @@ -154,7 +178,9 @@ async fn establish_publisher_connection( stream.write_all(&conn_header_bytes[..]).await?; if let Ok(responded_header) = tcpros::receive_header(&mut stream).await { - if conn_header.md5sum == responded_header.md5sum { + if conn_header.md5sum == Some("*".to_string()) + || conn_header.md5sum == responded_header.md5sum + { log::debug!( "Established connection with publisher for {:?}", conn_header.topic diff --git a/roslibrust/tests/ros1_native_integration_tests.rs b/roslibrust/tests/ros1_native_integration_tests.rs index b4cc3ec..710be2f 100644 --- a/roslibrust/tests/ros1_native_integration_tests.rs +++ b/roslibrust/tests/ros1_native_integration_tests.rs @@ -12,6 +12,33 @@ mod tests { "assets/ros1_common_interfaces" ); + #[test_log::test(tokio::test)] + async fn test_subscribe_any() { + // get a single message in raw bytes and test the bytes are as expected + let nh = NodeHandle::new("http://localhost:11311", "test_subscribe_any") + .await + .unwrap(); + + let publisher = nh + .advertise::("/test_subscribe_any", 1, true) + .await + .unwrap(); + + let mut subscriber = nh.subscribe_any("/test_subscribe_any", 1).await.unwrap(); + + publisher + .publish(&std_msgs::String { + data: "test".to_owned(), + }) + .await + .unwrap(); + + let res = + tokio::time::timeout(tokio::time::Duration::from_millis(250), subscriber.next()).await; + let res = res.unwrap().unwrap().unwrap(); + assert!(res == vec![8, 0, 0, 0, 4, 0, 0, 0, 116, 101, 115, 116]); + } + #[test_log::test(tokio::test)] async fn test_latching() { let nh = NodeHandle::new("http://localhost:11311", "test_latching") diff --git a/roslibrust_codegen/src/integral_types.rs b/roslibrust_codegen/src/integral_types.rs index 230fe8e..736c419 100644 --- a/roslibrust_codegen/src/integral_types.rs +++ b/roslibrust_codegen/src/integral_types.rs @@ -36,6 +36,16 @@ impl RosMessageType for Time { const DEFINITION: &'static str = ""; } +#[derive(:: serde :: Deserialize, :: serde :: Serialize, Debug, Default, Clone, PartialEq)] +pub struct ShapeShifter(Vec); + +// The equivalent of rospy AnyMsg or C++ ShapeShifter, subscribe_any() uses this type +impl RosMessageType for ShapeShifter { + const ROS_TYPE_NAME: &'static str = "*"; + const MD5SUM: &'static str = "*"; + const DEFINITION: &'static str = ""; +} + // TODO provide chrono conversions here behind a cfg flag /// Matches the integral ros1 duration type, with extensions for ease of use