Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add type hash support (pub/sub only) #349

Merged
merged 8 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions zenoh-plugin-ros2dds/src/liveliness_mgt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use zenoh::key_expr::{
keyexpr, OwnedKeyExpr,
};

use crate::ros2_utils::ros_distro_is_less_than;

const SLASH_REPLACEMSNT_CHAR: &str = "§";

kedefine!(
Expand Down Expand Up @@ -195,7 +197,7 @@ fn unescape_slashes(ke: &keyexpr) -> OwnedKeyExpr {
// NOTE: only significant Qos for ROS2 are serialized
// See https://docs.ros.org/en/rolling/Concepts/Intermediate/About-Quality-of-Service-Settings.html
//
// format: "<keyless>:<ReliabilityKind>:<DurabilityKind>:<HistoryKid>,<HistoryDepth>"
// format: "<keyless>:<ReliabilityKind>:<DurabilityKind>:<HistoryKid>,<HistoryDepth>[:<UserData>]"
// where each element is "" if default QoS, or an integer in case of enum, and 'K' for !keyless
pub fn qos_to_key_expr(keyless: bool, qos: &Qos) -> OwnedKeyExpr {
use std::io::Write;
Expand All @@ -217,6 +219,13 @@ pub fn qos_to_key_expr(keyless: bool, qos: &Qos) -> OwnedKeyExpr {
write!(&mut w, "{},{}", *kind as isize, depth).unwrap();
}

// Since Iron USER_DATA QoS contains the type_hash and must be forwarded to remote bridge for Reader/Writer creation
if !ros_distro_is_less_than("iron") {
if let Some(v) = &qos.user_data {
write!(&mut w, ":{}", String::from_utf8_lossy(v)).unwrap();
}
}

unsafe {
let s: String = String::from_utf8_unchecked(w);
OwnedKeyExpr::from_string_unchecked(s)
Expand All @@ -225,8 +234,8 @@ pub fn qos_to_key_expr(keyless: bool, qos: &Qos) -> OwnedKeyExpr {

fn key_expr_to_qos(ke: &keyexpr) -> Result<(bool, Qos), String> {
let elts: Vec<&str> = ke.split(':').collect();
if elts.len() != 4 {
return Err(format!("Internal Error: unexpected QoS expression: '{ke}' - 4 elements between : were expected"));
if elts.len() < 4 {
return Err(format!("Internal Error: unexpected QoS expression: '{ke}' - at least 4 elements between ':' were expected"));
}
let mut qos = Qos::default();
let keyless = elts[0].is_empty();
Expand All @@ -253,6 +262,10 @@ fn key_expr_to_qos(ke: &keyexpr) -> Result<(bool, Qos), String> {
_ => return Err(format!("Internal Error: unexpected QoS expression: '{ke}' - failed to parse History in 4th element")),
}
}
// The USER_DATA might be present as 5th element
if elts.len() > 4 && !elts[4].is_empty() {
qos.user_data = Some(elts[4].into());
}

Ok((keyless, qos))
}
Expand Down
53 changes: 51 additions & 2 deletions zenoh-plugin-ros2dds/src/ros2_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::{
env::VarError,
str,
sync::atomic::{AtomicU32, Ordering},
};

Expand All @@ -36,10 +37,18 @@ use crate::{config::Config, dds_utils::get_guid};

pub const ROS2_ACTION_CANCEL_GOAL_SRV_TYPE: &str = "action_msgs/srv/CancelGoal";
pub const ROS2_ACTION_STATUS_MSG_TYPE: &str = "action_msgs/msg/GoalStatusArray";
// Type hash for action_msgs/msg/GoalStatusArray in Iron and Jazzy (might change in future versions)
pub const ROS2_ACTION_STATUS_MSG_TYPE_HASH: &str =
"RIHS01_91a0593bacdcc50ea9bdcf849a938b128412cc1ea821245c663bcd26f83c295e";

// ROS_DISTRO value assumed if the environment variable is not set
pub const ASSUMED_ROS_DISTRO: &str = "iron";

// Separator used by ROS 2 in USER_DATA QoS
pub const USER_DATA_PROPS_SEPARATOR: char = ';';
// Key for type hash used by ROS 2 in USER_DATA QoS
pub const USER_DATA_TYPEHASH_KEY: &str = "typehash=";

lazy_static::lazy_static!(
pub static ref ROS_DISTRO: String = get_ros_distro();

Expand Down Expand Up @@ -312,7 +321,7 @@ fn ros2_service_default_qos() -> Qos {
}

fn ros2_action_feedback_default_qos() -> Qos {
Qos {
let mut qos = Qos {
history: Some(History {
kind: HistoryKind::KEEP_LAST,
depth: 10,
Expand All @@ -337,13 +346,24 @@ fn ros2_action_feedback_default_qos() -> Qos {
kind: IgnoreLocalKind::PARTICIPANT,
}),
..Default::default()
};
if !ros_distro_is_less_than("iron") {
// NOTE: the type hash should be a real one instead of this invalid type hash.
// However, `rmw_cyclonedds_cpp` doesn't do any type checking (yet).
// And the way to forward the type hash for actions (and services) raise questions
// that are described in https://github.com/eclipse-zenoh/zenoh-plugin-ros2dds/pull/349
insert_type_hash(
&mut qos,
"RIHS01_0000000000000000000000000000000000000000000000000000000000000000",
);
}
qos
}

fn ros2_action_status_default_qos() -> Qos {
// Default Status topic QoS copied from:
// https://github.com/ros2/rcl/blob/8f7f4f0804a34ee9d9ecd2d7e75a57ce2b7ced5d/rcl_action/include/rcl_action/default_qos.h#L30
Qos {
let mut qos = Qos {
durability: Some(Durability {
kind: DurabilityKind::TRANSIENT_LOCAL,
}),
Expand All @@ -367,7 +387,12 @@ fn ros2_action_status_default_qos() -> Qos {
kind: IgnoreLocalKind::PARTICIPANT,
}),
..Default::default()
};
if !ros_distro_is_less_than("iron") {
// add type_hash in USER_DATA QoS
insert_type_hash(&mut qos, ROS2_ACTION_STATUS_MSG_TYPE_HASH);
}
qos
}

pub fn is_service_for_action(ros2_service_name: &str) -> bool {
Expand All @@ -393,6 +418,30 @@ pub fn check_ros_name(name: &str) -> Result<(), String> {
}
}

pub fn extract_type_hash(qos: &Qos) -> Option<String> {
JEnoch marked this conversation as resolved.
Show resolved Hide resolved
if let Some(v) = &qos.user_data {
if let Ok(s) = str::from_utf8(v) {
if let Some(mut start) = s.find(USER_DATA_TYPEHASH_KEY) {
start += USER_DATA_TYPEHASH_KEY.len();
if let Some(end) = s[start..].find(USER_DATA_PROPS_SEPARATOR) {
return Some(s[start..(start + end)].into());
}
}
}
}
None
}

pub fn insert_type_hash(qos: &mut Qos, type_hash: &str) {
let mut s = USER_DATA_TYPEHASH_KEY.to_string();
s.push_str(type_hash);
s.push(USER_DATA_PROPS_SEPARATOR);
match qos.user_data {
Some(ref mut v) => v.extend(s.into_bytes().iter()),
None => qos.user_data = Some(s.into_bytes()),
}
}

lazy_static::lazy_static!(
pub static ref CLIENT_ID_COUNTER: AtomicU32 = AtomicU32::default();
);
Expand Down
20 changes: 19 additions & 1 deletion zenoh-plugin-ros2dds/src/ros_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,22 @@ use zenoh::{
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use crate::dds_utils::{ddsrt_iov_len_from_usize, delete_dds_entity, get_guid};
use crate::{
dds_types::DDSRawSample,
gid::Gid,
ros2_utils::{ros_distro_is_less_than, ROS_DISTRO},
ChannelEvent, ROS_DISCOVERY_INFO_PUSH_INTERVAL_MS,
};
use crate::{
dds_utils::{ddsrt_iov_len_from_usize, delete_dds_entity, get_guid},
ros2_utils::{USER_DATA_PROPS_SEPARATOR, USER_DATA_TYPEHASH_KEY},
};

pub const ROS_DISCOVERY_INFO_TOPIC_NAME: &str = "ros_discovery_info";
const ROS_DISCOVERY_INFO_TOPIC_TYPE: &str = "rmw_dds_common::msg::dds_::ParticipantEntitiesInfo_";
// Type hash for rmw_dds_common::msg::dds_::ParticipantEntitiesInfo_ in Iron and Jazzy (might change in future versions)
const ROS_DISCOVERY_INFO_TYPE_HASH: &str =
"RIHS01_91a0593bacdcc50ea9bdcf849a938b128412cc1ea821245c663bcd26f83c295e";

pub struct RosDiscoveryInfoMgr {
reader: dds_entity_t,
Expand Down Expand Up @@ -87,6 +93,16 @@ impl RosDiscoveryInfoMgr {
.unwrap()
.into_raw();

// Since Iron, the Reader/Writer on `ros_discovery_info` topic are expected to have the type hash in USER_DATA QoS
let user_data_qos: Option<Vec<u8>> = if ros_distro_is_less_than("iron") {
None
} else {
let mut s = USER_DATA_TYPEHASH_KEY.to_string();
s.push_str(ROS_DISCOVERY_INFO_TYPE_HASH);
s.push(USER_DATA_PROPS_SEPARATOR);
Some(s.into_bytes())
};

unsafe {
// Create topic (for reader/writer creation)
let t = cdds_create_blob_topic(participant, cton, ctyn, true);
Expand All @@ -108,6 +124,7 @@ impl RosDiscoveryInfoMgr {
qos.ignore_local = Some(IgnoreLocal {
kind: IgnoreLocalKind::PARTICIPANT,
});
qos.user_data = user_data_qos.clone();
let qos_native = qos.to_qos_native();
let reader = dds_create_reader(participant, t, qos_native, std::ptr::null());
Qos::delete_qos_native(qos_native);
Expand Down Expand Up @@ -137,6 +154,7 @@ impl RosDiscoveryInfoMgr {
qos.ignore_local = Some(IgnoreLocal {
kind: IgnoreLocalKind::PARTICIPANT,
});
qos.user_data = user_data_qos.clone();
let qos_native = qos.to_qos_native();
let writer = dds_create_writer(participant, t, qos_native, std::ptr::null());
Qos::delete_qos_native(qos_native);
Expand Down
Loading