diff --git a/zenoh-plugin-ros2dds/src/dds_utils.rs b/zenoh-plugin-ros2dds/src/dds_utils.rs index b2ac840..4a69f69 100644 --- a/zenoh-plugin-ros2dds/src/dds_utils.rs +++ b/zenoh-plugin-ros2dds/src/dds_utils.rs @@ -32,11 +32,25 @@ use crate::{ vec_into_raw_parts, }; -pub const DDS_ENTITY_NULL: dds_entity_t = 0; - // An atomic dds_entity_t (=i32), for safe concurrent creation/deletion of DDS entities pub type AtomicDDSEntity = AtomicI32; +pub const DDS_ENTITY_NULL: dds_entity_t = 0; +pub const CDR_HEADER_LE: [u8; 4] = [0, 1, 0, 0]; +pub const CDR_HEADER_BE: [u8; 4] = [0, 0, 0, 0]; + +/// Return None if the buffer is shorter than a CDR header (4 bytes). +/// Otherwise, return true if the encoding flag (last bit of 2nd byte) corresponds little endian +pub fn is_cdr_little_endian(cdr_buffer: &[u8]) -> Option { + // Per DDSI spec ยง10.2 (https://www.omg.org/spec/DDSI-RTPS/2.5/PDF), + // the endianness flag is the last bit of the RepresentationOptions (2 last octets) + if cdr_buffer.len() > 3 { + Some(cdr_buffer[1] & 1 > 0) + } else { + None + } +} + pub fn ddsrt_iov_len_to_usize(len: ddsrt_iov_len_t) -> Result { // Depending the platform ddsrt_iov_len_t can have different typedef // See https://github.com/eclipse-cyclonedds/cyclonedds/blob/master/src/ddsrt/include/dds/ddsrt/iovec.h diff --git a/zenoh-plugin-ros2dds/src/ros2_utils.rs b/zenoh-plugin-ros2dds/src/ros2_utils.rs index f72cb0c..a88ad10 100644 --- a/zenoh-plugin-ros2dds/src/ros2_utils.rs +++ b/zenoh-plugin-ros2dds/src/ros2_utils.rs @@ -12,11 +12,6 @@ // ZettaScale Zenoh Team, // -use std::{ - env::VarError, - sync::atomic::{AtomicU32, Ordering}, -}; - use cyclors::{ dds_entity_t, qos::{ @@ -25,7 +20,13 @@ use cyclors::{ DDS_INFINITE_TIME, }, }; +use std::{ + env::VarError, + sync::atomic::{AtomicU32, Ordering}, +}; use zenoh::prelude::{keyexpr, KeyExpr, OwnedKeyExpr}; +use zenoh::sample::Attachment; +use zenoh_core::{bail, zresult::ZError}; use crate::{config::Config, dds_utils::get_guid, ke_for_sure}; @@ -164,6 +165,112 @@ pub fn dds_type_to_ros2_action_type(dds_topic: &str) -> String { ) } +const ATTACHMENT_KEY_REQUEST_HEADER: [u8; 3] = [0x72, 0x71, 0x68]; // "rqh" in ASCII + +/// In rmw_cyclonedds_cpp a cdds_request_header sent within each request and reply payload. +/// See https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/serdata.hpp#L73 +/// Note that it's different from the rmw_request_id_t defined in RMW interfaces in +/// https://github.com/ros2/rmw/blob/9b3d9d0e3021b7a6e75d8886e3e061a53c36c789/rmw/include/rmw/types.h#L360 +#[derive(Clone, Copy, Hash, PartialEq, Eq)] +pub struct CddsRequestHeader { + // The header contains a u64 GUID (Client's) and a i64 sequence number. + // Keep those as a single buffer, as it's transfered as such between DDS and Zenoh. + header: [u8; 16], + // the sequence number is subject to endianness, we need to keep a flag for it + is_little_endian: bool, +} + +impl CddsRequestHeader { + pub fn create(client_id: u64, seq_num: u64, is_little_endian: bool) -> CddsRequestHeader { + let mut header = [0u8; 16]; + if is_little_endian { + header[..8].copy_from_slice(&client_id.to_le_bytes()); + header[8..].copy_from_slice(&seq_num.to_le_bytes()) + } else { + header[..8].copy_from_slice(&client_id.to_be_bytes()); + header[8..].copy_from_slice(&seq_num.to_be_bytes()) + } + CddsRequestHeader { + header, + is_little_endian, + } + } + + pub fn from_slice(header: [u8; 16], is_little_endian: bool) -> CddsRequestHeader { + CddsRequestHeader { + header, + is_little_endian, + } + } + + pub fn is_little_endian(&self) -> bool { + self.is_little_endian + } + + pub fn as_slice(&self) -> &[u8] { + &self.header + } + + pub fn as_attachment(&self) -> Attachment { + let mut attach = Attachment::new(); + + // concat header + endianness flag + let mut buf = [0u8; 17]; + buf[0..16].copy_from_slice(&self.header); + buf[16] = self.is_little_endian as u8; + + attach.insert(&ATTACHMENT_KEY_REQUEST_HEADER, &buf); + attach + } +} + +impl TryFrom<&Attachment> for CddsRequestHeader { + type Error = ZError; + fn try_from(value: &Attachment) -> Result { + match value.get(&ATTACHMENT_KEY_REQUEST_HEADER) { + Some(buf) => { + if buf.len() == 17 { + let header: [u8; 16] = buf[0..16] + .try_into() + .expect("Shouldn't happen: buf is 17 bytes"); + Ok(CddsRequestHeader { + header, + is_little_endian: buf[16] != 0, + }) + } else { + bail!("Attachment 'header' is not 16 bytes: {buf:02x?}") + } + } + None => bail!("No 'header' key found in Attachment"), + } + } +} + +impl std::fmt::Display for CddsRequestHeader { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // a request header is made of 8 bytes client guid + 8 bytes sequence number + // display as such for easier understanding + write!(f, "(")?; + for i in &self.header[0..8] { + write!(f, "{i:02x}")?; + } + let seq_num = if self.is_little_endian { + u64::from_le_bytes( + self.header[8..] + .try_into() + .expect("Shouldn't happen: self.header is 16 bytes"), + ) + } else { + u64::from_be_bytes( + self.header[8..] + .try_into() + .expect("Shouldn't happen: self.header is 16 bytes"), + ) + }; + write!(f, ",{seq_num})",) + } +} + fn ros2_service_default_qos() -> Qos { // Default Service QoS copied from: // https://github.com/ros2/rmw/blob/83445be486deae8c78d275e092eafb4bf380bd49/rmw/include/rmw/qos_profiles.h#L64C44-L64C44 diff --git a/zenoh-plugin-ros2dds/src/route_service_cli.rs b/zenoh-plugin-ros2dds/src/route_service_cli.rs index 31abfa4..ba79266 100644 --- a/zenoh-plugin-ros2dds/src/route_service_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_service_cli.rs @@ -25,14 +25,14 @@ use zenoh::query::Reply; use zenoh_core::SyncResolve; use crate::dds_types::{DDSRawSample, TypeInfo}; -use crate::dds_utils::serialize_entity_guid; use crate::dds_utils::{ create_dds_reader, create_dds_writer, dds_write, delete_dds_entity, get_guid, }; +use crate::dds_utils::{is_cdr_little_endian, serialize_entity_guid}; use crate::liveliness_mgt::new_ke_liveliness_service_cli; use crate::ros2_utils::{ is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type, - ros2_service_type_to_request_dds_type, QOS_DEFAULT_SERVICE, + ros2_service_type_to_request_dds_type, CddsRequestHeader, QOS_DEFAULT_SERVICE, }; use crate::routes_mgr::Context; use crate::LOG_PAYLOAD; @@ -280,7 +280,7 @@ fn route_dds_request_to_zenoh( sample: &DDSRawSample, zenoh_key_expr: &OwnedKeyExpr, zsession: &Arc, - queries_timeout: Duration, + query_timeout: Duration, rep_writer: dds_entity_t, ) { // request payload is expected to be the Request type encoded as CDR, including a 4 bytes header, @@ -293,7 +293,14 @@ fn route_dds_request_to_zenoh( let zbuf: ZBuf = sample.into(); let dds_req_buf = zbuf.contiguous(); - let request_id: [u8; 16] = dds_req_buf[4..20].try_into().unwrap(); + let is_little_endian = + is_cdr_little_endian(&dds_req_buf).expect("Shouldn't happen: sample.len >= 20"); + let request_id = CddsRequestHeader::from_slice( + dds_req_buf[4..20] + .try_into() + .expect("Shouldn't happen: sample.len >= 20"), + is_little_endian, + ); // route request buffer stripped from request_id (client_id + sequence_number) let mut zenoh_req_buf = ZBuf::empty(); @@ -304,10 +311,10 @@ fn route_dds_request_to_zenoh( zenoh_req_buf.push_zslice(slice.subslice(20, slice.len()).unwrap()); if *LOG_PAYLOAD { - log::debug!("{route_id}: routing request {request_id:02x?} from DDS to Zenoh - payload: {zenoh_req_buf:02x?}"); + log::debug!("{route_id}: routing request {request_id} from DDS to Zenoh - payload: {zenoh_req_buf:02x?}"); } else { log::trace!( - "{route_id}: routing request {request_id:02x?} from DDS to Zenoh - {} bytes", + "{route_id}: routing request {request_id} from DDS to Zenoh - {} bytes", zenoh_req_buf.len() ); } @@ -316,21 +323,22 @@ fn route_dds_request_to_zenoh( if let Err(e) = zsession .get(zenoh_key_expr) .with_value(zenoh_req_buf) + .with_attachment(request_id.as_attachment()) .allowed_destination(Locality::Remote) - .timeout(queries_timeout) + .timeout(query_timeout) .callback(move |reply| { route_zenoh_reply_to_dds(route_id2.clone(), reply, request_id, rep_writer) }) .res_sync() { - log::warn!("{route_id}: routing request {request_id:02x?} from DDS to Zenoh failed: {e}"); + log::warn!("{route_id}: routing request {request_id} from DDS to Zenoh failed: {e}"); } } fn route_zenoh_reply_to_dds( route_id: String, reply: Reply, - request_id: [u8; 16], + request_id: CddsRequestHeader, rep_writer: dds_entity_t, ) { match reply.sample { @@ -338,7 +346,7 @@ fn route_zenoh_reply_to_dds( let zenoh_rep_buf = sample.payload.contiguous(); if zenoh_rep_buf.len() < 4 || zenoh_rep_buf[1] > 1 { log::warn!( - "{route_id}: received invalid reply from Zenoh for {request_id:02x?}: {zenoh_rep_buf:0x?}" + "{route_id}: received invalid reply from Zenoh for {request_id}: {zenoh_rep_buf:0x?}" ); return; } @@ -347,27 +355,27 @@ fn route_zenoh_reply_to_dds( // copy CDR header dds_rep_buf.extend_from_slice(&zenoh_rep_buf[..4]); // add request_id - dds_rep_buf.extend_from_slice(&request_id); + dds_rep_buf.extend_from_slice(request_id.as_slice()); // add query payoad dds_rep_buf.extend_from_slice(&zenoh_rep_buf[4..]); if *LOG_PAYLOAD { - log::debug!("{route_id}: routing reply for {request_id:02x?} from Zenoh to DDS - payload: {dds_rep_buf:02x?}"); + log::debug!("{route_id}: routing reply for {request_id} from Zenoh to DDS - payload: {dds_rep_buf:02x?}"); } else { log::trace!( - "{route_id}: routing reply for {request_id:02x?} from Zenoh to DDS - {} bytes", + "{route_id}: routing reply for {request_id} from Zenoh to DDS - {} bytes", dds_rep_buf.len() ); } if let Err(e) = dds_write(rep_writer, dds_rep_buf) { log::warn!( - "{route_id}: routing reply for {request_id:02x?} from Zenoh to DDS failed: {e}" + "{route_id}: routing reply for {request_id} from Zenoh to DDS failed: {e}" ); } } Err(val) => { - log::warn!("{route_id}: received error as reply for {request_id:02x?}: {val}"); + log::warn!("{route_id}: received error as reply for {request_id}: {val}"); } } } diff --git a/zenoh-plugin-ros2dds/src/route_service_srv.rs b/zenoh-plugin-ros2dds/src/route_service_srv.rs index a0580e0..8c57746 100644 --- a/zenoh-plugin-ros2dds/src/route_service_srv.rs +++ b/zenoh-plugin-ros2dds/src/route_service_srv.rs @@ -27,15 +27,15 @@ use zenoh::queryable::{Query, Queryable}; use zenoh_core::zwrite; use crate::dds_types::{DDSRawSample, TypeInfo}; -use crate::dds_utils::serialize_entity_guid; use crate::dds_utils::{ create_dds_reader, create_dds_writer, dds_write, delete_dds_entity, get_guid, - get_instance_handle, + get_instance_handle, CDR_HEADER_BE, CDR_HEADER_LE, }; +use crate::dds_utils::{is_cdr_little_endian, serialize_entity_guid}; use crate::liveliness_mgt::new_ke_liveliness_service_srv; use crate::ros2_utils::{ is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type, - ros2_service_type_to_request_dds_type, QOS_DEFAULT_SERVICE, + ros2_service_type_to_request_dds_type, CddsRequestHeader, QOS_DEFAULT_SERVICE, }; use crate::routes_mgr::Context; use crate::{serialize_option_as_bool, LOG_PAYLOAD}; @@ -62,7 +62,7 @@ pub struct RouteServiceSrv<'a> { // the local DDS Reader receiving replies from the service server #[serde(serialize_with = "serialize_entity_guid")] rep_reader: dds_entity_t, - // the client GUID used in eacch request + // the client GUID used in each request #[serde(skip)] client_guid: u64, // the ROS sequence number for requests @@ -70,7 +70,7 @@ pub struct RouteServiceSrv<'a> { sequence_number: Arc, // queries waiting for a reply #[serde(skip)] - queries_in_progress: Arc>>, + queries_in_progress: Arc>>, // a liveliness token associated to this route, for announcement to other plugins #[serde(skip)] liveliness_token: Option>, @@ -153,12 +153,12 @@ impl RouteServiceSrv<'_> { let client_guid = get_instance_handle(req_writer)?; log::debug!( - "{route_id}: (local client_guid={client_guid}) id='{client_id_str}' => USER_DATA={:?}", + "{route_id}: (local client_guid={client_guid:02x?}) id='{client_id_str}' => USER_DATA={:?}", qos.user_data.as_ref().unwrap() ); // map of queries in progress - let queries_in_progress: Arc>> = + let queries_in_progress: Arc>> = Arc::new(RwLock::new(HashMap::new())); // create DDS Reader to receive replies and route them to Zenoh @@ -181,7 +181,6 @@ impl RouteServiceSrv<'_> { zenoh_key_expr.clone(), &mut zwrite!(queries_in_progress), &route_id, - client_guid, ); } }, @@ -225,7 +224,7 @@ impl RouteServiceSrv<'_> { // create the zenoh Queryable // if Reader is TRANSIENT_LOCAL, use a PublicationCache to store historical data - let queries_in_progress: Arc>> = + let queries_in_progress: Arc>> = self.queries_in_progress.clone(); let sequence_number: Arc = self.sequence_number.clone(); let route_id: String = self.to_string(); @@ -339,82 +338,96 @@ impl RouteServiceSrv<'_> { } } -const CDR_HEADER_LE: [u8; 4] = [0, 1, 0, 0]; - fn route_zenoh_request_to_dds( query: Query, - queries_in_progress: &mut HashMap, + queries_in_progress: &mut HashMap, sequence_number: &AtomicU64, route_id: &str, client_guid: u64, req_writer: i32, ) { - let n = sequence_number.fetch_add(1, Ordering::Relaxed); + // Get expected endianness from the query value: + // if any and if long enoough it shall be the Request type encoded as CDR (including 4 bytes header) + let is_little_endian = match query.value() { + Some(Value { payload, .. }) if payload.len() > 4 => { + is_cdr_little_endian(payload.contiguous().as_ref()).unwrap_or(true) + } + _ => true, + }; + + // Try to get request_id from Query attachment (in case it comes from another bridge). + // Otherwise, create one using client_guid + sequence_number + let request_id = query + .attachment() + .and_then(|a| CddsRequestHeader::try_from(a).ok()) + .unwrap_or_else(|| { + CddsRequestHeader::create( + client_guid, + sequence_number.fetch_add(1, Ordering::Relaxed), + is_little_endian, + ) + }); // prepend request payload with a (client_guid, sequence_number) header as per rmw_cyclonedds here: // https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/serdata.hpp#L73 let dds_req_buf = if let Some(value) = query.value() { - // query payload is expected to be the Request type encoded as CDR (including 4 bytes header) + // The query comes with some payload. It's expected to be the Request type encoded as CDR (including 4 bytes header) let zenoh_req_buf = &*(value.payload.contiguous()); if zenoh_req_buf.len() < 4 || zenoh_req_buf[1] > 1 { log::warn!("{route_id}: received invalid request: {zenoh_req_buf:0x?}"); return; } - let client_guid_bytes = if zenoh_req_buf[1] == 0 { - client_guid.to_be_bytes() - } else { - client_guid.to_le_bytes() - }; + // Send to DDS a buffer made of + // - the same CDR header coming with the query + // - the request_id as request header as per rmw_cyclonedds here: + // https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/serdata.hpp#L73 + // - the remaining of query payload let mut dds_req_buf: Vec = Vec::new(); - // copy CDR header dds_req_buf.extend_from_slice(&zenoh_req_buf[..4]); - // add client_id - dds_req_buf.extend_from_slice(&client_guid_bytes); - // add sequence_number (endianness depending on header) - if zenoh_req_buf[1] == 0 { - dds_req_buf.extend_from_slice(&n.to_be_bytes()); - } else { - dds_req_buf.extend_from_slice(&n.to_le_bytes()); - } - // add query payoad + dds_req_buf.extend_from_slice(request_id.as_slice()); dds_req_buf.extend_from_slice(&zenoh_req_buf[4..]); dds_req_buf } else { // No query payload - send a request containing just client_guid + sequence_number - let mut dds_req_buf: Vec = CDR_HEADER_LE.into(); - dds_req_buf.extend_from_slice(&client_guid.to_le_bytes()); - dds_req_buf.extend_from_slice(&n.to_le_bytes()); + // Send to DDS a buffer made of + // - a CDR header + // - the request_id as request header + let mut dds_req_buf: Vec = if request_id.is_little_endian() { + CDR_HEADER_LE.into() + } else { + CDR_HEADER_BE.into() + }; + dds_req_buf.extend_from_slice(request_id.as_slice()); dds_req_buf }; if *LOG_PAYLOAD { log::debug!( - "{route_id}: routing request #{n} from Zenoh to DDS - payload: {dds_req_buf:02x?}" + "{route_id}: routing request {request_id} from Zenoh to DDS - payload: {dds_req_buf:02x?}" ); } else { log::trace!( - "{route_id}: routing request #{n} from Zenoh to DDS - {} bytes", + "{route_id}: routing request {request_id} from Zenoh to DDS - {} bytes", dds_req_buf.len() ); } - queries_in_progress.insert(n, query); + queries_in_progress.insert(request_id, query); if let Err(e) = dds_write(req_writer, dds_req_buf) { log::warn!("{route_id}: routing request from Zenoh to DDS failed: {e}"); - queries_in_progress.remove(&n); + queries_in_progress.remove(&request_id); } } fn route_dds_reply_to_zenoh( sample: &DDSRawSample, zenoh_key_expr: OwnedKeyExpr, - queries_in_progress: &mut HashMap, + queries_in_progress: &mut HashMap, route_id: &str, - client_guid: u64, ) { // reply payload is expected to be the Response type encoded as CDR, including a 4 bytes header, - // the client guid (8 bytes) and a sequence_number (8 bytes). As per rmw_cyclonedds here: + // the request id as header (16 bytes). As per rmw_cyclonedds here: // https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/serdata.hpp#L73 if sample.len() < 20 { log::warn!("{route_id}: received invalid response from DDS: {sample:0x?}"); @@ -424,28 +437,17 @@ fn route_dds_reply_to_zenoh( let zbuf: ZBuf = sample.into(); let dds_rep_buf = zbuf.contiguous(); let cdr_header = &dds_rep_buf[..4]; - let guid = if dds_rep_buf[1] == 0 { - u64::from_be_bytes(dds_rep_buf[4..12].try_into().unwrap()) - } else { - u64::from_le_bytes(dds_rep_buf[4..12].try_into().unwrap()) - }; - - let seq_num = if cdr_header[1] == 0 { - u64::from_be_bytes(dds_rep_buf[12..20].try_into().unwrap()) - } else { - u64::from_le_bytes(dds_rep_buf[12..20].try_into().unwrap()) - }; - - if guid != client_guid { - // The reply is for another client than this bridge... - // Could happen since the same topic is used for this service replies to all clients! - // Just drop it. - log::trace!( - "{route_id}: received response for another client: {guid:0x?} (me: {client_guid:0x?})" - ); - return; - } - match queries_in_progress.remove(&seq_num) { + let is_little_endian = + is_cdr_little_endian(cdr_header).expect("Shouldn't happen: cdr_header is 4 bytes"); + let request_id = CddsRequestHeader::from_slice( + dds_rep_buf[4..20] + .try_into() + .expect("Shouldn't happen: slice is 16 bytes"), + is_little_endian, + ); + + // Check if it's one of my queries in progress. Drop otherwise + match queries_in_progress.remove(&request_id) { Some(query) => { use zenoh_core::SyncResolve; let slice: ZSlice = dds_rep_buf.into_owned().into(); @@ -454,10 +456,10 @@ fn route_dds_reply_to_zenoh( zenoh_rep_buf.push_zslice(slice.subslice(20, slice.len()).unwrap()); if *LOG_PAYLOAD { - log::debug!("{route_id}: routing reply #{seq_num} from DDS to Zenoh - payload: {zenoh_rep_buf:02x?}"); + log::debug!("{route_id}: routing reply {request_id} from DDS to Zenoh - payload: {zenoh_rep_buf:02x?}"); } else { log::trace!( - "{route_id}: routing reply #{seq_num} from DDS to Zenoh - {} bytes", + "{route_id}: routing reply {request_id} from DDS to Zenoh - {} bytes", zenoh_rep_buf.len() ); } @@ -466,11 +468,11 @@ fn route_dds_reply_to_zenoh( .reply(Ok(Sample::new(zenoh_key_expr, zenoh_rep_buf))) .res_sync() { - log::warn!("{route_id}: routing reply for request #{seq_num} from DDS to Zenoh failed: {e}"); + log::warn!("{route_id}: routing reply for request {request_id} from DDS to Zenoh failed: {e}"); } } - None => log::warn!( - "{route_id}: received response from DDS an unknown query (already timed out ?): #{seq_num}" + None => log::trace!( + "{route_id}: received response from DDS an unknown query: {request_id} - ignore it" ), } }