diff --git a/zenoh-plugin-ros2dds/src/liveliness_mgt.rs b/zenoh-plugin-ros2dds/src/liveliness_mgt.rs index 35e9d70..9ad8f1c 100644 --- a/zenoh-plugin-ros2dds/src/liveliness_mgt.rs +++ b/zenoh-plugin-ros2dds/src/liveliness_mgt.rs @@ -21,6 +21,8 @@ use zenoh::key_expr::{ keyexpr, OwnedKeyExpr, }; +use crate::ros2_utils::ros_distro_is_less_than; + const SLASH_REPLACEMSNT_CHAR: &str = "ยง"; kedefine!( @@ -195,7 +197,7 @@ fn unescape_slashes(ke: &keyexpr) -> OwnedKeyExpr { // NOTE: only significant Qos for ROS2 are serialized // See https://docs.ros.org/en/rolling/Concepts/Intermediate/About-Quality-of-Service-Settings.html // -// format: ":::," +// format: ":::,[:]" // where each element is "" if default QoS, or an integer in case of enum, and 'K' for !keyless pub fn qos_to_key_expr(keyless: bool, qos: &Qos) -> OwnedKeyExpr { use std::io::Write; @@ -217,6 +219,13 @@ pub fn qos_to_key_expr(keyless: bool, qos: &Qos) -> OwnedKeyExpr { write!(&mut w, "{},{}", *kind as isize, depth).unwrap(); } + // Since Iron USER_DATA QoS contains the type_hash and must be forwarded to remote bridge for Reader/Writer creation + if !ros_distro_is_less_than("iron") { + if let Some(v) = &qos.user_data { + write!(&mut w, ":{}", String::from_utf8_lossy(v)).unwrap(); + } + } + unsafe { let s: String = String::from_utf8_unchecked(w); OwnedKeyExpr::from_string_unchecked(s) @@ -225,8 +234,8 @@ pub fn qos_to_key_expr(keyless: bool, qos: &Qos) -> OwnedKeyExpr { fn key_expr_to_qos(ke: &keyexpr) -> Result<(bool, Qos), String> { let elts: Vec<&str> = ke.split(':').collect(); - if elts.len() != 4 { - return Err(format!("Internal Error: unexpected QoS expression: '{ke}' - 4 elements between : were expected")); + if elts.len() < 4 { + return Err(format!("Internal Error: unexpected QoS expression: '{ke}' - at least 4 elements between ':' were expected")); } let mut qos = Qos::default(); let keyless = elts[0].is_empty(); @@ -253,6 +262,10 @@ fn key_expr_to_qos(ke: &keyexpr) -> Result<(bool, Qos), String> { _ => return Err(format!("Internal Error: unexpected QoS expression: '{ke}' - failed to parse History in 4th element")), } } + // The USER_DATA might be present as 5th element + if elts.len() > 4 && !elts[4].is_empty() { + qos.user_data = Some(elts[4].into()); + } Ok((keyless, qos)) } diff --git a/zenoh-plugin-ros2dds/src/ros2_utils.rs b/zenoh-plugin-ros2dds/src/ros2_utils.rs index 6995f51..e33f8b6 100644 --- a/zenoh-plugin-ros2dds/src/ros2_utils.rs +++ b/zenoh-plugin-ros2dds/src/ros2_utils.rs @@ -14,6 +14,7 @@ use std::{ env::VarError, + str, sync::atomic::{AtomicU32, Ordering}, }; @@ -36,10 +37,18 @@ use crate::{config::Config, dds_utils::get_guid}; pub const ROS2_ACTION_CANCEL_GOAL_SRV_TYPE: &str = "action_msgs/srv/CancelGoal"; pub const ROS2_ACTION_STATUS_MSG_TYPE: &str = "action_msgs/msg/GoalStatusArray"; +// Type hash for action_msgs/msg/GoalStatusArray in Iron and Jazzy (might change in future versions) +pub const ROS2_ACTION_STATUS_MSG_TYPE_HASH: &str = + "RIHS01_91a0593bacdcc50ea9bdcf849a938b128412cc1ea821245c663bcd26f83c295e"; // ROS_DISTRO value assumed if the environment variable is not set pub const ASSUMED_ROS_DISTRO: &str = "iron"; +// Separator used by ROS 2 in USER_DATA QoS +pub const USER_DATA_PROPS_SEPARATOR: char = ';'; +// Key for type hash used by ROS 2 in USER_DATA QoS +pub const USER_DATA_TYPEHASH_KEY: &str = "typehash="; + lazy_static::lazy_static!( pub static ref ROS_DISTRO: String = get_ros_distro(); @@ -312,7 +321,7 @@ fn ros2_service_default_qos() -> Qos { } fn ros2_action_feedback_default_qos() -> Qos { - Qos { + let mut qos = Qos { history: Some(History { kind: HistoryKind::KEEP_LAST, depth: 10, @@ -337,13 +346,24 @@ fn ros2_action_feedback_default_qos() -> Qos { kind: IgnoreLocalKind::PARTICIPANT, }), ..Default::default() + }; + if !ros_distro_is_less_than("iron") { + // NOTE: the type hash should be a real one instead of this invalid type hash. + // However, `rmw_cyclonedds_cpp` doesn't do any type checking (yet). + // And the way to forward the type hash for actions (and services) raise questions + // that are described in https://github.com/eclipse-zenoh/zenoh-plugin-ros2dds/pull/349 + insert_type_hash( + &mut qos, + "RIHS01_0000000000000000000000000000000000000000000000000000000000000000", + ); } + qos } fn ros2_action_status_default_qos() -> Qos { // Default Status topic QoS copied from: // https://github.com/ros2/rcl/blob/8f7f4f0804a34ee9d9ecd2d7e75a57ce2b7ced5d/rcl_action/include/rcl_action/default_qos.h#L30 - Qos { + let mut qos = Qos { durability: Some(Durability { kind: DurabilityKind::TRANSIENT_LOCAL, }), @@ -367,7 +387,12 @@ fn ros2_action_status_default_qos() -> Qos { kind: IgnoreLocalKind::PARTICIPANT, }), ..Default::default() + }; + if !ros_distro_is_less_than("iron") { + // add type_hash in USER_DATA QoS + insert_type_hash(&mut qos, ROS2_ACTION_STATUS_MSG_TYPE_HASH); } + qos } pub fn is_service_for_action(ros2_service_name: &str) -> bool { @@ -393,6 +418,31 @@ pub fn check_ros_name(name: &str) -> Result<(), String> { } } +/// For potential use later (type_hash in admin space?) +// pub fn extract_type_hash(qos: &Qos) -> Option { +// if let Some(v) = &qos.user_data { +// if let Ok(s) = str::from_utf8(v) { +// if let Some(mut start) = s.find(USER_DATA_TYPEHASH_KEY) { +// start += USER_DATA_TYPEHASH_KEY.len(); +// if let Some(end) = s[start..].find(USER_DATA_PROPS_SEPARATOR) { +// return Some(s[start..(start + end)].into()); +// } +// } +// } +// } +// None +// } + +pub fn insert_type_hash(qos: &mut Qos, type_hash: &str) { + let mut s = USER_DATA_TYPEHASH_KEY.to_string(); + s.push_str(type_hash); + s.push(USER_DATA_PROPS_SEPARATOR); + match qos.user_data { + Some(ref mut v) => v.extend(s.into_bytes().iter()), + None => qos.user_data = Some(s.into_bytes()), + } +} + lazy_static::lazy_static!( pub static ref CLIENT_ID_COUNTER: AtomicU32 = AtomicU32::default(); ); diff --git a/zenoh-plugin-ros2dds/src/ros_discovery.rs b/zenoh-plugin-ros2dds/src/ros_discovery.rs index 170a538..0aec45b 100644 --- a/zenoh-plugin-ros2dds/src/ros_discovery.rs +++ b/zenoh-plugin-ros2dds/src/ros_discovery.rs @@ -34,16 +34,22 @@ use zenoh::{ // Contributors: // ZettaScale Zenoh Team, // -use crate::dds_utils::{ddsrt_iov_len_from_usize, delete_dds_entity, get_guid}; use crate::{ dds_types::DDSRawSample, gid::Gid, ros2_utils::{ros_distro_is_less_than, ROS_DISTRO}, ChannelEvent, ROS_DISCOVERY_INFO_PUSH_INTERVAL_MS, }; +use crate::{ + dds_utils::{ddsrt_iov_len_from_usize, delete_dds_entity, get_guid}, + ros2_utils::{USER_DATA_PROPS_SEPARATOR, USER_DATA_TYPEHASH_KEY}, +}; pub const ROS_DISCOVERY_INFO_TOPIC_NAME: &str = "ros_discovery_info"; const ROS_DISCOVERY_INFO_TOPIC_TYPE: &str = "rmw_dds_common::msg::dds_::ParticipantEntitiesInfo_"; +// Type hash for rmw_dds_common::msg::dds_::ParticipantEntitiesInfo_ in Iron and Jazzy (might change in future versions) +const ROS_DISCOVERY_INFO_TYPE_HASH: &str = + "RIHS01_91a0593bacdcc50ea9bdcf849a938b128412cc1ea821245c663bcd26f83c295e"; pub struct RosDiscoveryInfoMgr { reader: dds_entity_t, @@ -87,6 +93,16 @@ impl RosDiscoveryInfoMgr { .unwrap() .into_raw(); + // Since Iron, the Reader/Writer on `ros_discovery_info` topic are expected to have the type hash in USER_DATA QoS + let user_data_qos: Option> = if ros_distro_is_less_than("iron") { + None + } else { + let mut s = USER_DATA_TYPEHASH_KEY.to_string(); + s.push_str(ROS_DISCOVERY_INFO_TYPE_HASH); + s.push(USER_DATA_PROPS_SEPARATOR); + Some(s.into_bytes()) + }; + unsafe { // Create topic (for reader/writer creation) let t = cdds_create_blob_topic(participant, cton, ctyn, true); @@ -108,6 +124,7 @@ impl RosDiscoveryInfoMgr { qos.ignore_local = Some(IgnoreLocal { kind: IgnoreLocalKind::PARTICIPANT, }); + qos.user_data = user_data_qos.clone(); let qos_native = qos.to_qos_native(); let reader = dds_create_reader(participant, t, qos_native, std::ptr::null()); Qos::delete_qos_native(qos_native); @@ -137,6 +154,7 @@ impl RosDiscoveryInfoMgr { qos.ignore_local = Some(IgnoreLocal { kind: IgnoreLocalKind::PARTICIPANT, }); + qos.user_data = user_data_qos.clone(); let qos_native = qos.to_qos_native(); let writer = dds_create_writer(participant, t, qos_native, std::ptr::null()); Qos::delete_qos_native(qos_native);