From a15bd57b6f9890791a1aeab842ab297f3032c88a Mon Sep 17 00:00:00 2001 From: Julien Enoch Date: Wed, 11 Dec 2024 10:52:47 +0100 Subject: [PATCH] Route for Service Client: use the new Zenoh Querier and its matching_listener (#356) --- zenoh-plugin-ros2dds/src/route_service_cli.rs | 334 ++++++++++-------- zenoh-plugin-ros2dds/src/route_service_srv.rs | 7 +- 2 files changed, 196 insertions(+), 145 deletions(-) diff --git a/zenoh-plugin-ros2dds/src/route_service_cli.rs b/zenoh-plugin-ros2dds/src/route_service_cli.rs index 10ea0d9..afe5994 100644 --- a/zenoh-plugin-ros2dds/src/route_service_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_service_cli.rs @@ -30,9 +30,9 @@ use zenoh::{ internal::buffers::{Buffer, ZBuf, ZSlice}, key_expr::{keyexpr, OwnedKeyExpr}, liveliness::LivelinessToken, - query::Reply, + query::{Querier, Reply}, sample::Locality, - Session, Wait, + Wait, }; use crate::{ @@ -46,6 +46,7 @@ use crate::{ is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type, ros2_service_type_to_request_dds_type, CddsRequestHeader, QOS_DEFAULT_SERVICE, }, + ros_discovery::RosDiscoveryInfoMgr, routes_mgr::Context, LOG_PAYLOAD, }; @@ -60,15 +61,15 @@ pub struct RouteServiceCli { ros2_type: String, // the Zenoh key expression used for routing zenoh_key_expr: OwnedKeyExpr, - // the DDS type info (if available) - #[serde(skip)] - type_info: Option>, // the context #[serde(skip)] context: Context, + #[serde(skip)] + _zenoh_querier: Arc>, #[serde(serialize_with = "crate::config::serialize_duration_as_f32")] queries_timeout: Duration, - is_active: bool, + #[serde(skip)] + _matching_listener: zenoh::matching::MatchingListener<()>, // the local DDS Reader receiving client's requests and routing them to Zenoh #[serde(serialize_with = "serialize_atomic_entity_guid")] req_reader: Arc, @@ -113,16 +114,73 @@ impl RouteServiceCli { tracing::debug!( "Route Service Client (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): creation with type {ros2_type} (queries_timeout={queries_timeout:#?})" ); + + let zenoh_querier: Arc> = Arc::new( + context + .zsession + .declare_querier(zenoh_key_expr.clone()) + .congestion_control(zenoh::qos::CongestionControl::Block) + .allowed_destination(Locality::Remote) + .timeout(queries_timeout) + .await + .map_err(|e| format!("Failed create Querier for key {zenoh_key_expr}: {e}",))?, + ); + + let route_id = format!("Route Service Client (ROS:{ros2_name} -> Zenoh:{zenoh_key_expr}"); + + // activate/deactivate DDS Reader/Writer on detection/undetection of matching Subscribers + // (copy/move all required args for the callback) + let rep_writer: Arc = Arc::new(DDS_ENTITY_NULL.into()); + let req_reader: Arc = Arc::new(DDS_ENTITY_NULL.into()); + + let matching_listener = zenoh_querier + .matching_listener() + .callback({ + let rep_writer = rep_writer.clone(); + let req_reader = req_reader.clone(); + let ros2_name = ros2_name.clone(); + let ros2_type = ros2_type.clone(); + let context = context.clone(); + let zquerier = zenoh_querier.clone(); + + move |status| { + tracing::debug!("{route_id} MatchingStatus changed: {status:?}"); + if status.matching() { + if let Err(e) = activate( + &rep_writer, + &req_reader, + &ros2_name, + &ros2_type, + &route_id, + &context, + &type_info, + &zquerier, + ) { + tracing::error!("{route_id}: failed to activate DDS Reader: {e}"); + } + } else { + deactivate( + &rep_writer, + &req_reader, + &route_id, + &context.ros_discovery_mgr, + ) + } + } + }) + .await + .map_err(|e| format!("Route Service Client (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): failed to listen of matching status changes: {e}",))?; + Ok(RouteServiceCli { ros2_name, ros2_type, zenoh_key_expr, - type_info, context, + _zenoh_querier: zenoh_querier, queries_timeout, - is_active: false, - rep_writer: Arc::new(DDS_ENTITY_NULL.into()), - req_reader: Arc::new(DDS_ENTITY_NULL.into()), + _matching_listener: matching_listener, + rep_writer, + req_reader, liveliness_token: None, remote_routes: HashSet::new(), local_nodes: HashSet::new(), @@ -163,116 +221,14 @@ impl RouteServiceCli { self.liveliness_token = None; } - fn activate(&mut self) -> Result<(), String> { - tracing::debug!("{self}: activate"); - // Default Service QoS - let mut qos = QOS_DEFAULT_SERVICE.clone(); - - // Add DATA_USER QoS similarly to rmw_cyclone_dds here: - // https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/rmw_node.cpp#L5028C17-L5028C17 - let server_id_str = new_service_id(&self.context.participant)?; - let user_data = format!("serviceid= {server_id_str};"); - qos.user_data = Some(user_data.into_bytes()); - tracing::debug!( - "{self}: using id '{server_id_str}' => USER_DATA={:?}", - qos.user_data.as_ref().unwrap() - ); - - // create DDS Writer to send replies coming from Zenoh to the Client - let rep_topic_name = format!("rr{}Reply", self.ros2_name); - let rep_type_name = ros2_service_type_to_reply_dds_type(&self.ros2_type); - let rep_writer = create_dds_writer( - self.context.participant, - rep_topic_name, - rep_type_name, - true, - qos.clone(), - )?; - let old = self.rep_writer.swap(rep_writer, Ordering::Relaxed); - if old != DDS_ENTITY_NULL { - tracing::warn!( - "{self}: on activation their was already a DDS Reply Writer - overwrite it" - ); - if let Err(e) = delete_dds_entity(old) { - tracing::warn!("{self}: failed to delete overwritten DDS Reply Writer: {e}"); - } - } - - // add writer's GID in ros_discovery_info message - self.context - .ros_discovery_mgr - .add_dds_writer(get_guid(&rep_writer)?); - - // create DDS Reader to receive requests and route them to Zenoh - let route_id: String = self.to_string(); - let req_topic_name = format!("rq{}Request", self.ros2_name); - let req_type_name = ros2_service_type_to_request_dds_type(&self.ros2_type); - let zenoh_key_expr2 = self.zenoh_key_expr.clone(); - let zsession2 = self.context.zsession.clone(); - let queries_timeout = self.queries_timeout; - let req_reader = create_dds_reader( - self.context.participant, - req_topic_name, - req_type_name, - &self.type_info, - true, - qos, - None, - move |sample| { - route_dds_request_to_zenoh( - &route_id, - sample, - &zenoh_key_expr2, - &zsession2, - queries_timeout, - rep_writer, - ); - }, - )?; - let old = self.req_reader.swap(req_reader, Ordering::Relaxed); - if old != DDS_ENTITY_NULL { - tracing::warn!( - "{self}: on activation their was already a DDS Request Reader - overwrite it" - ); - if let Err(e) = delete_dds_entity(old) { - tracing::warn!("{self}: failed to delete overwritten DDS Request Reader: {e}"); - } - } - - // add reader's GID in ros_discovery_info message - self.context - .ros_discovery_mgr - .add_dds_reader(get_guid(&req_reader)?); - - self.is_active = true; - Ok(()) - } - fn deactivate(&mut self) { - tracing::debug!("{self}: Deactivate"); - let req_reader = self.req_reader.swap(DDS_ENTITY_NULL, Ordering::Relaxed); - if req_reader != DDS_ENTITY_NULL { - // remove reader's GID from ros_discovery_info message - match get_guid(&req_reader) { - Ok(gid) => self.context.ros_discovery_mgr.remove_dds_reader(gid), - Err(e) => tracing::warn!("{self}: {e}"), - } - if let Err(e) = delete_dds_entity(req_reader) { - tracing::warn!("{}: error deleting DDS Reader: {}", self, e); - } - } - let rep_writer = self.rep_writer.swap(DDS_ENTITY_NULL, Ordering::Relaxed); - if rep_writer != DDS_ENTITY_NULL { - // remove writer's GID from ros_discovery_info message - match get_guid(&rep_writer) { - Ok(gid) => self.context.ros_discovery_mgr.remove_dds_writer(gid), - Err(e) => tracing::warn!("{self}: {e}"), - } - if let Err(e) = delete_dds_entity(rep_writer) { - tracing::warn!("{}: error deleting DDS Writer: {}", self, e); - } - } - self.is_active = false; + let route_id = self.to_string(); + deactivate( + &self.rep_writer, + &self.req_reader, + &route_id, + &self.context.ros_discovery_mgr, + ); } #[inline] @@ -280,17 +236,6 @@ impl RouteServiceCli { self.remote_routes .insert(format!("{zenoh_id}:{zenoh_key_expr}")); tracing::debug!("{self}: now serving remote routes {:?}", self.remote_routes); - // if 1st remote node added (i.e. a Server has been announced), activate the route - // NOTE: The route shall not be active if a remote Service Server have not been detected. - // Otherwise, the Client will send a request to this route that will get no reply - // and will drop it, leading the Client to hang (see #62). - // TODO: rather rely on a Querier MatchingStatus (in the same way that it's done for RoutePublisher) - // when available in zenoh... - if self.remote_routes.len() == 1 { - if let Err(e) = self.activate() { - tracing::error!("{self}: activation failed: {e}"); - } - } } #[inline] @@ -342,12 +287,125 @@ impl RouteServiceCli { } } +fn activate( + rep_writer: &Arc, + req_reader: &Arc, + ros2_name: &str, + ros2_type: &str, + route_id: &str, + context: &Context, + type_info: &Option>, + zenoh_querier: &Arc>, +) -> Result<(), String> { + tracing::debug!("{route_id}: activate"); + // Default Service QoS + let mut qos = QOS_DEFAULT_SERVICE.clone(); + + // Add DATA_USER QoS similarly to rmw_cyclone_dds here: + // https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/rmw_node.cpp#L5028C17-L5028C17 + let server_id_str = new_service_id(&context.participant)?; + let user_data = format!("serviceid= {server_id_str};"); + qos.user_data = Some(user_data.into_bytes()); + tracing::debug!( + "{route_id}: using id '{server_id_str}' => USER_DATA={:?}", + qos.user_data.as_ref().unwrap() + ); + + // create DDS Writer to send replies coming from Zenoh to the Client + let rep_topic_name = format!("rr{}Reply", ros2_name); + let rep_type_name = ros2_service_type_to_reply_dds_type(&ros2_type); + let dds_writer = create_dds_writer( + context.participant, + rep_topic_name, + rep_type_name, + true, + qos.clone(), + )?; + let old = rep_writer.swap(dds_writer, Ordering::Relaxed); + if old != DDS_ENTITY_NULL { + tracing::warn!( + "{route_id}: on activation their was already a DDS Reply Writer - overwrite it" + ); + if let Err(e) = delete_dds_entity(old) { + tracing::warn!("{route_id}: failed to delete overwritten DDS Reply Writer: {e}"); + } + } + + // add writer's GID in ros_discovery_info message + context + .ros_discovery_mgr + .add_dds_writer(get_guid(&dds_writer)?); + + // create DDS Reader to receive requests and route them to Zenoh + let req_topic_name = format!("rq{}Request", ros2_name); + let req_type_name = ros2_service_type_to_request_dds_type(&ros2_type); + let zquerier = zenoh_querier.clone(); + let route_id2 = route_id.to_owned(); + let dds_reader = create_dds_reader( + context.participant, + req_topic_name, + req_type_name, + &type_info, + true, + qos, + None, + move |sample| { + route_dds_request_to_zenoh(&route_id2, sample, &zquerier, dds_writer); + }, + )?; + let old = req_reader.swap(dds_reader, Ordering::Relaxed); + if old != DDS_ENTITY_NULL { + tracing::warn!( + "{route_id}: on activation their was already a DDS Request Reader - overwrite it" + ); + if let Err(e) = delete_dds_entity(old) { + tracing::warn!("{route_id}: failed to delete overwritten DDS Request Reader: {e}"); + } + } + + // add reader's GID in ros_discovery_info message + context + .ros_discovery_mgr + .add_dds_reader(get_guid(&dds_reader)?); + + Ok(()) +} + +fn deactivate( + rep_writer: &Arc, + req_reader: &Arc, + route_id: &str, + ros_discovery_mgr: &Arc, +) { + tracing::debug!("{route_id}: Deactivate"); + let req_reader = req_reader.swap(DDS_ENTITY_NULL, Ordering::Relaxed); + if req_reader != DDS_ENTITY_NULL { + // remove reader's GID from ros_discovery_info message + match get_guid(&req_reader) { + Ok(gid) => ros_discovery_mgr.remove_dds_reader(gid), + Err(e) => tracing::warn!("{route_id}: {e}"), + } + if let Err(e) = delete_dds_entity(req_reader) { + tracing::warn!("{route_id}: error deleting DDS Reader: {e}"); + } + } + let rep_writer = rep_writer.swap(DDS_ENTITY_NULL, Ordering::Relaxed); + if rep_writer != DDS_ENTITY_NULL { + // remove writer's GID from ros_discovery_info message + match get_guid(&rep_writer) { + Ok(gid) => ros_discovery_mgr.remove_dds_writer(gid), + Err(e) => tracing::warn!("{route_id}: {e}"), + } + if let Err(e) = delete_dds_entity(rep_writer) { + tracing::warn!("{route_id}: error deleting DDS Writer: {e}"); + } + } +} + fn route_dds_request_to_zenoh( route_id: &str, sample: &DDSRawSample, - zenoh_key_expr: &OwnedKeyExpr, - zsession: &Arc, - query_timeout: Duration, + querier: &Arc>, rep_writer: dds_entity_t, ) { // Request payload is expected to be the Request type encoded as CDR, including a 4 bytes CDR header, @@ -381,20 +439,18 @@ fn route_dds_request_to_zenoh( zenoh_req_buf.push_zslice(payload); if *LOG_PAYLOAD { - tracing::debug!("{route_id}: routing request {request_id} from DDS to Zenoh (timeout:{query_timeout:#?}) - payload: {zenoh_req_buf:02x?}"); + tracing::debug!("{route_id}: routing request {request_id} from DDS to Zenoh - payload: {zenoh_req_buf:02x?}"); } else { tracing::trace!( - "{route_id}: routing request {request_id} from DDS to Zenoh (timeout:{query_timeout:#?}) - {} bytes", + "{route_id}: routing request {request_id} from DDS to Zenoh - {} bytes", zenoh_req_buf.len() ); } - if let Err(e) = zsession - .get(zenoh_key_expr) + if let Err(e) = querier + .get() .payload(zenoh_req_buf) .attachment(request_id.as_attachment()) - .allowed_destination(Locality::Remote) - .timeout(query_timeout) .with({ let route_id1: String = route_id.to_string(); let route_id2 = route_id.to_string(); diff --git a/zenoh-plugin-ros2dds/src/route_service_srv.rs b/zenoh-plugin-ros2dds/src/route_service_srv.rs index 597e053..3e8ee29 100644 --- a/zenoh-plugin-ros2dds/src/route_service_srv.rs +++ b/zenoh-plugin-ros2dds/src/route_service_srv.rs @@ -226,12 +226,7 @@ impl RouteServiceSrv { .zsession .declare_keyexpr(self.zenoh_key_expr.clone()) .await - .map_err(|e| { - format!( - "Route Publisher (ROS:{} -> Zenoh:{}): failed to declare KeyExpr: {e}", - self.ros2_name, self.zenoh_key_expr - ) - })?; + .map_err(|e| format!("{self}: failed to declare KeyExpr: {e}"))?; // create the zenoh Queryable // if Reader is TRANSIENT_LOCAL, use a PublicationCache to store historical data