Skip to content

Commit

Permalink
Add type hash support (pub/sub only) (#349)
Browse files Browse the repository at this point in the history
fix #290
  • Loading branch information
JEnoch authored Dec 3, 2024
1 parent 4a8ce2b commit ef5b953
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 6 deletions.
19 changes: 16 additions & 3 deletions zenoh-plugin-ros2dds/src/liveliness_mgt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use zenoh::key_expr::{
keyexpr, OwnedKeyExpr,
};

use crate::ros2_utils::ros_distro_is_less_than;

const SLASH_REPLACEMSNT_CHAR: &str = "§";

kedefine!(
Expand Down Expand Up @@ -195,7 +197,7 @@ fn unescape_slashes(ke: &keyexpr) -> OwnedKeyExpr {
// NOTE: only significant Qos for ROS2 are serialized
// See https://docs.ros.org/en/rolling/Concepts/Intermediate/About-Quality-of-Service-Settings.html
//
// format: "<keyless>:<ReliabilityKind>:<DurabilityKind>:<HistoryKid>,<HistoryDepth>"
// format: "<keyless>:<ReliabilityKind>:<DurabilityKind>:<HistoryKid>,<HistoryDepth>[:<UserData>]"
// where each element is "" if default QoS, or an integer in case of enum, and 'K' for !keyless
pub fn qos_to_key_expr(keyless: bool, qos: &Qos) -> OwnedKeyExpr {
use std::io::Write;
Expand All @@ -217,6 +219,13 @@ pub fn qos_to_key_expr(keyless: bool, qos: &Qos) -> OwnedKeyExpr {
write!(&mut w, "{},{}", *kind as isize, depth).unwrap();
}

// Since Iron USER_DATA QoS contains the type_hash and must be forwarded to remote bridge for Reader/Writer creation
if !ros_distro_is_less_than("iron") {
if let Some(v) = &qos.user_data {
write!(&mut w, ":{}", String::from_utf8_lossy(v)).unwrap();
}
}

unsafe {
let s: String = String::from_utf8_unchecked(w);
OwnedKeyExpr::from_string_unchecked(s)
Expand All @@ -225,8 +234,8 @@ pub fn qos_to_key_expr(keyless: bool, qos: &Qos) -> OwnedKeyExpr {

fn key_expr_to_qos(ke: &keyexpr) -> Result<(bool, Qos), String> {
let elts: Vec<&str> = ke.split(':').collect();
if elts.len() != 4 {
return Err(format!("Internal Error: unexpected QoS expression: '{ke}' - 4 elements between : were expected"));
if elts.len() < 4 {
return Err(format!("Internal Error: unexpected QoS expression: '{ke}' - at least 4 elements between ':' were expected"));
}
let mut qos = Qos::default();
let keyless = elts[0].is_empty();
Expand All @@ -253,6 +262,10 @@ fn key_expr_to_qos(ke: &keyexpr) -> Result<(bool, Qos), String> {
_ => return Err(format!("Internal Error: unexpected QoS expression: '{ke}' - failed to parse History in 4th element")),
}
}
// The USER_DATA might be present as 5th element
if elts.len() > 4 && !elts[4].is_empty() {
qos.user_data = Some(elts[4].into());
}

Ok((keyless, qos))
}
Expand Down
54 changes: 52 additions & 2 deletions zenoh-plugin-ros2dds/src/ros2_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::{
env::VarError,
str,
sync::atomic::{AtomicU32, Ordering},
};

Expand All @@ -36,10 +37,18 @@ use crate::{config::Config, dds_utils::get_guid};

pub const ROS2_ACTION_CANCEL_GOAL_SRV_TYPE: &str = "action_msgs/srv/CancelGoal";
pub const ROS2_ACTION_STATUS_MSG_TYPE: &str = "action_msgs/msg/GoalStatusArray";
// Type hash for action_msgs/msg/GoalStatusArray in Iron and Jazzy (might change in future versions)
pub const ROS2_ACTION_STATUS_MSG_TYPE_HASH: &str =
"RIHS01_91a0593bacdcc50ea9bdcf849a938b128412cc1ea821245c663bcd26f83c295e";

// ROS_DISTRO value assumed if the environment variable is not set
pub const ASSUMED_ROS_DISTRO: &str = "iron";

// Separator used by ROS 2 in USER_DATA QoS
pub const USER_DATA_PROPS_SEPARATOR: char = ';';
// Key for type hash used by ROS 2 in USER_DATA QoS
pub const USER_DATA_TYPEHASH_KEY: &str = "typehash=";

lazy_static::lazy_static!(
pub static ref ROS_DISTRO: String = get_ros_distro();

Expand Down Expand Up @@ -312,7 +321,7 @@ fn ros2_service_default_qos() -> Qos {
}

fn ros2_action_feedback_default_qos() -> Qos {
Qos {
let mut qos = Qos {
history: Some(History {
kind: HistoryKind::KEEP_LAST,
depth: 10,
Expand All @@ -337,13 +346,24 @@ fn ros2_action_feedback_default_qos() -> Qos {
kind: IgnoreLocalKind::PARTICIPANT,
}),
..Default::default()
};
if !ros_distro_is_less_than("iron") {
// NOTE: the type hash should be a real one instead of this invalid type hash.
// However, `rmw_cyclonedds_cpp` doesn't do any type checking (yet).
// And the way to forward the type hash for actions (and services) raise questions
// that are described in https://github.com/eclipse-zenoh/zenoh-plugin-ros2dds/pull/349
insert_type_hash(
&mut qos,
"RIHS01_0000000000000000000000000000000000000000000000000000000000000000",
);
}
qos
}

fn ros2_action_status_default_qos() -> Qos {
// Default Status topic QoS copied from:
// https://github.com/ros2/rcl/blob/8f7f4f0804a34ee9d9ecd2d7e75a57ce2b7ced5d/rcl_action/include/rcl_action/default_qos.h#L30
Qos {
let mut qos = Qos {
durability: Some(Durability {
kind: DurabilityKind::TRANSIENT_LOCAL,
}),
Expand All @@ -367,7 +387,12 @@ fn ros2_action_status_default_qos() -> Qos {
kind: IgnoreLocalKind::PARTICIPANT,
}),
..Default::default()
};
if !ros_distro_is_less_than("iron") {
// add type_hash in USER_DATA QoS
insert_type_hash(&mut qos, ROS2_ACTION_STATUS_MSG_TYPE_HASH);
}
qos
}

pub fn is_service_for_action(ros2_service_name: &str) -> bool {
Expand All @@ -393,6 +418,31 @@ pub fn check_ros_name(name: &str) -> Result<(), String> {
}
}

/// For potential use later (type_hash in admin space?)
// pub fn extract_type_hash(qos: &Qos) -> Option<String> {
// if let Some(v) = &qos.user_data {
// if let Ok(s) = str::from_utf8(v) {
// if let Some(mut start) = s.find(USER_DATA_TYPEHASH_KEY) {
// start += USER_DATA_TYPEHASH_KEY.len();
// if let Some(end) = s[start..].find(USER_DATA_PROPS_SEPARATOR) {
// return Some(s[start..(start + end)].into());
// }
// }
// }
// }
// None
// }

pub fn insert_type_hash(qos: &mut Qos, type_hash: &str) {
let mut s = USER_DATA_TYPEHASH_KEY.to_string();
s.push_str(type_hash);
s.push(USER_DATA_PROPS_SEPARATOR);
match qos.user_data {
Some(ref mut v) => v.extend(s.into_bytes().iter()),
None => qos.user_data = Some(s.into_bytes()),
}
}

lazy_static::lazy_static!(
pub static ref CLIENT_ID_COUNTER: AtomicU32 = AtomicU32::default();
);
Expand Down
20 changes: 19 additions & 1 deletion zenoh-plugin-ros2dds/src/ros_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,22 @@ use zenoh::{
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::dds_utils::{ddsrt_iov_len_from_usize, delete_dds_entity, get_guid};
use crate::{
dds_types::DDSRawSample,
gid::Gid,
ros2_utils::{ros_distro_is_less_than, ROS_DISTRO},
ChannelEvent, ROS_DISCOVERY_INFO_PUSH_INTERVAL_MS,
};
use crate::{
dds_utils::{ddsrt_iov_len_from_usize, delete_dds_entity, get_guid},
ros2_utils::{USER_DATA_PROPS_SEPARATOR, USER_DATA_TYPEHASH_KEY},
};

pub const ROS_DISCOVERY_INFO_TOPIC_NAME: &str = "ros_discovery_info";
const ROS_DISCOVERY_INFO_TOPIC_TYPE: &str = "rmw_dds_common::msg::dds_::ParticipantEntitiesInfo_";
// Type hash for rmw_dds_common::msg::dds_::ParticipantEntitiesInfo_ in Iron and Jazzy (might change in future versions)
const ROS_DISCOVERY_INFO_TYPE_HASH: &str =
"RIHS01_91a0593bacdcc50ea9bdcf849a938b128412cc1ea821245c663bcd26f83c295e";

pub struct RosDiscoveryInfoMgr {
reader: dds_entity_t,
Expand Down Expand Up @@ -87,6 +93,16 @@ impl RosDiscoveryInfoMgr {
.unwrap()
.into_raw();

// Since Iron, the Reader/Writer on `ros_discovery_info` topic are expected to have the type hash in USER_DATA QoS
let user_data_qos: Option<Vec<u8>> = if ros_distro_is_less_than("iron") {
None
} else {
let mut s = USER_DATA_TYPEHASH_KEY.to_string();
s.push_str(ROS_DISCOVERY_INFO_TYPE_HASH);
s.push(USER_DATA_PROPS_SEPARATOR);
Some(s.into_bytes())
};

unsafe {
// Create topic (for reader/writer creation)
let t = cdds_create_blob_topic(participant, cton, ctyn, true);
Expand All @@ -108,6 +124,7 @@ impl RosDiscoveryInfoMgr {
qos.ignore_local = Some(IgnoreLocal {
kind: IgnoreLocalKind::PARTICIPANT,
});
qos.user_data = user_data_qos.clone();
let qos_native = qos.to_qos_native();
let reader = dds_create_reader(participant, t, qos_native, std::ptr::null());
Qos::delete_qos_native(qos_native);
Expand Down Expand Up @@ -137,6 +154,7 @@ impl RosDiscoveryInfoMgr {
qos.ignore_local = Some(IgnoreLocal {
kind: IgnoreLocalKind::PARTICIPANT,
});
qos.user_data = user_data_qos.clone();
let qos_native = qos.to_qos_native();
let writer = dds_create_writer(participant, t, qos_native, std::ptr::null());
Qos::delete_qos_native(qos_native);
Expand Down

0 comments on commit ef5b953

Please sign in to comment.