diff --git a/zenoh-plugin-ros2dds/src/lib.rs b/zenoh-plugin-ros2dds/src/lib.rs index bbf3ac8..32e6456 100644 --- a/zenoh-plugin-ros2dds/src/lib.rs +++ b/zenoh-plugin-ros2dds/src/lib.rs @@ -548,13 +548,24 @@ impl<'a> ROS2PluginRuntime<'a> { if let Some(allowance) = &self.config.allowance { use ROS2DiscoveryEvent::*; match evt { - DiscoveredMsgPub(_, iface) => allowance.is_publisher_allowed(&iface.name), - DiscoveredMsgSub(_, iface) => allowance.is_subscriber_allowed(&iface.name), - DiscoveredServiceSrv(_, iface) => allowance.is_service_srv_allowed(&iface.name), - DiscoveredServiceCli(_, iface) => allowance.is_service_cli_allowed(&iface.name), - DiscoveredActionSrv(_, iface) => allowance.is_action_srv_allowed(&iface.name), - DiscoveredActionCli(_, iface) => allowance.is_action_cli_allowed(&iface.name), - _ => true, // only Undiscovered events remain - always allow them (in case dynamic change of config is supported) + DiscoveredMsgPub(_, iface) | UndiscoveredMsgPub(_, iface) => { + allowance.is_publisher_allowed(&iface.name) + } + DiscoveredMsgSub(_, iface) | UndiscoveredMsgSub(_, iface) => { + allowance.is_subscriber_allowed(&iface.name) + } + DiscoveredServiceSrv(_, iface) | UndiscoveredServiceSrv(_, iface) => { + allowance.is_service_srv_allowed(&iface.name) + } + DiscoveredServiceCli(_, iface) | UndiscoveredServiceCli(_, iface) => { + allowance.is_service_cli_allowed(&iface.name) + } + DiscoveredActionSrv(_, iface) | UndiscoveredActionSrv(_, iface) => { + allowance.is_action_srv_allowed(&iface.name) + } + DiscoveredActionCli(_, iface) | UndiscoveredActionCli(_, iface) => { + allowance.is_action_cli_allowed(&iface.name) + } } } else { // no allow/deny configured => allow all diff --git a/zenoh-plugin-ros2dds/src/route_action_cli.rs b/zenoh-plugin-ros2dds/src/route_action_cli.rs index 9ae59b2..468956c 100644 --- a/zenoh-plugin-ros2dds/src/route_action_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_action_cli.rs @@ -83,7 +83,7 @@ impl RouteActionCli<'_> { format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_SEND_GOAL), format!("{ros2_type}_SendGoal"), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_SEND_GOAL, - &None, + None, send_goal_queries_timeout, context.clone(), ) @@ -97,7 +97,7 @@ impl RouteActionCli<'_> { format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_CANCEL_GOAL), ROS2_ACTION_CANCEL_GOAL_SRV_TYPE.to_string(), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_CANCEL_GOAL, - &None, + None, cancel_goal_queries_timeout, context.clone(), ) @@ -111,7 +111,7 @@ impl RouteActionCli<'_> { format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_GET_RESULT), format!("{ros2_type}_GetResult"), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_GET_RESULT, - &None, + None, get_result_queries_timeout, context.clone(), ) diff --git a/zenoh-plugin-ros2dds/src/route_publisher.rs b/zenoh-plugin-ros2dds/src/route_publisher.rs index 59a427d..993bee5 100644 --- a/zenoh-plugin-ros2dds/src/route_publisher.rs +++ b/zenoh-plugin-ros2dds/src/route_publisher.rs @@ -425,6 +425,7 @@ fn activate_dds_reader( context.ros_discovery_mgr.add_dds_reader(get_guid(&reader)?); if old != DDS_ENTITY_NULL { + log::warn!("{route_id}: on activation their was already a DDS Reader - overwrite it"); if let Err(e) = delete_dds_entity(old) { log::warn!("{route_id}: failed to delete overwritten DDS Reader: {e}"); } diff --git a/zenoh-plugin-ros2dds/src/route_service_cli.rs b/zenoh-plugin-ros2dds/src/route_service_cli.rs index b45d1aa..4008da3 100644 --- a/zenoh-plugin-ros2dds/src/route_service_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_service_cli.rs @@ -14,7 +14,7 @@ use cyclors::dds_entity_t; use serde::Serialize; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use std::{collections::HashSet, fmt}; @@ -29,8 +29,9 @@ use zenoh_core::SyncResolve; use crate::dds_types::{DDSRawSample, TypeInfo}; use crate::dds_utils::{ create_dds_reader, create_dds_writer, dds_write, delete_dds_entity, get_guid, + serialize_atomic_entity_guid, AtomicDDSEntity, }; -use crate::dds_utils::{is_cdr_little_endian, serialize_entity_guid}; +use crate::dds_utils::{is_cdr_little_endian, DDS_ENTITY_NULL}; use crate::liveliness_mgt::new_ke_liveliness_service_cli; use crate::ros2_utils::{ is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type, @@ -49,6 +50,9 @@ pub struct RouteServiceCli<'a> { ros2_type: String, // the Zenoh key expression used for routing zenoh_key_expr: OwnedKeyExpr, + // the DDS type info (if available) + #[serde(skip)] + type_info: Option>, // the context #[serde(skip)] context: Context, @@ -56,11 +60,11 @@ pub struct RouteServiceCli<'a> { queries_timeout: Duration, is_active: bool, // the local DDS Reader receiving client's requests and routing them to Zenoh - #[serde(serialize_with = "serialize_entity_guid")] - req_reader: dds_entity_t, + #[serde(serialize_with = "serialize_atomic_entity_guid")] + req_reader: Arc, // the local DDS Writer sending replies to the client - #[serde(serialize_with = "serialize_entity_guid")] - rep_writer: dds_entity_t, + #[serde(serialize_with = "serialize_atomic_entity_guid")] + rep_writer: Arc, // a liveliness token associated to this route, for announcement to other plugins #[serde(skip)] liveliness_token: Option>, @@ -72,23 +76,7 @@ pub struct RouteServiceCli<'a> { impl Drop for RouteServiceCli<'_> { fn drop(&mut self) { - // remove reader's GID from ros_discovery_info message - match get_guid(&self.req_reader) { - Ok(gid) => self.context.ros_discovery_mgr.remove_dds_reader(gid), - Err(e) => log::warn!("{self}: {e}"), - } - // remove writer's GID from ros_discovery_info message - match get_guid(&self.rep_writer) { - Ok(gid) => self.context.ros_discovery_mgr.remove_dds_writer(gid), - Err(e) => log::warn!("{self}: {e}"), - } - - if let Err(e) = delete_dds_entity(self.req_reader) { - log::warn!("{}: error deleting DDS Reader: {}", self, e); - } - if let Err(e) = delete_dds_entity(self.rep_writer) { - log::warn!("{}: error deleting DDS Writer: {}", self, e); - } + self.deactivate(); } } @@ -108,54 +96,114 @@ impl RouteServiceCli<'_> { ros2_name: String, ros2_type: String, zenoh_key_expr: OwnedKeyExpr, - type_info: &Option>, + type_info: Option>, queries_timeout: Duration, context: Context, ) -> Result, String> { log::debug!( "Route Service Client (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): creation with type {ros2_type}" ); + Ok(RouteServiceCli { + ros2_name, + ros2_type, + zenoh_key_expr, + type_info, + context, + queries_timeout, + is_active: false, + rep_writer: Arc::new(DDS_ENTITY_NULL.into()), + req_reader: Arc::new(DDS_ENTITY_NULL.into()), + liveliness_token: None, + remote_routes: HashSet::new(), + local_nodes: HashSet::new(), + }) + } + // Announce the route over Zenoh via a LivelinessToken + async fn announce_route(&mut self) -> Result<(), String> { + // if not for an Action (since actions declare their own liveliness) + if !is_service_for_action(&self.ros2_name) { + // create associated LivelinessToken + let liveliness_ke = new_ke_liveliness_service_cli( + &self.context.plugin_id, + &self.zenoh_key_expr, + &self.ros2_type, + )?; + log::debug!("{self}: announce via token {liveliness_ke}"); + let ros2_name = self.ros2_name.clone(); + self.liveliness_token = Some(self.context.zsession + .liveliness() + .declare_token(liveliness_ke) + .res_async() + .await + .map_err(|e| { + format!( + "Failed create LivelinessToken associated to route for Service Client {ros2_name}: {e}" + ) + })? + ); + } + Ok(()) + } + + // Retire the route over Zenoh removing the LivelinessToken + fn retire_route(&mut self) { + log::debug!("{self}: retire"); + // Drop Zenoh Publisher and Liveliness token + // The DDS Writer remains to be discovered by local ROS nodes + self.liveliness_token = None; + } + + fn activate(&mut self) -> Result<(), String> { + log::debug!("{self}: activate"); // Default Service QoS let mut qos = QOS_DEFAULT_SERVICE.clone(); // Add DATA_USER QoS similarly to rmw_cyclone_dds here: // https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/rmw_node.cpp#L5028C17-L5028C17 - let server_id_str = new_service_id(&context.participant)?; + let server_id_str = new_service_id(&self.context.participant)?; let user_data = format!("serviceid= {server_id_str};"); qos.user_data = Some(user_data.into_bytes()); log::debug!( - "Route Service Client (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): using id '{server_id_str}' => USER_DATA={:?}", qos.user_data.as_ref().unwrap() + "{self}: using id '{server_id_str}' => USER_DATA={:?}", + qos.user_data.as_ref().unwrap() ); // create DDS Writer to send replies coming from Zenoh to the Client - let rep_topic_name = format!("rr{ros2_name}Reply"); - let rep_type_name = ros2_service_type_to_reply_dds_type(&ros2_type); + let rep_topic_name = format!("rr{}Reply", self.ros2_name); + let rep_type_name = ros2_service_type_to_reply_dds_type(&self.ros2_type); let rep_writer = create_dds_writer( - context.participant, + self.context.participant, rep_topic_name, rep_type_name, true, qos.clone(), )?; + let old = self.rep_writer.swap(rep_writer, Ordering::Relaxed); + if old != DDS_ENTITY_NULL { + log::warn!("{self}: on activation their was already a DDS Reply Writer - overwrite it"); + if let Err(e) = delete_dds_entity(old) { + log::warn!("{self}: failed to delete overwritten DDS Reply Writer: {e}"); + } + } + // add writer's GID in ros_discovery_info message - context + self.context .ros_discovery_mgr .add_dds_writer(get_guid(&rep_writer)?); - let route_id: String = - format!("Route Service Client (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr})",); - // create DDS Reader to receive requests and route them to Zenoh - let req_topic_name = format!("rq{ros2_name}Request"); - let req_type_name = ros2_service_type_to_request_dds_type(&ros2_type); - let zenoh_key_expr2 = zenoh_key_expr.clone(); - let zsession2 = context.zsession.clone(); + let route_id: String = self.to_string(); + let req_topic_name = format!("rq{}Request", self.ros2_name); + let req_type_name = ros2_service_type_to_request_dds_type(&self.ros2_type); + let zenoh_key_expr2 = self.zenoh_key_expr.clone(); + let zsession2 = self.context.zsession.clone(); + let queries_timeout = self.queries_timeout; let req_reader = create_dds_reader( - context.participant, + self.context.participant, req_topic_name, req_type_name, - type_info, + &self.type_info, true, qos, None, @@ -170,76 +218,77 @@ impl RouteServiceCli<'_> { ); }, )?; + let old = self.req_reader.swap(req_reader, Ordering::Relaxed); + if old != DDS_ENTITY_NULL { + log::warn!( + "{self}: on activation their was already a DDS Request Reader - overwrite it" + ); + if let Err(e) = delete_dds_entity(old) { + log::warn!("{self}: failed to delete overwritten DDS Request Reader: {e}"); + } + } + // add reader's GID in ros_discovery_info message - context + self.context .ros_discovery_mgr .add_dds_reader(get_guid(&req_reader)?); - Ok(RouteServiceCli { - ros2_name, - ros2_type, - zenoh_key_expr, - context, - queries_timeout, - is_active: false, - rep_writer, - req_reader, - liveliness_token: None, - remote_routes: HashSet::new(), - local_nodes: HashSet::new(), - }) - } - - // Announce the route over Zenoh via a LivelinessToken - async fn announce_route(&mut self) -> Result<(), String> { self.is_active = true; - - // if not for an Action (since actions declare their own liveliness) - if !is_service_for_action(&self.ros2_name) { - // create associated LivelinessToken - let liveliness_ke = new_ke_liveliness_service_cli( - &self.context.plugin_id, - &self.zenoh_key_expr, - &self.ros2_type, - )?; - log::debug!("{self} announce via token {liveliness_ke}"); - let ros2_name = self.ros2_name.clone(); - self.liveliness_token = Some(self.context.zsession - .liveliness() - .declare_token(liveliness_ke) - .res_async() - .await - .map_err(|e| { - format!( - "Failed create LivelinessToken associated to route for Service Client {ros2_name}: {e}" - ) - })? - ); - } Ok(()) } - // Retire the route over Zenoh removing the LivelinessToken - fn retire_route(&mut self) { - log::debug!("{self} retire"); - // Drop Zenoh Publisher and Liveliness token - // The DDS Writer remains to be discovered by local ROS nodes + fn deactivate(&mut self) { + log::debug!("{self}: Deactivate"); + let req_reader = self.req_reader.swap(DDS_ENTITY_NULL, Ordering::Relaxed); + if req_reader != DDS_ENTITY_NULL { + // remove reader's GID from ros_discovery_info message + match get_guid(&req_reader) { + Ok(gid) => self.context.ros_discovery_mgr.remove_dds_reader(gid), + Err(e) => log::warn!("{self}: {e}"), + } + if let Err(e) = delete_dds_entity(req_reader) { + log::warn!("{}: error deleting DDS Reader: {}", self, e); + } + } + let rep_writer = self.rep_writer.swap(DDS_ENTITY_NULL, Ordering::Relaxed); + if rep_writer != DDS_ENTITY_NULL { + // remove writer's GID from ros_discovery_info message + match get_guid(&req_reader) { + Ok(gid) => self.context.ros_discovery_mgr.remove_dds_writer(gid), + Err(e) => log::warn!("{self}: {e}"), + } + if let Err(e) = delete_dds_entity(rep_writer) { + log::warn!("{}: error deleting DDS Writer: {}", self, e); + } + } self.is_active = false; - self.liveliness_token = None; } #[inline] pub fn add_remote_route(&mut self, plugin_id: &str, zenoh_key_expr: &keyexpr) { self.remote_routes .insert(format!("{plugin_id}:{zenoh_key_expr}")); - log::debug!("{self} now serving remote routes {:?}", self.remote_routes); + log::debug!("{self}: now serving remote routes {:?}", self.remote_routes); + // if 1st remote node added (i.e. a Server has been announced), activate the route + // NOTE: The route shall not be active if a remote Service Server have not been detected. + // Otherwise, the Client will send a request to this route that will get no reply + // and will drop it, leading the Client to hang (see #62). + if self.remote_routes.len() == 1 { + if let Err(e) = self.activate() { + log::error!("{self}: activation failed: {e}"); + } + } } #[inline] pub fn remove_remote_route(&mut self, plugin_id: &str, zenoh_key_expr: &keyexpr) { self.remote_routes .remove(&format!("{plugin_id}:{zenoh_key_expr}")); - log::debug!("{self} now serving remote routes {:?}", self.remote_routes); + log::debug!("{self}: now serving remote routes {:?}", self.remote_routes); + // if last remote node removed, deactivate the route + if self.remote_routes.is_empty() { + self.deactivate(); + } } #[inline] @@ -250,11 +299,11 @@ impl RouteServiceCli<'_> { #[inline] pub async fn add_local_node(&mut self, node: String) { self.local_nodes.insert(node); - log::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if 1st local node added, activate the route + log::debug!("{self}: now serving local nodes {:?}", self.local_nodes); + // if 1st local node added, announce the route if self.local_nodes.len() == 1 { if let Err(e) = self.announce_route().await { - log::error!("{self} activation failed: {e}"); + log::error!("{self}: announcement failed: {e}"); } } } @@ -262,8 +311,8 @@ impl RouteServiceCli<'_> { #[inline] pub fn remove_local_node(&mut self, node: &str) { self.local_nodes.remove(node); - log::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if last local node removed, deactivate the route + log::debug!("{self}: now serving local nodes {:?}", self.local_nodes); + // if last local node removed, retire the route if self.local_nodes.is_empty() { self.retire_route(); } diff --git a/zenoh-plugin-ros2dds/src/routes_mgr.rs b/zenoh-plugin-ros2dds/src/routes_mgr.rs index 3fd733a..657fe0f 100644 --- a/zenoh-plugin-ros2dds/src/routes_mgr.rs +++ b/zenoh-plugin-ros2dds/src/routes_mgr.rs @@ -682,7 +682,7 @@ impl<'a> RoutesMgr<'a> { ros2_name.clone(), ros2_type, zenoh_key_expr.clone(), - &None, + None, queries_timeout, self.context.clone(), )