From 4a08b9895b6772be74ea2593273048511c5ea4c5 Mon Sep 17 00:00:00 2001 From: Julien Enoch Date: Tue, 23 Jan 2024 09:32:25 +0100 Subject: [PATCH] RouteServiceCli: create DDS Reader/Writer only if a remote Server is announced (fix #62) (#63) --- zenoh-plugin-ros2dds/src/lib.rs | 25 +- zenoh-plugin-ros2dds/src/route_action_cli.rs | 19 +- zenoh-plugin-ros2dds/src/route_action_srv.rs | 13 +- zenoh-plugin-ros2dds/src/route_publisher.rs | 3 +- zenoh-plugin-ros2dds/src/route_service_cli.rs | 293 ++++++++++++------ zenoh-plugin-ros2dds/src/route_service_srv.rs | 13 +- zenoh-plugin-ros2dds/src/route_subscriber.rs | 10 +- zenoh-plugin-ros2dds/src/routes_mgr.rs | 2 +- 8 files changed, 252 insertions(+), 126 deletions(-) diff --git a/zenoh-plugin-ros2dds/src/lib.rs b/zenoh-plugin-ros2dds/src/lib.rs index bbf3ac8..32e6456 100644 --- a/zenoh-plugin-ros2dds/src/lib.rs +++ b/zenoh-plugin-ros2dds/src/lib.rs @@ -548,13 +548,24 @@ impl<'a> ROS2PluginRuntime<'a> { if let Some(allowance) = &self.config.allowance { use ROS2DiscoveryEvent::*; match evt { - DiscoveredMsgPub(_, iface) => allowance.is_publisher_allowed(&iface.name), - DiscoveredMsgSub(_, iface) => allowance.is_subscriber_allowed(&iface.name), - DiscoveredServiceSrv(_, iface) => allowance.is_service_srv_allowed(&iface.name), - DiscoveredServiceCli(_, iface) => allowance.is_service_cli_allowed(&iface.name), - DiscoveredActionSrv(_, iface) => allowance.is_action_srv_allowed(&iface.name), - DiscoveredActionCli(_, iface) => allowance.is_action_cli_allowed(&iface.name), - _ => true, // only Undiscovered events remain - always allow them (in case dynamic change of config is supported) + DiscoveredMsgPub(_, iface) | UndiscoveredMsgPub(_, iface) => { + allowance.is_publisher_allowed(&iface.name) + } + DiscoveredMsgSub(_, iface) | UndiscoveredMsgSub(_, iface) => { + allowance.is_subscriber_allowed(&iface.name) + } + DiscoveredServiceSrv(_, iface) | UndiscoveredServiceSrv(_, iface) => { + allowance.is_service_srv_allowed(&iface.name) + } + DiscoveredServiceCli(_, iface) | UndiscoveredServiceCli(_, iface) => { + allowance.is_service_cli_allowed(&iface.name) + } + DiscoveredActionSrv(_, iface) | UndiscoveredActionSrv(_, iface) => { + allowance.is_action_srv_allowed(&iface.name) + } + DiscoveredActionCli(_, iface) | UndiscoveredActionCli(_, iface) => { + allowance.is_action_cli_allowed(&iface.name) + } } } else { // no allow/deny configured => allow all diff --git a/zenoh-plugin-ros2dds/src/route_action_cli.rs b/zenoh-plugin-ros2dds/src/route_action_cli.rs index 7d5eb58..468956c 100644 --- a/zenoh-plugin-ros2dds/src/route_action_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_action_cli.rs @@ -83,7 +83,7 @@ impl RouteActionCli<'_> { format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_SEND_GOAL), format!("{ros2_type}_SendGoal"), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_SEND_GOAL, - &None, + None, send_goal_queries_timeout, context.clone(), ) @@ -97,7 +97,7 @@ impl RouteActionCli<'_> { format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_CANCEL_GOAL), ROS2_ACTION_CANCEL_GOAL_SRV_TYPE.to_string(), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_CANCEL_GOAL, - &None, + None, cancel_goal_queries_timeout, context.clone(), ) @@ -111,7 +111,7 @@ impl RouteActionCli<'_> { format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_GET_RESULT), format!("{ros2_type}_GetResult"), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_GET_RESULT, - &None, + None, get_result_queries_timeout, context.clone(), ) @@ -154,7 +154,8 @@ impl RouteActionCli<'_> { }) } - async fn activate(&mut self) -> Result<(), String> { + // Announce the route over Zenoh via a LivelinessToken + async fn announce_route(&mut self) -> Result<(), String> { self.is_active = true; // create associated LivelinessToken @@ -163,6 +164,7 @@ impl RouteActionCli<'_> { &self.zenoh_key_expr_prefix, &self.ros2_type, )?; + log::debug!("{self} announce via token {liveliness_ke}"); let ros2_name = self.ros2_name.clone(); self.liveliness_token = Some(self.context.zsession .liveliness() @@ -178,8 +180,9 @@ impl RouteActionCli<'_> { Ok(()) } - fn deactivate(&mut self) { - log::debug!("{self} deactivate"); + // Retire the route over Zenoh removing the LivelinessToken + fn retire_route(&mut self) { + log::debug!("{self} retire"); // Drop Zenoh Publisher and Liveliness token // The DDS Writer remains to be discovered by local ROS nodes self.is_active = false; @@ -256,7 +259,7 @@ impl RouteActionCli<'_> { log::debug!("{self} now serving local nodes {:?}", self.local_nodes); // if 1st local node added, activate the route if self.local_nodes.len() == 1 { - if let Err(e) = self.activate().await { + if let Err(e) = self.announce_route().await { log::error!("{self} activation failed: {e}"); } } @@ -274,7 +277,7 @@ impl RouteActionCli<'_> { log::debug!("{self} now serving local nodes {:?}", self.local_nodes); // if last local node removed, deactivate the route if self.local_nodes.is_empty() { - self.deactivate(); + self.retire_route(); } } diff --git a/zenoh-plugin-ros2dds/src/route_action_srv.rs b/zenoh-plugin-ros2dds/src/route_action_srv.rs index a0caa25..3905721 100644 --- a/zenoh-plugin-ros2dds/src/route_action_srv.rs +++ b/zenoh-plugin-ros2dds/src/route_action_srv.rs @@ -140,7 +140,8 @@ impl RouteActionSrv<'_> { }) } - async fn activate(&mut self) -> Result<(), String> { + // Announce the route over Zenoh via a LivelinessToken + async fn announce_route(&mut self) -> Result<(), String> { self.is_active = true; // create associated LivelinessToken @@ -149,6 +150,7 @@ impl RouteActionSrv<'_> { &self.zenoh_key_expr_prefix, &self.ros2_type, )?; + log::debug!("{self} announce via token {liveliness_ke}"); let ros2_name = self.ros2_name.clone(); self.liveliness_token = Some(self.context.zsession .liveliness() @@ -164,8 +166,9 @@ impl RouteActionSrv<'_> { Ok(()) } - fn deactivate(&mut self) { - log::debug!("{self} deactivate"); + // Retire the route over Zenoh removing the LivelinessToken + fn retire_route(&mut self) { + log::debug!("{self} retire"); // Drop Zenoh Publisher and Liveliness token // The DDS Writer remains to be discovered by local ROS nodes self.is_active = false; @@ -242,7 +245,7 @@ impl RouteActionSrv<'_> { log::debug!("{self} now serving local nodes {:?}", self.local_nodes); // if 1st local node added, activate the route if self.local_nodes.len() == 1 { - if let Err(e) = self.activate().await { + if let Err(e) = self.announce_route().await { log::error!("{self} activation failed: {e}"); } } @@ -260,7 +263,7 @@ impl RouteActionSrv<'_> { log::debug!("{self} now serving local nodes {:?}", self.local_nodes); // if last local node removed, deactivate the route if self.local_nodes.is_empty() { - self.deactivate(); + self.retire_route(); } } diff --git a/zenoh-plugin-ros2dds/src/route_publisher.rs b/zenoh-plugin-ros2dds/src/route_publisher.rs index 9204948..993bee5 100644 --- a/zenoh-plugin-ros2dds/src/route_publisher.rs +++ b/zenoh-plugin-ros2dds/src/route_publisher.rs @@ -248,8 +248,6 @@ impl RoutePublisher<'_> { .map_err(|e| format!("Failed to lisetn of matchibng status changes: {e}",))? }; - // Ok(route) - Ok(RoutePublisher { ros2_name, ros2_type, @@ -427,6 +425,7 @@ fn activate_dds_reader( context.ros_discovery_mgr.add_dds_reader(get_guid(&reader)?); if old != DDS_ENTITY_NULL { + log::warn!("{route_id}: on activation their was already a DDS Reader - overwrite it"); if let Err(e) = delete_dds_entity(old) { log::warn!("{route_id}: failed to delete overwritten DDS Reader: {e}"); } diff --git a/zenoh-plugin-ros2dds/src/route_service_cli.rs b/zenoh-plugin-ros2dds/src/route_service_cli.rs index ba79266..d331e65 100644 --- a/zenoh-plugin-ros2dds/src/route_service_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_service_cli.rs @@ -14,10 +14,12 @@ use cyclors::dds_entity_t; use serde::Serialize; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; use std::{collections::HashSet, fmt}; use zenoh::buffers::{ZBuf, ZSlice}; +use zenoh::handlers::{Callback, Dyn}; use zenoh::liveliness::LivelinessToken; use zenoh::prelude::r#async::AsyncResolve; use zenoh::prelude::*; @@ -27,8 +29,9 @@ use zenoh_core::SyncResolve; use crate::dds_types::{DDSRawSample, TypeInfo}; use crate::dds_utils::{ create_dds_reader, create_dds_writer, dds_write, delete_dds_entity, get_guid, + serialize_atomic_entity_guid, AtomicDDSEntity, }; -use crate::dds_utils::{is_cdr_little_endian, serialize_entity_guid}; +use crate::dds_utils::{is_cdr_little_endian, DDS_ENTITY_NULL}; use crate::liveliness_mgt::new_ke_liveliness_service_cli; use crate::ros2_utils::{ is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type, @@ -47,6 +50,9 @@ pub struct RouteServiceCli<'a> { ros2_type: String, // the Zenoh key expression used for routing zenoh_key_expr: OwnedKeyExpr, + // the DDS type info (if available) + #[serde(skip)] + type_info: Option>, // the context #[serde(skip)] context: Context, @@ -54,11 +60,11 @@ pub struct RouteServiceCli<'a> { queries_timeout: Duration, is_active: bool, // the local DDS Reader receiving client's requests and routing them to Zenoh - #[serde(serialize_with = "serialize_entity_guid")] - req_reader: dds_entity_t, + #[serde(serialize_with = "serialize_atomic_entity_guid")] + req_reader: Arc, // the local DDS Writer sending replies to the client - #[serde(serialize_with = "serialize_entity_guid")] - rep_writer: dds_entity_t, + #[serde(serialize_with = "serialize_atomic_entity_guid")] + rep_writer: Arc, // a liveliness token associated to this route, for announcement to other plugins #[serde(skip)] liveliness_token: Option>, @@ -70,23 +76,7 @@ pub struct RouteServiceCli<'a> { impl Drop for RouteServiceCli<'_> { fn drop(&mut self) { - // remove reader's GID from ros_discovery_info message - match get_guid(&self.req_reader) { - Ok(gid) => self.context.ros_discovery_mgr.remove_dds_reader(gid), - Err(e) => log::warn!("{self}: {e}"), - } - // remove writer's GID from ros_discovery_info message - match get_guid(&self.rep_writer) { - Ok(gid) => self.context.ros_discovery_mgr.remove_dds_writer(gid), - Err(e) => log::warn!("{self}: {e}"), - } - - if let Err(e) = delete_dds_entity(self.req_reader) { - log::warn!("{}: error deleting DDS Reader: {}", self, e); - } - if let Err(e) = delete_dds_entity(self.rep_writer) { - log::warn!("{}: error deleting DDS Writer: {}", self, e); - } + self.deactivate(); } } @@ -106,54 +96,114 @@ impl RouteServiceCli<'_> { ros2_name: String, ros2_type: String, zenoh_key_expr: OwnedKeyExpr, - type_info: &Option>, + type_info: Option>, queries_timeout: Duration, context: Context, ) -> Result, String> { log::debug!( "Route Service Client (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): creation with type {ros2_type}" ); + Ok(RouteServiceCli { + ros2_name, + ros2_type, + zenoh_key_expr, + type_info, + context, + queries_timeout, + is_active: false, + rep_writer: Arc::new(DDS_ENTITY_NULL.into()), + req_reader: Arc::new(DDS_ENTITY_NULL.into()), + liveliness_token: None, + remote_routes: HashSet::new(), + local_nodes: HashSet::new(), + }) + } + // Announce the route over Zenoh via a LivelinessToken + async fn announce_route(&mut self) -> Result<(), String> { + // if not for an Action (since actions declare their own liveliness) + if !is_service_for_action(&self.ros2_name) { + // create associated LivelinessToken + let liveliness_ke = new_ke_liveliness_service_cli( + &self.context.plugin_id, + &self.zenoh_key_expr, + &self.ros2_type, + )?; + log::debug!("{self}: announce via token {liveliness_ke}"); + let ros2_name = self.ros2_name.clone(); + self.liveliness_token = Some(self.context.zsession + .liveliness() + .declare_token(liveliness_ke) + .res_async() + .await + .map_err(|e| { + format!( + "Failed create LivelinessToken associated to route for Service Client {ros2_name}: {e}" + ) + })? + ); + } + Ok(()) + } + + // Retire the route over Zenoh removing the LivelinessToken + fn retire_route(&mut self) { + log::debug!("{self}: retire"); + // Drop Zenoh Publisher and Liveliness token + // The DDS Writer remains to be discovered by local ROS nodes + self.liveliness_token = None; + } + + fn activate(&mut self) -> Result<(), String> { + log::debug!("{self}: activate"); // Default Service QoS let mut qos = QOS_DEFAULT_SERVICE.clone(); // Add DATA_USER QoS similarly to rmw_cyclone_dds here: // https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/rmw_node.cpp#L5028C17-L5028C17 - let server_id_str = new_service_id(&context.participant)?; + let server_id_str = new_service_id(&self.context.participant)?; let user_data = format!("serviceid= {server_id_str};"); qos.user_data = Some(user_data.into_bytes()); log::debug!( - "Route Service Client (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): using id '{server_id_str}' => USER_DATA={:?}", qos.user_data.as_ref().unwrap() + "{self}: using id '{server_id_str}' => USER_DATA={:?}", + qos.user_data.as_ref().unwrap() ); // create DDS Writer to send replies coming from Zenoh to the Client - let rep_topic_name = format!("rr{ros2_name}Reply"); - let rep_type_name = ros2_service_type_to_reply_dds_type(&ros2_type); + let rep_topic_name = format!("rr{}Reply", self.ros2_name); + let rep_type_name = ros2_service_type_to_reply_dds_type(&self.ros2_type); let rep_writer = create_dds_writer( - context.participant, + self.context.participant, rep_topic_name, rep_type_name, true, qos.clone(), )?; + let old = self.rep_writer.swap(rep_writer, Ordering::Relaxed); + if old != DDS_ENTITY_NULL { + log::warn!("{self}: on activation their was already a DDS Reply Writer - overwrite it"); + if let Err(e) = delete_dds_entity(old) { + log::warn!("{self}: failed to delete overwritten DDS Reply Writer: {e}"); + } + } + // add writer's GID in ros_discovery_info message - context + self.context .ros_discovery_mgr .add_dds_writer(get_guid(&rep_writer)?); - let route_id: String = - format!("Route Service Client (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr})",); - // create DDS Reader to receive requests and route them to Zenoh - let req_topic_name = format!("rq{ros2_name}Request"); - let req_type_name = ros2_service_type_to_request_dds_type(&ros2_type); - let zenoh_key_expr2 = zenoh_key_expr.clone(); - let zsession2 = context.zsession.clone(); + let route_id: String = self.to_string(); + let req_topic_name = format!("rq{}Request", self.ros2_name); + let req_type_name = ros2_service_type_to_request_dds_type(&self.ros2_type); + let zenoh_key_expr2 = self.zenoh_key_expr.clone(); + let zsession2 = self.context.zsession.clone(); + let queries_timeout = self.queries_timeout; let req_reader = create_dds_reader( - context.participant, + self.context.participant, req_topic_name, req_type_name, - type_info, + &self.type_info, true, qos, None, @@ -168,73 +218,79 @@ impl RouteServiceCli<'_> { ); }, )?; + let old = self.req_reader.swap(req_reader, Ordering::Relaxed); + if old != DDS_ENTITY_NULL { + log::warn!( + "{self}: on activation their was already a DDS Request Reader - overwrite it" + ); + if let Err(e) = delete_dds_entity(old) { + log::warn!("{self}: failed to delete overwritten DDS Request Reader: {e}"); + } + } + // add reader's GID in ros_discovery_info message - context + self.context .ros_discovery_mgr .add_dds_reader(get_guid(&req_reader)?); - Ok(RouteServiceCli { - ros2_name, - ros2_type, - zenoh_key_expr, - context, - queries_timeout, - is_active: false, - rep_writer, - req_reader, - liveliness_token: None, - remote_routes: HashSet::new(), - local_nodes: HashSet::new(), - }) - } - - async fn activate(&mut self) -> Result<(), String> { self.is_active = true; - - // if not for an Action (since actions declare their own liveliness) - if !is_service_for_action(&self.ros2_name) { - // create associated LivelinessToken - let liveliness_ke = new_ke_liveliness_service_cli( - &self.context.plugin_id, - &self.zenoh_key_expr, - &self.ros2_type, - )?; - let ros2_name = self.ros2_name.clone(); - self.liveliness_token = Some(self.context.zsession - .liveliness() - .declare_token(liveliness_ke) - .res_async() - .await - .map_err(|e| { - format!( - "Failed create LivelinessToken associated to route for Service Client {ros2_name}: {e}" - ) - })? - ); - } Ok(()) } fn deactivate(&mut self) { - log::debug!("{self} deactivate"); - // Drop Zenoh Publisher and Liveliness token - // The DDS Writer remains to be discovered by local ROS nodes + log::debug!("{self}: Deactivate"); + let req_reader = self.req_reader.swap(DDS_ENTITY_NULL, Ordering::Relaxed); + if req_reader != DDS_ENTITY_NULL { + // remove reader's GID from ros_discovery_info message + match get_guid(&req_reader) { + Ok(gid) => self.context.ros_discovery_mgr.remove_dds_reader(gid), + Err(e) => log::warn!("{self}: {e}"), + } + if let Err(e) = delete_dds_entity(req_reader) { + log::warn!("{}: error deleting DDS Reader: {}", self, e); + } + } + let rep_writer = self.rep_writer.swap(DDS_ENTITY_NULL, Ordering::Relaxed); + if rep_writer != DDS_ENTITY_NULL { + // remove writer's GID from ros_discovery_info message + match get_guid(&req_reader) { + Ok(gid) => self.context.ros_discovery_mgr.remove_dds_writer(gid), + Err(e) => log::warn!("{self}: {e}"), + } + if let Err(e) = delete_dds_entity(rep_writer) { + log::warn!("{}: error deleting DDS Writer: {}", self, e); + } + } self.is_active = false; - self.liveliness_token = None; } #[inline] pub fn add_remote_route(&mut self, plugin_id: &str, zenoh_key_expr: &keyexpr) { self.remote_routes .insert(format!("{plugin_id}:{zenoh_key_expr}")); - log::debug!("{self} now serving remote routes {:?}", self.remote_routes); + log::debug!("{self}: now serving remote routes {:?}", self.remote_routes); + // if 1st remote node added (i.e. a Server has been announced), activate the route + // NOTE: The route shall not be active if a remote Service Server have not been detected. + // Otherwise, the Client will send a request to this route that will get no reply + // and will drop it, leading the Client to hang (see #62). + // TODO: rather rely on a Querier MatchingStatus (in the same way that it's done for RoutePublisher) + // when available in zenoh... + if self.remote_routes.len() == 1 { + if let Err(e) = self.activate() { + log::error!("{self}: activation failed: {e}"); + } + } } #[inline] pub fn remove_remote_route(&mut self, plugin_id: &str, zenoh_key_expr: &keyexpr) { self.remote_routes .remove(&format!("{plugin_id}:{zenoh_key_expr}")); - log::debug!("{self} now serving remote routes {:?}", self.remote_routes); + log::debug!("{self}: now serving remote routes {:?}", self.remote_routes); + // if last remote node removed, deactivate the route + if self.remote_routes.is_empty() { + self.deactivate(); + } } #[inline] @@ -245,11 +301,11 @@ impl RouteServiceCli<'_> { #[inline] pub async fn add_local_node(&mut self, node: String) { self.local_nodes.insert(node); - log::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if 1st local node added, activate the route + log::debug!("{self}: now serving local nodes {:?}", self.local_nodes); + // if 1st local node added, announce the route if self.local_nodes.len() == 1 { - if let Err(e) = self.activate().await { - log::error!("{self} activation failed: {e}"); + if let Err(e) = self.announce_route().await { + log::error!("{self}: announcement failed: {e}"); } } } @@ -257,10 +313,10 @@ impl RouteServiceCli<'_> { #[inline] pub fn remove_local_node(&mut self, node: &str) { self.local_nodes.remove(node); - log::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if last local node removed, deactivate the route + log::debug!("{self}: now serving local nodes {:?}", self.local_nodes); + // if last local node removed, retire the route if self.local_nodes.is_empty() { - self.deactivate(); + self.retire_route(); } } @@ -319,15 +375,34 @@ fn route_dds_request_to_zenoh( ); } - let route_id2 = route_id.to_string(); if let Err(e) = zsession .get(zenoh_key_expr) .with_value(zenoh_req_buf) .with_attachment(request_id.as_attachment()) .allowed_destination(Locality::Remote) .timeout(query_timeout) - .callback(move |reply| { - route_zenoh_reply_to_dds(route_id2.clone(), reply, request_id, rep_writer) + .with({ + let route_id1: String = route_id.to_string(); + let route_id2 = route_id.to_string(); + let reply_received1 = Arc::new(AtomicBool::new(false)); + let reply_received2 = reply_received1.clone(); + CallbackPair { + callback: move |reply| { + if !reply_received1.swap(true, std::sync::atomic::Ordering::Relaxed) { + route_zenoh_reply_to_dds(&route_id1, reply, request_id, rep_writer) + } else { + log::warn!("{route_id1}: received more than 1 reply for request {request_id} - dropping the extra replies"); + } + }, + drop: move || { + if !reply_received2.load(std::sync::atomic::Ordering::Relaxed) { + // There is no way to send an error message as a reply to a ROS Service Client ! + // (sending an invalid message will make it crash...) + // We have no choice but to log the error and let the client hanging without reply, until a timeout (if set by the client) + log::warn!("{route_id2}: received NO reply for request {request_id} - cannot reply to client, it will hang until timeout"); + } + }, + } }) .res_sync() { @@ -335,8 +410,38 @@ fn route_dds_request_to_zenoh( } } +// TODO: remove and replace with Zenoh's CallbackPair when https://github.com/eclipse-zenoh/zenoh/pull/653 is available +struct CallbackPair +where + DropFn: FnMut() + Send + Sync + 'static, +{ + pub callback: Callback, + pub drop: DropFn, +} + +impl Drop for CallbackPair +where + DropCallback: FnMut() + Send + Sync + 'static, +{ + fn drop(&mut self) { + (self.drop)() + } +} + +impl<'a, OnEvent, Event, DropCallback> IntoCallbackReceiverPair<'a, Event> + for CallbackPair +where + OnEvent: Fn(Event) + Send + Sync + 'a, + DropCallback: FnMut() + Send + Sync + 'static, +{ + type Receiver = (); + fn into_cb_receiver_pair(self) -> (Callback<'a, Event>, Self::Receiver) { + (Dyn::from(move |x| (self.callback)(x)), ()) + } +} + fn route_zenoh_reply_to_dds( - route_id: String, + route_id: &str, reply: Reply, request_id: CddsRequestHeader, rep_writer: dds_entity_t, diff --git a/zenoh-plugin-ros2dds/src/route_service_srv.rs b/zenoh-plugin-ros2dds/src/route_service_srv.rs index 8c57746..8250a30 100644 --- a/zenoh-plugin-ros2dds/src/route_service_srv.rs +++ b/zenoh-plugin-ros2dds/src/route_service_srv.rs @@ -207,7 +207,8 @@ impl RouteServiceSrv<'_> { }) } - async fn activate(&mut self) -> Result<(), String> { + // Announce the route over Zenoh via a LivelinessToken + async fn announce_route(&mut self) -> Result<(), String> { // For lifetime issue, redeclare the zenoh key expression that can't be stored in Self let declared_ke = self .context @@ -262,6 +263,7 @@ impl RouteServiceSrv<'_> { &self.zenoh_key_expr, &self.ros2_type, )?; + log::debug!("{self} announce via token {liveliness_ke}"); let ros2_name = self.ros2_name.clone(); self.liveliness_token = Some(self.context.zsession .liveliness() @@ -278,8 +280,9 @@ impl RouteServiceSrv<'_> { Ok(()) } - fn deactivate(&mut self) { - log::debug!("{self} deactivate"); + // Retire the route over Zenoh removing the LivelinessToken + fn retire_route(&mut self) { + log::debug!("{self} retire"); // Drop Zenoh Publisher and Liveliness token // The DDS Writer remains to be discovered by local ROS nodes self.zenoh_queryable = None; @@ -311,7 +314,7 @@ impl RouteServiceSrv<'_> { log::debug!("{self} now serving local nodes {:?}", self.local_nodes); // if 1st local node added, activate the route if self.local_nodes.len() == 1 { - if let Err(e) = self.activate().await { + if let Err(e) = self.announce_route().await { log::error!("{self} activation failed: {e}"); } } @@ -323,7 +326,7 @@ impl RouteServiceSrv<'_> { log::debug!("{self} now serving local nodes {:?}", self.local_nodes); // if last local node removed, deactivate the route if self.local_nodes.is_empty() { - self.deactivate(); + self.retire_route(); } } diff --git a/zenoh-plugin-ros2dds/src/route_subscriber.rs b/zenoh-plugin-ros2dds/src/route_subscriber.rs index 3414b52..141c7e8 100644 --- a/zenoh-plugin-ros2dds/src/route_subscriber.rs +++ b/zenoh-plugin-ros2dds/src/route_subscriber.rs @@ -160,7 +160,8 @@ impl RouteSubscriber<'_> { }) } - async fn activate(&mut self, discovered_reader_qos: &Qos) -> Result<(), String> { + // Announce the route over Zenoh via a LivelinessToken + async fn announce_route(&mut self, discovered_reader_qos: &Qos) -> Result<(), String> { log::debug!("{self} activate"); // Callback routing message received by Zenoh subscriber to DDS Writer (if set) let ros2_name = self.ros2_name.clone(); @@ -232,7 +233,8 @@ impl RouteSubscriber<'_> { Ok(()) } - fn deactivate(&mut self) { + // Retire the route over Zenoh removing the LivelinessToken + fn retire_route(&mut self) { log::debug!("{self} deactivate"); // Drop Zenoh Subscriber and Liveliness token // The DDS Writer remains to be discovered by local ROS nodes @@ -306,7 +308,7 @@ impl RouteSubscriber<'_> { log::debug!("{self} now serving local nodes {:?}", self.local_nodes); // if 1st local node added, activate the route if self.local_nodes.len() == 1 { - if let Err(e) = self.activate(discovered_reader_qos).await { + if let Err(e) = self.announce_route(discovered_reader_qos).await { log::error!("{self} activation failed: {e}"); } } @@ -318,7 +320,7 @@ impl RouteSubscriber<'_> { log::debug!("{self} now serving local nodes {:?}", self.local_nodes); // if last local node removed, deactivate the route if self.local_nodes.is_empty() { - self.deactivate(); + self.retire_route(); } } diff --git a/zenoh-plugin-ros2dds/src/routes_mgr.rs b/zenoh-plugin-ros2dds/src/routes_mgr.rs index 3fd733a..657fe0f 100644 --- a/zenoh-plugin-ros2dds/src/routes_mgr.rs +++ b/zenoh-plugin-ros2dds/src/routes_mgr.rs @@ -682,7 +682,7 @@ impl<'a> RoutesMgr<'a> { ros2_name.clone(), ros2_type, zenoh_key_expr.clone(), - &None, + None, queries_timeout, self.context.clone(), )