From 6d853a41939f3ec522d83ce8c3b5538c9dcd78bb Mon Sep 17 00:00:00 2001 From: Julien Enoch Date: Thu, 5 Oct 2023 16:56:56 +0200 Subject: [PATCH 1/3] Redesign: add context --- zenoh-plugin-ros2dds/src/dds_utils.rs | 26 +- zenoh-plugin-ros2dds/src/qos_helpers.rs | 4 +- zenoh-plugin-ros2dds/src/route_action_cli.rs | 72 ++--- zenoh-plugin-ros2dds/src/route_action_srv.rs | 72 ++--- zenoh-plugin-ros2dds/src/route_publisher.rs | 53 ++-- zenoh-plugin-ros2dds/src/route_service_cli.rs | 50 ++-- zenoh-plugin-ros2dds/src/route_service_srv.rs | 41 ++- zenoh-plugin-ros2dds/src/route_subscriber.rs | 57 ++-- zenoh-plugin-ros2dds/src/routes_mgr.rs | 272 +++++++++--------- 9 files changed, 304 insertions(+), 343 deletions(-) diff --git a/zenoh-plugin-ros2dds/src/dds_utils.rs b/zenoh-plugin-ros2dds/src/dds_utils.rs index 23eeca7..1670255 100644 --- a/zenoh-plugin-ros2dds/src/dds_utils.rs +++ b/zenoh-plugin-ros2dds/src/dds_utils.rs @@ -20,7 +20,7 @@ use serde::Serializer; use std::{ ffi::{CStr, CString}, mem::MaybeUninit, - sync::Arc, + sync::{atomic::AtomicI32, Arc}, time::Duration, }; #[cfg(feature = "dds_shm")] @@ -32,6 +32,11 @@ use crate::{ vec_into_raw_parts, }; +const DDS_ENTITY_NULL: dds_entity_t = 0; + +// An atomic dds_entity_t (=i32), for safe concurrent creation/deletion of DDS entities +type AtomicDDSEntity = AtomicI32; + pub fn delete_dds_entity(entity: dds_entity_t) -> Result<(), String> { unsafe { let r = dds_delete(entity); @@ -42,6 +47,15 @@ pub fn delete_dds_entity(entity: dds_entity_t) -> Result<(), String> { } } +pub(crate) fn delete_atomic_dds_entity(entity: &mut AtomicDDSEntity) -> Result<(), String> { + let dds_entity = entity.swap(DDS_ENTITY_NULL, std::sync::atomic::Ordering::Relaxed); + if dds_entity != DDS_ENTITY_NULL { + delete_dds_entity(dds_entity) + } else { + Ok(()) + } +} + pub fn get_guid(entity: &dds_entity_t) -> Result { unsafe { let mut guid = dds_guid_t { v: [0; 16] }; @@ -64,6 +78,16 @@ where } } +pub fn serialize_atomic_entity_guid(entity: &AtomicDDSEntity, s: S) -> Result +where + S: Serializer, +{ + match entity.load(std::sync::atomic::Ordering::Relaxed) { + DDS_ENTITY_NULL => s.serialize_str(""), + entity => serialize_entity_guid(&entity, s), + } +} + pub fn get_instance_handle(entity: dds_entity_t) -> Result { unsafe { let mut handle: dds_instance_handle_t = 0; diff --git a/zenoh-plugin-ros2dds/src/qos_helpers.rs b/zenoh-plugin-ros2dds/src/qos_helpers.rs index f359c74..43d6b54 100644 --- a/zenoh-plugin-ros2dds/src/qos_helpers.rs +++ b/zenoh-plugin-ros2dds/src/qos_helpers.rs @@ -27,8 +27,8 @@ pub fn get_durability_service_or_default(qos: &Qos) -> DurabilityService { } } -pub fn is_reader_reliable(reliability: &Option) -> bool { - reliability.as_ref().map_or(false, |reliability| { +pub fn is_reliable(qos: &Qos) -> bool { + qos.reliability.as_ref().map_or(false, |reliability| { reliability.kind == ReliabilityKind::RELIABLE }) } diff --git a/zenoh-plugin-ros2dds/src/route_action_cli.rs b/zenoh-plugin-ros2dds/src/route_action_cli.rs index b640a00..665115d 100644 --- a/zenoh-plugin-ros2dds/src/route_action_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_action_cli.rs @@ -11,16 +11,15 @@ // Contributors: // ZettaScale Zenoh Team, // -use cyclors::dds_entity_t; use serde::Serialize; -use std::{collections::HashSet, fmt, sync::Arc}; +use std::{collections::HashSet, fmt}; use zenoh::{liveliness::LivelinessToken, prelude::*}; use zenoh_core::AsyncResolve; use crate::{ - config::Config, gid::Gid, liveliness_mgt::new_ke_liveliness_action_cli, ros2_utils::*, + gid::Gid, liveliness_mgt::new_ke_liveliness_action_cli, ros2_utils::*, route_action_srv::serialize_action_zenoh_key_expr, route_service_cli::RouteServiceCli, - route_subscriber::RouteSubscriber, + route_subscriber::RouteSubscriber, routes_mgr::Context, }; #[derive(Serialize)] @@ -35,12 +34,9 @@ pub struct RouteActionCli<'a> { serialize_with = "serialize_action_zenoh_key_expr" )] zenoh_key_expr_prefix: OwnedKeyExpr, - // the zenoh session + // the context #[serde(skip)] - zsession: &'a Arc, - // the config - #[serde(skip)] - _config: Arc, + context: Context<'a>, is_active: bool, #[serde(skip)] route_send_goal: RouteServiceCli<'a>, @@ -73,68 +69,56 @@ impl fmt::Display for RouteActionCli<'_> { impl RouteActionCli<'_> { #[allow(clippy::too_many_arguments)] - pub async fn create( - config: Arc, - zsession: &Arc, - participant: dds_entity_t, + pub async fn create<'a>( ros2_name: String, ros2_type: String, zenoh_key_expr_prefix: OwnedKeyExpr, - ) -> Result, String> { + context: &Context<'a>, + ) -> Result, String> { let route_send_goal = RouteServiceCli::create( - config.clone(), - zsession, - participant, format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_SEND_GOAL), format!("{ros2_type}_SendGoal"), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_SEND_GOAL, &None, + context, ) .await?; let route_cancel_goal = RouteServiceCli::create( - config.clone(), - zsession, - participant, format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_CANCEL_GOAL), ROS2_ACTION_CANCEL_GOAL_SRV_TYPE.to_string(), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_CANCEL_GOAL, &None, + context, ) .await?; let route_get_result = RouteServiceCli::create( - config.clone(), - zsession, - participant, format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_GET_RESULT), format!("{ros2_type}_GetResult"), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_GET_RESULT, &None, + context, ) .await?; let route_feedback = RouteSubscriber::create( - config.clone(), - zsession, - participant, format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_FEEDBACK), format!("{ros2_type}_FeedbackMessage"), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_FEEDBACK, true, QOS_ACTION_FEEDBACK.clone(), + context, ) .await?; let route_status = RouteSubscriber::create( - config.clone(), - zsession, - participant, format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_STATUS), ROS2_ACTION_STATUS_MSG_TYPE.to_string(), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_STATUS, true, QOS_ACTION_STATUS.clone(), + context, ) .await?; @@ -142,8 +126,7 @@ impl RouteActionCli<'_> { ros2_name, ros2_type, zenoh_key_expr_prefix, - zsession, - _config: config, + context: context.clone(), is_active: false, route_send_goal, route_cancel_goal, @@ -156,14 +139,17 @@ impl RouteActionCli<'_> { }) } - async fn activate<'a>(&'a mut self, plugin_id: &keyexpr) -> Result<(), String> { + async fn activate<'a>(&'a mut self) -> Result<(), String> { self.is_active = true; // create associated LivelinessToken - let liveliness_ke = - new_ke_liveliness_action_cli(plugin_id, &self.zenoh_key_expr_prefix, &self.ros2_type)?; + let liveliness_ke = new_ke_liveliness_action_cli( + &self.context.plugin_id, + &self.zenoh_key_expr_prefix, + &self.ros2_type, + )?; let ros2_name = self.ros2_name.clone(); - self.liveliness_token = Some(self.zsession + self.liveliness_token = Some(self.context.zsession .liveliness() .declare_token(liveliness_ke) .res_async() @@ -260,24 +246,22 @@ impl RouteActionCli<'_> { } #[inline] - pub async fn add_local_node(&mut self, node: String, plugin_id: &keyexpr) { + pub async fn add_local_node(&mut self, node: String) { futures::join!( - self.route_send_goal.add_local_node(node.clone(), plugin_id), - self.route_cancel_goal - .add_local_node(node.clone(), plugin_id), - self.route_get_result - .add_local_node(node.clone(), plugin_id), + self.route_send_goal.add_local_node(node.clone()), + self.route_cancel_goal.add_local_node(node.clone()), + self.route_get_result.add_local_node(node.clone()), self.route_feedback - .add_local_node(node.clone(), plugin_id, &QOS_ACTION_FEEDBACK), + .add_local_node(node.clone(), &QOS_ACTION_FEEDBACK), self.route_status - .add_local_node(node.clone(), plugin_id, &QOS_ACTION_STATUS), + .add_local_node(node.clone(), &QOS_ACTION_STATUS), ); self.local_nodes.insert(node); log::debug!("{self} now serving local nodes {:?}", self.local_nodes); // if 1st local node added, activate the route if self.local_nodes.len() == 1 { - if let Err(e) = self.activate(plugin_id).await { + if let Err(e) = self.activate().await { log::error!("{self} activation failed: {e}"); } } diff --git a/zenoh-plugin-ros2dds/src/route_action_srv.rs b/zenoh-plugin-ros2dds/src/route_action_srv.rs index d0cc5bb..98656f4 100644 --- a/zenoh-plugin-ros2dds/src/route_action_srv.rs +++ b/zenoh-plugin-ros2dds/src/route_action_srv.rs @@ -11,15 +11,14 @@ // Contributors: // ZettaScale Zenoh Team, // -use cyclors::dds_entity_t; use serde::{Serialize, Serializer}; -use std::{collections::HashSet, fmt, sync::Arc}; +use std::{collections::HashSet, fmt}; use zenoh::{liveliness::LivelinessToken, prelude::*}; use zenoh_core::AsyncResolve; use crate::{ - config::Config, gid::Gid, liveliness_mgt::new_ke_liveliness_action_srv, ros2_utils::*, - route_publisher::RoutePublisher, route_service_srv::RouteServiceSrv, + gid::Gid, liveliness_mgt::new_ke_liveliness_action_srv, ros2_utils::*, + route_publisher::RoutePublisher, route_service_srv::RouteServiceSrv, routes_mgr::Context, }; #[derive(Serialize)] @@ -34,12 +33,9 @@ pub struct RouteActionSrv<'a> { serialize_with = "serialize_action_zenoh_key_expr" )] zenoh_key_expr_prefix: OwnedKeyExpr, - // the zenoh session + // the context #[serde(skip)] - zsession: &'a Arc, - // the config - #[serde(skip)] - _config: Arc, + context: Context<'a>, is_active: bool, #[serde(skip)] route_send_goal: RouteServiceSrv<'a>, @@ -72,70 +68,58 @@ impl fmt::Display for RouteActionSrv<'_> { impl RouteActionSrv<'_> { #[allow(clippy::too_many_arguments)] - pub async fn create( - config: Arc, - zsession: &Arc, - participant: dds_entity_t, + pub async fn create<'a>( ros2_name: String, ros2_type: String, zenoh_key_expr_prefix: OwnedKeyExpr, - ) -> Result, String> { + context: &Context<'a>, + ) -> Result, String> { let route_send_goal = RouteServiceSrv::create( - config.clone(), - zsession, - participant, format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_SEND_GOAL), format!("{ros2_type}_SendGoal"), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_SEND_GOAL, &None, + context, ) .await?; let route_cancel_goal = RouteServiceSrv::create( - config.clone(), - zsession, - participant, format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_CANCEL_GOAL), ROS2_ACTION_CANCEL_GOAL_SRV_TYPE.to_string(), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_CANCEL_GOAL, &None, + context, ) .await?; let route_get_result = RouteServiceSrv::create( - config.clone(), - zsession, - participant, format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_GET_RESULT), format!("{ros2_type}_GetResult"), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_GET_RESULT, &None, + context, ) .await?; let route_feedback = RoutePublisher::create( - config.clone(), - zsession, - participant, format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_FEEDBACK), format!("{ros2_type}_FeedbackMessage"), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_FEEDBACK, &None, true, QOS_ACTION_FEEDBACK.clone(), + context, ) .await?; let route_status = RoutePublisher::create( - config.clone(), - zsession, - participant, format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_STATUS), ROS2_ACTION_STATUS_MSG_TYPE.to_string(), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_STATUS, &None, true, QOS_ACTION_STATUS.clone(), + context, ) .await?; @@ -143,8 +127,7 @@ impl RouteActionSrv<'_> { ros2_name, ros2_type, zenoh_key_expr_prefix, - zsession, - _config: config, + context: context.clone(), is_active: false, route_send_goal, route_cancel_goal, @@ -157,14 +140,17 @@ impl RouteActionSrv<'_> { }) } - async fn activate<'a>(&'a mut self, plugin_id: &keyexpr) -> Result<(), String> { + async fn activate<'a>(&'a mut self) -> Result<(), String> { self.is_active = true; // create associated LivelinessToken - let liveliness_ke = - new_ke_liveliness_action_srv(plugin_id, &self.zenoh_key_expr_prefix, &self.ros2_type)?; + let liveliness_ke = new_ke_liveliness_action_srv( + &self.context.plugin_id, + &self.zenoh_key_expr_prefix, + &self.ros2_type, + )?; let ros2_name = self.ros2_name.clone(); - self.liveliness_token = Some(self.zsession + self.liveliness_token = Some(self.context.zsession .liveliness() .declare_token(liveliness_ke) .res_async() @@ -261,24 +247,22 @@ impl RouteActionSrv<'_> { } #[inline] - pub async fn add_local_node(&mut self, node: String, plugin_id: &keyexpr) { + pub async fn add_local_node(&mut self, node: String) { futures::join!( - self.route_send_goal.add_local_node(node.clone(), plugin_id), - self.route_cancel_goal - .add_local_node(node.clone(), plugin_id), - self.route_get_result - .add_local_node(node.clone(), plugin_id), + self.route_send_goal.add_local_node(node.clone()), + self.route_cancel_goal.add_local_node(node.clone()), + self.route_get_result.add_local_node(node.clone()), self.route_feedback - .add_local_node(node.clone(), plugin_id, &QOS_ACTION_FEEDBACK), + .add_local_node(node.clone(), &QOS_ACTION_FEEDBACK), self.route_status - .add_local_node(node.clone(), plugin_id, &QOS_ACTION_STATUS), + .add_local_node(node.clone(), &QOS_ACTION_STATUS), ); self.local_nodes.insert(node); log::debug!("{self} now serving local nodes {:?}", self.local_nodes); // if 1st local node added, activate the route if self.local_nodes.len() == 1 { - if let Err(e) = self.activate(plugin_id).await { + if let Err(e) = self.activate().await { log::error!("{self} activation failed: {e}"); } } diff --git a/zenoh-plugin-ros2dds/src/route_publisher.rs b/zenoh-plugin-ros2dds/src/route_publisher.rs index 816925c..68d71ef 100644 --- a/zenoh-plugin-ros2dds/src/route_publisher.rs +++ b/zenoh-plugin-ros2dds/src/route_publisher.rs @@ -29,6 +29,7 @@ use crate::dds_utils::{delete_dds_entity, get_guid, serialize_entity_guid}; use crate::gid::Gid; use crate::liveliness_mgt::new_ke_liveliness_pub; use crate::ros2_utils::{is_message_for_action, ros2_message_type_to_dds_type}; +use crate::routes_mgr::Context; use crate::{qos_helpers::*, Config}; use crate::{serialize_option_as_bool, KE_PREFIX_PUB_CACHE}; @@ -47,12 +48,9 @@ pub struct RoutePublisher<'a> { ros2_type: String, // the Zenoh key expression used for routing zenoh_key_expr: OwnedKeyExpr, - // the zenoh session + // the context #[serde(skip)] - zsession: &'a Arc, - // the config - #[serde(skip)] - config: Arc, + context: Context<'a>, // the zenoh publisher used to re-publish to zenoh the data received by the DDS Reader // `None` when route is created on a remote announcement and no local ROS2 Subscriber discovered yet #[serde(rename = "is_active", serialize_with = "serialize_option_as_bool")] @@ -95,15 +93,13 @@ impl fmt::Display for RoutePublisher<'_> { impl RoutePublisher<'_> { #[allow(clippy::too_many_arguments)] pub async fn create<'a>( - config: Arc, - zsession: &'a Arc, - participant: dds_entity_t, ros2_name: String, ros2_type: String, zenoh_key_expr: OwnedKeyExpr, type_info: &Option>, keyless: bool, reader_qos: Qos, + context: &Context<'a>, ) -> Result, String> { let transient_local = is_transient_local(&reader_qos); log::debug!( @@ -111,7 +107,7 @@ impl RoutePublisher<'_> { ); // declare the zenoh key expression (for wire optimization) - let declared_ke = zsession + let declared_ke = context.zsession .declare_keyexpr(zenoh_key_expr.clone()) .res() .await @@ -121,8 +117,8 @@ impl RoutePublisher<'_> { // CongestionControl to be used when re-publishing over zenoh: Blocking if Writer is RELIABLE (since we don't know what is remote Reader's QoS) let congestion_ctrl = match ( - config.reliable_routes_blocking, - is_reader_reliable(&reader_qos.reliability), + context.config.reliable_routes_blocking, + is_reliable(&reader_qos), ) { (true, true) => CongestionControl::Block, _ => CongestionControl::Drop, @@ -130,18 +126,18 @@ impl RoutePublisher<'_> { let topic_name = format!("rt{ros2_name}"); let type_name = ros2_message_type_to_dds_type(&ros2_type); - let read_period = get_read_period(&config, &zenoh_key_expr); + let read_period = get_read_period(&context.config, &zenoh_key_expr); // create matching DDS Reader that forwards data coming from DDS to Zenoh let dds_reader = create_forwarding_dds_reader( - participant, + context.participant, topic_name, type_name, type_info, keyless, reader_qos.clone(), declared_ke.clone(), - zsession.clone(), + context.zsession.clone(), read_period, congestion_ctrl, )?; @@ -151,8 +147,7 @@ impl RoutePublisher<'_> { ros2_type, dds_reader, zenoh_key_expr, - zsession, - config, + context: context.clone(), zenoh_publisher: None, transient_local, keyless, @@ -162,13 +157,10 @@ impl RoutePublisher<'_> { }) } - async fn activate<'a>( - &'a mut self, - plugin_id: &keyexpr, - discovered_writer_qos: &Qos, - ) -> Result<(), String> { + async fn activate<'a>(&'a mut self, discovered_writer_qos: &Qos) -> Result<(), String> { // For lifetime issue, redeclare the zenoh key expression that can't be stored in Self let declared_ke = self + .context .zsession .declare_keyexpr(self.zenoh_key_expr.clone()) .res() @@ -206,16 +198,17 @@ impl RoutePublisher<'_> { (HistoryKind::KEEP_ALL, _) => usize::MAX, }; // In case there are several Writers served by this route, increase the cache size - history = history.saturating_mul(self.config.transient_local_cache_multiplier); + history = history.saturating_mul(self.context.config.transient_local_cache_multiplier); log::debug!( "{self}: caching TRANSIENT_LOCAL publications via a PublicationCache with history={history} (computed from Reader's QoS: history=({:?},{}), durability_service.max_instances={})", history_qos.kind, history_qos.depth, durability_service_qos.max_instances ); let pub_cache = self + .context .zsession .declare_publication_cache(&declared_ke) .history(history) - .queryable_prefix(*KE_PREFIX_PUB_CACHE / plugin_id) + .queryable_prefix(*KE_PREFIX_PUB_CACHE / &self.context.plugin_id) .queryable_allowed_origin(Locality::Remote) // Note: don't reply to queries from local QueryingSubscribers .res() .await @@ -228,6 +221,7 @@ impl RoutePublisher<'_> { Some(ZPublisher::PublicationCache(pub_cache)) } else { if let Err(e) = self + .context .zsession .declare_publisher(declared_ke.clone()) .res() @@ -247,14 +241,14 @@ impl RoutePublisher<'_> { if !is_message_for_action(&self.ros2_name) { // create associated LivelinessToken let liveliness_ke = new_ke_liveliness_pub( - plugin_id, + &self.context.plugin_id, &self.zenoh_key_expr, &self.ros2_type, self.keyless, discovered_writer_qos, )?; let ros2_name = self.ros2_name.clone(); - self.liveliness_token = Some(self.zsession + self.liveliness_token = Some(self.context.zsession .liveliness() .declare_token(liveliness_ke) .res() @@ -302,17 +296,12 @@ impl RoutePublisher<'_> { } #[inline] - pub async fn add_local_node( - &mut self, - node: String, - plugin_id: &keyexpr, - discovered_writer_qos: &Qos, - ) { + pub async fn add_local_node(&mut self, node: String, discovered_writer_qos: &Qos) { self.local_nodes.insert(node); log::debug!("{self} now serving local nodes {:?}", self.local_nodes); // if 1st local node added, activate the route if self.local_nodes.len() == 1 { - if let Err(e) = self.activate(plugin_id, discovered_writer_qos).await { + if let Err(e) = self.activate(discovered_writer_qos).await { log::error!("{self} activation failed: {e}"); } } diff --git a/zenoh-plugin-ros2dds/src/route_service_cli.rs b/zenoh-plugin-ros2dds/src/route_service_cli.rs index e6c2b47..4a71c95 100644 --- a/zenoh-plugin-ros2dds/src/route_service_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_service_cli.rs @@ -35,7 +35,8 @@ use crate::ros2_utils::{ is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type, ros2_service_type_to_request_dds_type, }; -use crate::{Config, LOG_PAYLOAD}; +use crate::routes_mgr::Context; +use crate::LOG_PAYLOAD; // a route for a Service Client exposed in Zenoh as a Queryier #[allow(clippy::upper_case_acronyms)] @@ -47,12 +48,9 @@ pub struct RouteServiceCli<'a> { ros2_type: String, // the Zenoh key expression used for routing zenoh_key_expr: OwnedKeyExpr, - // the zenoh session + // the context #[serde(skip)] - zsession: &'a Arc, - // the config - #[serde(skip)] - _config: Arc, + context: Context<'a>, is_active: bool, // the local DDS Reader receiving client's requests and routing them to Zenoh #[serde(serialize_with = "serialize_entity_guid")] @@ -93,13 +91,11 @@ impl fmt::Display for RouteServiceCli<'_> { impl RouteServiceCli<'_> { #[allow(clippy::too_many_arguments)] pub async fn create<'a>( - config: Arc, - zsession: &'a Arc, - participant: dds_entity_t, ros2_name: String, ros2_type: String, zenoh_key_expr: OwnedKeyExpr, type_info: &Option>, + context: &Context<'a>, ) -> Result, String> { log::debug!( "Route Service Client (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): creation with type {ros2_type}" @@ -119,7 +115,7 @@ impl RouteServiceCli<'_> { // Add DATA_USER QoS similarly to rmw_cyclone_dds here: // https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/rmw_node.cpp#L5028C17-L5028C17 - let server_id_str = new_service_id(&participant)?; + let server_id_str = new_service_id(&context.participant)?; let user_data = format!("serviceid= {server_id_str};"); qos.user_data = Some(user_data.into_bytes()); log::debug!( @@ -130,7 +126,7 @@ impl RouteServiceCli<'_> { let rep_topic_name = format!("rr{ros2_name}Reply"); let rep_type_name = ros2_service_type_to_reply_dds_type(&ros2_type); let rep_writer = create_dds_writer( - participant, + context.participant, rep_topic_name, rep_type_name, true, @@ -144,9 +140,9 @@ impl RouteServiceCli<'_> { let req_topic_name = format!("rq{ros2_name}Request"); let req_type_name = ros2_service_type_to_request_dds_type(&ros2_type); let zenoh_key_expr2 = zenoh_key_expr.clone(); - let zsession2 = zsession.clone(); + let zsession2 = context.zsession.clone(); let req_reader = create_dds_reader( - participant, + context.participant, req_topic_name, req_type_name, type_info, @@ -154,13 +150,7 @@ impl RouteServiceCli<'_> { qos, None, move |sample| { - do_route_request( - &route_id, - sample, - zenoh_key_expr2.clone(), - &zsession2, - rep_writer, - ); + do_route_request(&route_id, sample, &zenoh_key_expr2, &zsession2, rep_writer); }, )?; @@ -168,8 +158,7 @@ impl RouteServiceCli<'_> { ros2_name, ros2_type, zenoh_key_expr, - zsession, - _config: config, + context: context.clone(), is_active: false, rep_writer, req_reader, @@ -179,16 +168,19 @@ impl RouteServiceCli<'_> { }) } - async fn activate<'a>(&'a mut self, plugin_id: &keyexpr) -> Result<(), String> { + async fn activate<'a>(&'a mut self) -> Result<(), String> { self.is_active = true; // if not for an Action (since actions declare their own liveliness) if !is_service_for_action(&self.ros2_name) { // create associated LivelinessToken - let liveliness_ke = - new_ke_liveliness_service_cli(plugin_id, &self.zenoh_key_expr, &self.ros2_type)?; + let liveliness_ke = new_ke_liveliness_service_cli( + &self.context.plugin_id, + &self.zenoh_key_expr, + &self.ros2_type, + )?; let ros2_name = self.ros2_name.clone(); - self.liveliness_token = Some(self.zsession + self.liveliness_token = Some(self.context.zsession .liveliness() .declare_token(liveliness_ke) .res_async() @@ -239,12 +231,12 @@ impl RouteServiceCli<'_> { } #[inline] - pub async fn add_local_node(&mut self, node: String, plugin_id: &keyexpr) { + pub async fn add_local_node(&mut self, node: String) { self.local_nodes.insert(node); log::debug!("{self} now serving local nodes {:?}", self.local_nodes); // if 1st local node added, activate the route if self.local_nodes.len() == 1 { - if let Err(e) = self.activate(plugin_id).await { + if let Err(e) = self.activate().await { log::error!("{self} activation failed: {e}"); } } @@ -274,7 +266,7 @@ impl RouteServiceCli<'_> { fn do_route_request( route_id: &str, sample: &DDSRawSample, - zenoh_key_expr: OwnedKeyExpr, + zenoh_key_expr: &OwnedKeyExpr, zsession: &Arc, rep_writer: dds_entity_t, ) { diff --git a/zenoh-plugin-ros2dds/src/route_service_srv.rs b/zenoh-plugin-ros2dds/src/route_service_srv.rs index 31be52a..8429306 100644 --- a/zenoh-plugin-ros2dds/src/route_service_srv.rs +++ b/zenoh-plugin-ros2dds/src/route_service_srv.rs @@ -39,7 +39,7 @@ use crate::ros2_utils::{ is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type, ros2_service_type_to_request_dds_type, }; -use crate::Config; +use crate::routes_mgr::Context; use crate::{serialize_option_as_bool, LOG_PAYLOAD}; // a route for a Service Server exposed in Zenoh as a Queryable @@ -51,12 +51,9 @@ pub struct RouteServiceSrv<'a> { ros2_type: String, // the Zenoh key expression used for routing zenoh_key_expr: OwnedKeyExpr, - // the zenoh session + // the context #[serde(skip)] - zsession: &'a Arc, - // the config - #[serde(skip)] - _config: Arc, + context: Context<'a>, // the zenoh queryable used to expose the service server in zenoh. // `None` when route is created on a remote announcement and no local ROS2 Service Server discovered yet #[serde(rename = "is_active", serialize_with = "serialize_option_as_bool")] @@ -109,13 +106,11 @@ impl fmt::Display for RouteServiceSrv<'_> { impl RouteServiceSrv<'_> { #[allow(clippy::too_many_arguments)] pub async fn create<'a>( - config: Arc, - zsession: &'a Arc, - participant: dds_entity_t, ros2_name: String, ros2_type: String, zenoh_key_expr: OwnedKeyExpr, type_info: &Option>, + context: &Context<'a>, ) -> Result, String> { log::debug!( "Route Service Server (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): creation with type {ros2_type}" @@ -135,7 +130,7 @@ impl RouteServiceSrv<'_> { // Add DATA_USER QoS similarly to rmw_cyclone_dds here: // https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/rmw_node.cpp#L5028C17-L5028C17 - let client_id_str = new_service_id(&participant)?; + let client_id_str = new_service_id(&context.participant)?; let user_data = format!("clientid= {client_id_str};"); qos.user_data = Some(user_data.into_bytes()); log::debug!( @@ -146,7 +141,7 @@ impl RouteServiceSrv<'_> { let req_topic_name = format!("rq{ros2_name}Request"); let req_type_name = ros2_service_type_to_request_dds_type(&ros2_type); let req_writer = create_dds_writer( - participant, + context.participant, req_topic_name, req_type_name, true, @@ -167,7 +162,7 @@ impl RouteServiceSrv<'_> { let queries_in_progress2 = queries_in_progress.clone(); let zenoh_key_expr2 = zenoh_key_expr.clone(); let rep_reader = create_dds_reader( - participant, + context.participant, rep_topic_name, rep_type_name, type_info, @@ -189,8 +184,7 @@ impl RouteServiceSrv<'_> { ros2_name, ros2_type, zenoh_key_expr, - zsession, - _config: config, + context: context.clone(), zenoh_queryable: None, req_writer, rep_reader, @@ -203,9 +197,10 @@ impl RouteServiceSrv<'_> { }) } - async fn activate<'a>(&'a mut self, plugin_id: &keyexpr) -> Result<(), String> { + async fn activate<'a>(&'a mut self) -> Result<(), String> { // For lifetime issue, redeclare the zenoh key expression that can't be stored in Self let declared_ke = self + .context .zsession .declare_keyexpr(self.zenoh_key_expr.clone()) .res() @@ -226,7 +221,8 @@ impl RouteServiceSrv<'_> { let client_guid = self.client_guid; let req_writer: i32 = self.req_writer; self.zenoh_queryable = Some( - self.zsession + self.context + .zsession .declare_queryable(&self.zenoh_key_expr) .callback(move |query| { do_route_request( @@ -251,10 +247,13 @@ impl RouteServiceSrv<'_> { // if not for an Action (since actions declare their own liveliness) if !is_service_for_action(&self.ros2_name) { // create associated LivelinessToken - let liveliness_ke = - new_ke_liveliness_service_srv(plugin_id, &self.zenoh_key_expr, &self.ros2_type)?; + let liveliness_ke = new_ke_liveliness_service_srv( + &self.context.plugin_id, + &self.zenoh_key_expr, + &self.ros2_type, + )?; let ros2_name = self.ros2_name.clone(); - self.liveliness_token = Some(self.zsession + self.liveliness_token = Some(self.context.zsession .liveliness() .declare_token(liveliness_ke) .res() @@ -305,12 +304,12 @@ impl RouteServiceSrv<'_> { } #[inline] - pub async fn add_local_node(&mut self, node: String, plugin_id: &keyexpr) { + pub async fn add_local_node(&mut self, node: String) { self.local_nodes.insert(node); log::debug!("{self} now serving local nodes {:?}", self.local_nodes); // if 1st local node added, activate the route if self.local_nodes.len() == 1 { - if let Err(e) = self.activate(plugin_id).await { + if let Err(e) = self.activate().await { log::error!("{self} activation failed: {e}"); } } diff --git a/zenoh-plugin-ros2dds/src/route_subscriber.rs b/zenoh-plugin-ros2dds/src/route_subscriber.rs index c9b2209..dd29d0c 100644 --- a/zenoh-plugin-ros2dds/src/route_subscriber.rs +++ b/zenoh-plugin-ros2dds/src/route_subscriber.rs @@ -19,7 +19,6 @@ use cyclors::{ use serde::Serialize; use std::collections::HashSet; use std::convert::TryInto; -use std::sync::Arc; use std::{ffi::CStr, fmt, time::Duration}; use zenoh::liveliness::LivelinessToken; use zenoh::prelude::*; @@ -32,9 +31,9 @@ use crate::gid::Gid; use crate::liveliness_mgt::new_ke_liveliness_sub; use crate::qos_helpers::is_transient_local; use crate::ros2_utils::{is_message_for_action, ros2_message_type_to_dds_type}; +use crate::routes_mgr::Context; use crate::{ - dds_utils::serialize_entity_guid, qos::Qos, vec_into_raw_parts, Config, KE_ANY_1_SEGMENT, - LOG_PAYLOAD, + dds_utils::serialize_entity_guid, qos::Qos, vec_into_raw_parts, KE_ANY_1_SEGMENT, LOG_PAYLOAD, }; use crate::{serialize_option_as_bool, KE_PREFIX_PUB_CACHE}; @@ -53,12 +52,9 @@ pub struct RouteSubscriber<'a> { ros2_type: String, // the Zenoh key expression used for routing zenoh_key_expr: OwnedKeyExpr, - // the zenoh session + // the context #[serde(skip)] - zsession: &'a Arc, - // the config - #[serde(skip)] - config: Arc, + context: Context<'a>, // the zenoh subscriber receiving data to be re-published by the DDS Writer // `None` when route is created on a remote announcement and no local ROS2 Subscriber discovered yet #[serde(rename = "is_active", serialize_with = "serialize_option_as_bool")] @@ -100,31 +96,33 @@ impl fmt::Display for RouteSubscriber<'_> { impl RouteSubscriber<'_> { #[allow(clippy::too_many_arguments)] - pub async fn create<'b>( - config: Arc, - zsession: &Arc, - participant: dds_entity_t, + pub async fn create<'a>( ros2_name: String, ros2_type: String, zenoh_key_expr: OwnedKeyExpr, keyless: bool, writer_qos: Qos, - ) -> Result, String> { + context: &Context<'a>, + ) -> Result, String> { let transient_local = is_transient_local(&writer_qos); log::debug!("Route Subscriber ({zenoh_key_expr} -> {ros2_name}): creation with type {ros2_type} (transient_local:{transient_local})"); let topic_name = format!("rt{ros2_name}"); let type_name = ros2_message_type_to_dds_type(&ros2_type); - let dds_writer = - create_dds_writer(participant, topic_name, type_name, keyless, writer_qos)?; + let dds_writer = create_dds_writer( + context.participant, + topic_name, + type_name, + keyless, + writer_qos, + )?; Ok(RouteSubscriber { ros2_name, ros2_type, zenoh_key_expr, - zsession, - config: config, + context: context.clone(), zenoh_subscriber: None, dds_writer, transient_local, @@ -135,11 +133,7 @@ impl RouteSubscriber<'_> { }) } - async fn activate( - &mut self, - plugin_id: &keyexpr, - discovered_reader_qos: &Qos, - ) -> Result<(), String> { + async fn activate(&mut self, discovered_reader_qos: &Qos) -> Result<(), String> { log::debug!("{self} activate"); // Callback routing data received by Zenoh subscriber to DDS Writer (if set) let ros2_name = self.ros2_name.clone(); @@ -156,13 +150,14 @@ impl RouteSubscriber<'_> { (*KE_PREFIX_PUB_CACHE / *KE_ANY_1_SEGMENT / &self.zenoh_key_expr).into(); log::debug!("{self}: query historical data from everybody for TRANSIENT_LOCAL Reader on {query_selector}"); let sub = self + .context .zsession .declare_subscriber(&self.zenoh_key_expr) .callback(subscriber_callback) .allowed_origin(Locality::Remote) // Allow only remote publications to avoid loops .reliable() .querying() - .query_timeout(self.config.queries_timeout) + .query_timeout(self.context.config.queries_timeout) .query_selector(query_selector) .query_accept_replies(ReplyKeyExpr::Any) .res() @@ -171,6 +166,7 @@ impl RouteSubscriber<'_> { Some(ZSubscriber::FetchingSubscriber(sub)) } else { let sub = self + .context .zsession .declare_subscriber(&self.zenoh_key_expr) .callback(subscriber_callback) @@ -186,7 +182,7 @@ impl RouteSubscriber<'_> { if !is_message_for_action(&self.ros2_name) { // create associated LivelinessToken let liveliness_ke = new_ke_liveliness_sub( - plugin_id, + &self.context.plugin_id, &self.zenoh_key_expr, &self.ros2_type, self.keyless, @@ -194,7 +190,7 @@ impl RouteSubscriber<'_> { )?; let ros2_name = self.ros2_name.clone(); self.liveliness_token = Some( - self.zsession + self.context.zsession .liveliness() .declare_token(liveliness_ke) .res() @@ -234,7 +230,7 @@ impl RouteSubscriber<'_> { if let Err(e) = sub .fetch({ - let session = &self.zsession; + let session = &self.context.zsession; let query_selector = query_selector.clone(); move |cb| { use zenoh_core::SyncResolve; @@ -286,17 +282,12 @@ impl RouteSubscriber<'_> { } #[inline] - pub async fn add_local_node( - &mut self, - entity_key: String, - plugin_id: &keyexpr, - discovered_reader_qos: &Qos, - ) { + pub async fn add_local_node(&mut self, entity_key: String, discovered_reader_qos: &Qos) { self.local_nodes.insert(entity_key); log::debug!("{self} now serving local nodes {:?}", self.local_nodes); // if 1st local node added, activate the route if self.local_nodes.len() == 1 { - if let Err(e) = self.activate(plugin_id, discovered_reader_qos).await { + if let Err(e) = self.activate(discovered_reader_qos).await { log::error!("{self} activation failed: {e}"); } } diff --git a/zenoh-plugin-ros2dds/src/routes_mgr.rs b/zenoh-plugin-ros2dds/src/routes_mgr.rs index 1279ecc..283bae8 100644 --- a/zenoh-plugin-ros2dds/src/routes_mgr.rs +++ b/zenoh-plugin-ros2dds/src/routes_mgr.rs @@ -68,12 +68,21 @@ enum RouteRef { ActionCliRoute(String), } +// A Context struct to be shared as an Arc amongst all the code +#[derive(Clone)] +pub struct Context<'a> { + pub(crate) plugin_id: Arc, + pub(crate) config: Arc, + pub(crate) zsession: &'a Arc, + pub(crate) participant: dds_entity_t, + // all discovered entities + pub(crate) discovered_entities: Arc>, + // ros_discovery_info read/write manager + pub(crate) ros_discovery_mgr: Arc, +} + pub struct RoutesMgr<'a> { - plugin_id: OwnedKeyExpr, - config: Arc, - zsession: &'a Arc, - participant: dds_entity_t, - discovered_entities: Arc>, + context: Arc>, // maps of established routes - ecah map indexed by topic/service/action name routes_publishers: HashMap>, routes_subscribers: HashMap>, @@ -81,8 +90,7 @@ pub struct RoutesMgr<'a> { routes_service_cli: HashMap>, routes_action_srv: HashMap>, routes_action_cli: HashMap>, - // ros_discovery_info read/write manager - ros_discovery_mgr: Arc, + // admin space key prefix (stripped in map indexes) admin_prefix: OwnedKeyExpr, // admin space: index is the admin_keyexpr (relative to admin_prefix) admin_space: HashMap, @@ -98,19 +106,23 @@ impl<'a> RoutesMgr<'a> { ros_discovery_mgr: Arc, admin_prefix: OwnedKeyExpr, ) -> RoutesMgr<'a> { - RoutesMgr { - plugin_id, + let context = Arc::new(Context { + plugin_id: Arc::new(plugin_id), config, zsession, participant, discovered_entities, + ros_discovery_mgr, + }); + + RoutesMgr { + context, routes_publishers: HashMap::new(), routes_subscribers: HashMap::new(), routes_service_srv: HashMap::new(), routes_service_cli: HashMap::new(), routes_action_srv: HashMap::new(), routes_action_cli: HashMap::new(), - ros_discovery_mgr, admin_prefix, admin_space: HashMap::new(), } @@ -123,10 +135,9 @@ impl<'a> RoutesMgr<'a> { use ROS2DiscoveryEvent::*; match event { DiscoveredMsgPub(node, iface) => { - let plugin_id = self.plugin_id.clone(); // Retrieve info on DDS Writer let entity = { - let entities = zread!(self.discovered_entities); + let entities = zread!(self.context.discovered_entities); entities .get_writer(&iface.writer) .ok_or(format!( @@ -145,7 +156,7 @@ impl<'a> RoutesMgr<'a> { true, ) .await?; - route.add_local_node(node, &plugin_id, &entity.qos).await; + route.add_local_node(node, &entity.qos).await; } UndiscoveredMsgPub(node, iface) => { @@ -158,10 +169,11 @@ impl<'a> RoutesMgr<'a> { .remove(&(*KE_PREFIX_ROUTE_PUBLISHER / iface.name_as_keyexpr())); let route = entry.remove(); // remove reader's GID in ros_discovery_msg - self.ros_discovery_mgr - .remove_dds_reader(route.dds_reader_guid().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") - })?); + self.context.ros_discovery_mgr.remove_dds_reader( + route.dds_reader_guid().map_err(|e| { + format!("{route}: failed to update ros_discovery_info message: {e}") + })?, + ); log::info!("{route} removed"); } } @@ -170,7 +182,7 @@ impl<'a> RoutesMgr<'a> { DiscoveredMsgSub(node, iface) => { // Retrieve info on DDS Reader let entity = { - let entities = zread!(self.discovered_entities); + let entities = zread!(self.context.discovered_entities); entities .get_reader(&iface.reader) .ok_or(format!( @@ -179,7 +191,6 @@ impl<'a> RoutesMgr<'a> { ))? .clone() }; - let plugin_id = self.plugin_id.clone(); // Get route (create it if not yet exists) let route = self .get_or_create_route_subscriber( @@ -190,7 +201,7 @@ impl<'a> RoutesMgr<'a> { true, ) .await?; - route.add_local_node(node, &plugin_id, &entity.qos).await; + route.add_local_node(node, &entity.qos).await; } UndiscoveredMsgSub(node, iface) => { @@ -204,21 +215,21 @@ impl<'a> RoutesMgr<'a> { .remove(&(*KE_PREFIX_ROUTE_SUBSCRIBER / iface.name_as_keyexpr())); let route = entry.remove(); // remove writer's GID in ros_discovery_msg - self.ros_discovery_mgr - .remove_dds_writer(route.dds_writer_guid().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") - })?); + self.context.ros_discovery_mgr.remove_dds_writer( + route.dds_writer_guid().map_err(|e| { + format!("{route}: failed to update ros_discovery_info message: {e}") + })?, + ); log::info!("{route} removed"); } } } DiscoveredServiceSrv(node, iface) => { - let plugin_id = self.plugin_id.clone(); // Get route (create it if not yet exists) let route = self .get_or_create_route_service_srv(iface.name, iface.typ, true) .await?; - route.add_local_node(node, &plugin_id).await; + route.add_local_node(node).await; } UndiscoveredServiceSrv(node, iface) => { if let Entry::Occupied(mut entry) = @@ -231,14 +242,14 @@ impl<'a> RoutesMgr<'a> { .remove(&(*KE_PREFIX_ROUTE_SERVICE_SRV / iface.name_as_keyexpr())); let route = entry.remove(); // remove reader's and writer's GID in ros_discovery_msg - self.ros_discovery_mgr.remove_dds_reader( + self.context.ros_discovery_mgr.remove_dds_reader( route.dds_rep_reader_guid().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") + format!("{route}: failed to update ros_discovery_info message: {e}") })?, ); - self.ros_discovery_mgr.remove_dds_writer( + self.context.ros_discovery_mgr.remove_dds_writer( route.dds_req_writer_guid().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") + format!("{route}: failed to update ros_discovery_info message: {e}") })?, ); log::info!("{route} removed"); @@ -246,12 +257,11 @@ impl<'a> RoutesMgr<'a> { } } DiscoveredServiceCli(node, iface) => { - let plugin_id = self.plugin_id.clone(); // Get route (create it if not yet exists) let route = self .get_or_create_route_service_cli(iface.name, iface.typ, true) .await?; - route.add_local_node(node, &plugin_id).await; + route.add_local_node(node).await; } UndiscoveredServiceCli(node, iface) => { if let Entry::Occupied(mut entry) = @@ -264,14 +274,14 @@ impl<'a> RoutesMgr<'a> { .remove(&(*KE_PREFIX_ROUTE_SERVICE_CLI / iface.name_as_keyexpr())); let route = entry.remove(); // remove reader's and writer's GID in ros_discovery_msg - self.ros_discovery_mgr.remove_dds_reader( + self.context.ros_discovery_mgr.remove_dds_reader( route.dds_req_reader_guid().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") + format!("{route}: failed to update ros_discovery_info message: {e}") })?, ); - self.ros_discovery_mgr.remove_dds_writer( + self.context.ros_discovery_mgr.remove_dds_writer( route.dds_rep_writer_guid().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") + format!("{route}: failed to update ros_discovery_info message: {e}") })?, ); log::info!("{route} removed"); @@ -279,12 +289,11 @@ impl<'a> RoutesMgr<'a> { } } DiscoveredActionSrv(node, iface) => { - let plugin_id = self.plugin_id.clone(); // Get route (create it if not yet exists) let route = self .get_or_create_route_action_srv(iface.name, iface.typ) .await?; - route.add_local_node(node, &plugin_id).await; + route.add_local_node(node).await; } UndiscoveredActionSrv(node, iface) => { if let Entry::Occupied(mut entry) = self.routes_action_srv.entry(iface.name.clone()) @@ -296,14 +305,14 @@ impl<'a> RoutesMgr<'a> { .remove(&(*KE_PREFIX_ROUTE_ACTION_SRV / iface.name_as_keyexpr())); let route = entry.remove(); // remove reader's and writer's GID in ros_discovery_msg - self.ros_discovery_mgr.remove_dds_readers( + self.context.ros_discovery_mgr.remove_dds_readers( route.dds_readers_guids().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") + format!("{route}: failed to update ros_discovery_info message: {e}") })?, ); - self.ros_discovery_mgr.remove_dds_writers( + self.context.ros_discovery_mgr.remove_dds_writers( route.dds_writers_guids().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") + format!("{route}: failed to update ros_discovery_info message: {e}") })?, ); log::info!("{route} removed"); @@ -311,12 +320,11 @@ impl<'a> RoutesMgr<'a> { } } DiscoveredActionCli(node, iface) => { - let plugin_id = self.plugin_id.clone(); // Get route (create it if not yet exists) let route = self .get_or_create_route_action_cli(iface.name, iface.typ) .await?; - route.add_local_node(node, &plugin_id).await; + route.add_local_node(node).await; } UndiscoveredActionCli(node, iface) => { if let Entry::Occupied(mut entry) = self.routes_action_cli.entry(iface.name.clone()) @@ -328,14 +336,14 @@ impl<'a> RoutesMgr<'a> { .remove(&(*KE_PREFIX_ROUTE_ACTION_CLI / iface.name_as_keyexpr())); let route = entry.remove(); // remove reader's and writer's GID in ros_discovery_msg - self.ros_discovery_mgr.remove_dds_readers( + self.context.ros_discovery_mgr.remove_dds_readers( route.dds_readers_guids().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") + format!("{route}: failed to update ros_discovery_info message: {e}") })?, ); - self.ros_discovery_mgr.remove_dds_writers( + self.context.ros_discovery_mgr.remove_dds_writers( route.dds_writers_guids().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") + format!("{route}: failed to update ros_discovery_info message: {e}") })?, ); log::info!("{route} removed"); @@ -387,10 +395,11 @@ impl<'a> RoutesMgr<'a> { .remove(&(*KE_PREFIX_ROUTE_SUBSCRIBER / &zenoh_key_expr)); let route = entry.remove(); // remove writer's GID in ros_discovery_msg - self.ros_discovery_mgr - .remove_dds_writer(route.dds_writer_guid().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") - })?); + self.context.ros_discovery_mgr.remove_dds_writer( + route.dds_writer_guid().map_err(|e| { + format!("{route}: failed to update ros_discovery_info message: {e}") + })?, + ); log::info!("{route} removed"); } } @@ -431,10 +440,11 @@ impl<'a> RoutesMgr<'a> { .remove(&(*KE_PREFIX_ROUTE_PUBLISHER / &zenoh_key_expr)); let route = entry.remove(); // remove reader's GID in ros_discovery_msg - self.ros_discovery_mgr - .remove_dds_reader(route.dds_reader_guid().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") - })?); + self.context.ros_discovery_mgr.remove_dds_reader( + route.dds_reader_guid().map_err(|e| { + format!("{route}: failed to update ros_discovery_info message: {e}") + })?, + ); log::info!("{route} removed"); } } @@ -467,14 +477,14 @@ impl<'a> RoutesMgr<'a> { .remove(&(*KE_PREFIX_ROUTE_SERVICE_CLI / &zenoh_key_expr)); let route = entry.remove(); // remove reader's and writer's GID in ros_discovery_msg - self.ros_discovery_mgr.remove_dds_reader( + self.context.ros_discovery_mgr.remove_dds_reader( route.dds_req_reader_guid().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") + format!("{route}: failed to update ros_discovery_info message: {e}") })?, ); - self.ros_discovery_mgr.remove_dds_writer( + self.context.ros_discovery_mgr.remove_dds_writer( route.dds_rep_writer_guid().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") + format!("{route}: failed to update ros_discovery_info message: {e}") })?, ); log::info!("{route} removed"); @@ -509,14 +519,14 @@ impl<'a> RoutesMgr<'a> { .remove(&(*KE_PREFIX_ROUTE_SERVICE_SRV / &zenoh_key_expr)); let route = entry.remove(); // remove reader's and writer's GID in ros_discovery_msg - self.ros_discovery_mgr.remove_dds_reader( + self.context.ros_discovery_mgr.remove_dds_reader( route.dds_rep_reader_guid().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") + format!("{route}: failed to update ros_discovery_info message: {e}") })?, ); - self.ros_discovery_mgr.remove_dds_writer( + self.context.ros_discovery_mgr.remove_dds_writer( route.dds_req_writer_guid().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") + format!("{route}: failed to update ros_discovery_info message: {e}") })?, ); log::info!("{route} removed"); @@ -551,14 +561,14 @@ impl<'a> RoutesMgr<'a> { .remove(&(*KE_PREFIX_ROUTE_SERVICE_CLI / &zenoh_key_expr)); let route = entry.remove(); // remove reader's and writer's GID in ros_discovery_msg - self.ros_discovery_mgr.remove_dds_readers( + self.context.ros_discovery_mgr.remove_dds_readers( route.dds_readers_guids().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") + format!("{route}: failed to update ros_discovery_info message: {e}") })?, ); - self.ros_discovery_mgr.remove_dds_writers( + self.context.ros_discovery_mgr.remove_dds_writers( route.dds_writers_guids().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") + format!("{route}: failed to update ros_discovery_info message: {e}") })?, ); log::info!("{route} removed"); @@ -593,14 +603,14 @@ impl<'a> RoutesMgr<'a> { .remove(&(*KE_PREFIX_ROUTE_SERVICE_SRV / &zenoh_key_expr)); let route = entry.remove(); // remove reader's and writer's GID in ros_discovery_msg - self.ros_discovery_mgr.remove_dds_readers( + self.context.ros_discovery_mgr.remove_dds_readers( route.dds_readers_guids().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") + format!("{route}: failed to update ros_discovery_info message: {e}") })?, ); - self.ros_discovery_mgr.remove_dds_writers( + self.context.ros_discovery_mgr.remove_dds_writers( route.dds_writers_guids().map_err(|e| { - format!("Failed to update ros_discovery_info message: {e}") + format!("{route}: failed to update ros_discovery_info message: {e}") })?, ); log::info!("{route} removed"); @@ -614,7 +624,7 @@ impl<'a> RoutesMgr<'a> { pub async fn query_historical_all_publications(&mut self, plugin_id: &keyexpr) { for route in self.routes_subscribers.values_mut() { route - .query_historical_publications(plugin_id, self.config.queries_timeout) + .query_historical_publications(plugin_id, self.context.config.queries_timeout) .await; } } @@ -633,25 +643,23 @@ impl<'a> RoutesMgr<'a> { let zenoh_key_expr = ke_for_sure!(&ros2_name[1..]); // create route let route = RoutePublisher::create( - self.config.clone(), - self.zsession, - self.participant, ros2_name.clone(), ros2_type, zenoh_key_expr.to_owned(), &None, keyless, reader_qos, + &self.context, ) .await?; log::info!("{route} created"); // insert reader's GID in ros_discovery_msg - self.ros_discovery_mgr.add_dds_reader( - route - .dds_reader_guid() - .map_err(|e| format!("Failed to update ros_discovery_info message: {e}"))?, - ); + self.context + .ros_discovery_mgr + .add_dds_reader(route.dds_reader_guid().map_err(|e| { + format!("{route}: failed to update ros_discovery_info message: {e}") + })?); if admin_space_ref { // insert reference in admin_space @@ -680,24 +688,22 @@ impl<'a> RoutesMgr<'a> { let zenoh_key_expr = ke_for_sure!(&ros2_name[1..]); // create route let route = RouteSubscriber::create( - self.config.clone(), - self.zsession, - self.participant, ros2_name.clone(), ros2_type, zenoh_key_expr.to_owned(), keyless, writer_qos, + &self.context, ) .await?; log::info!("{route} created"); // insert writer's GID in ros_discovery_msg - self.ros_discovery_mgr.add_dds_writer( - route - .dds_writer_guid() - .map_err(|e| format!("Failed to update ros_discovery_info message: {e}"))?, - ); + self.context + .ros_discovery_mgr + .add_dds_writer(route.dds_writer_guid().map_err(|e| { + format!("{route}: failed to update ros_discovery_info message: {e}") + })?); if admin_space_ref { // insert reference in admin_space @@ -724,27 +730,25 @@ impl<'a> RoutesMgr<'a> { let zenoh_key_expr = ke_for_sure!(&ros2_name[1..]); // create route let route = RouteServiceSrv::create( - self.config.clone(), - self.zsession, - self.participant, ros2_name.clone(), ros2_type, zenoh_key_expr.to_owned(), &None, + &self.context, ) .await?; log::info!("{route} created"); // insert reader's and writer's GID in ros_discovery_msg - self.ros_discovery_mgr.add_dds_reader( - route - .dds_rep_reader_guid() - .map_err(|e| format!("Failed to update ros_discovery_info message: {e}"))?, + self.context.ros_discovery_mgr.add_dds_reader( + route.dds_rep_reader_guid().map_err(|e| { + format!("{route}: failed to update ros_discovery_info message: {e}") + })?, ); - self.ros_discovery_mgr.add_dds_writer( - route - .dds_req_writer_guid() - .map_err(|e| format!("Failed to update ros_discovery_info message: {e}"))?, + self.context.ros_discovery_mgr.add_dds_writer( + route.dds_req_writer_guid().map_err(|e| { + format!("{route}: failed to update ros_discovery_info message: {e}") + })?, ); if admin_space_ref { @@ -772,27 +776,25 @@ impl<'a> RoutesMgr<'a> { let zenoh_key_expr = ke_for_sure!(&ros2_name[1..]); // create route let route = RouteServiceCli::create( - self.config.clone(), - self.zsession, - self.participant, ros2_name.clone(), ros2_type, zenoh_key_expr.to_owned(), &None, + &self.context, ) .await?; log::info!("{route} created"); // insert reader's and writer's GID in ros_discovery_msg - self.ros_discovery_mgr.add_dds_reader( - route - .dds_req_reader_guid() - .map_err(|e| format!("Failed to update ros_discovery_info message: {e}"))?, + self.context.ros_discovery_mgr.add_dds_reader( + route.dds_req_reader_guid().map_err(|e| { + format!("{route}: failed to update ros_discovery_info message: {e}") + })?, ); - self.ros_discovery_mgr.add_dds_writer( - route - .dds_rep_writer_guid() - .map_err(|e| format!("Failed to update ros_discovery_info message: {e}"))?, + self.context.ros_discovery_mgr.add_dds_writer( + route.dds_rep_writer_guid().map_err(|e| { + format!("{route}: failed to update ros_discovery_info message: {e}") + })?, ); if admin_space_ref { @@ -819,27 +821,25 @@ impl<'a> RoutesMgr<'a> { let zenoh_key_expr = ke_for_sure!(&ros2_name[1..]); // create route let route = RouteActionSrv::create( - self.config.clone(), - self.zsession, - self.participant, ros2_name.clone(), ros2_type, zenoh_key_expr.to_owned(), + &self.context, ) .await?; log::info!("{route} created"); // insert readers' and writes' GID in ros_discovery_msg - self.ros_discovery_mgr.add_dds_readers( - route - .dds_readers_guids() - .map_err(|e| format!("Failed to update ros_discovery_info message: {e}"))?, - ); - self.ros_discovery_mgr.add_dds_writers( - route - .dds_writers_guids() - .map_err(|e| format!("Failed to update ros_discovery_info message: {e}"))?, - ); + self.context + .ros_discovery_mgr + .add_dds_readers(route.dds_readers_guids().map_err(|e| { + format!("{route}: failed to update ros_discovery_info message: {e}") + })?); + self.context + .ros_discovery_mgr + .add_dds_writers(route.dds_writers_guids().map_err(|e| { + format!("{route}: failed to update ros_discovery_info message: {e}") + })?); // insert reference in admin_space let admin_ke = *KE_PREFIX_ROUTE_ACTION_SRV / zenoh_key_expr; @@ -863,27 +863,25 @@ impl<'a> RoutesMgr<'a> { let zenoh_key_expr = ke_for_sure!(&ros2_name[1..]); // create route let route = RouteActionCli::create( - self.config.clone(), - self.zsession, - self.participant, ros2_name.clone(), ros2_type, zenoh_key_expr.to_owned(), + &self.context, ) .await?; log::info!("{route} created"); // insert readers' and writes' GID in ros_discovery_msg - self.ros_discovery_mgr.add_dds_readers( - route - .dds_readers_guids() - .map_err(|e| format!("Failed to update ros_discovery_info message: {e}"))?, - ); - self.ros_discovery_mgr.add_dds_writers( - route - .dds_writers_guids() - .map_err(|e| format!("Failed to update ros_discovery_info message: {e}"))?, - ); + self.context + .ros_discovery_mgr + .add_dds_readers(route.dds_readers_guids().map_err(|e| { + format!("{route}: failed to update ros_discovery_info message: {e}") + })?); + self.context + .ros_discovery_mgr + .add_dds_writers(route.dds_writers_guids().map_err(|e| { + format!("{route}: failed to update ros_discovery_info message: {e}") + })?); // insert reference in admin_space let admin_ke = *KE_PREFIX_ROUTE_ACTION_CLI / zenoh_key_expr; From 4498e9df6f05b0de25d6075987061d64b217d61b Mon Sep 17 00:00:00 2001 From: Julien Enoch Date: Thu, 5 Oct 2023 17:21:26 +0200 Subject: [PATCH 2/3] Redesign: improve ros_discovery_msg mngt --- zenoh-plugin-ros2dds/src/ros_discovery.rs | 26 ---- zenoh-plugin-ros2dds/src/route_publisher.rs | 10 ++ zenoh-plugin-ros2dds/src/route_service_cli.rs | 19 +++ zenoh-plugin-ros2dds/src/route_service_srv.rs | 19 +++ zenoh-plugin-ros2dds/src/route_subscriber.rs | 10 ++ zenoh-plugin-ros2dds/src/routes_mgr.rs | 136 ------------------ 6 files changed, 58 insertions(+), 162 deletions(-) diff --git a/zenoh-plugin-ros2dds/src/ros_discovery.rs b/zenoh-plugin-ros2dds/src/ros_discovery.rs index 1c7a4c3..e3963bb 100644 --- a/zenoh-plugin-ros2dds/src/ros_discovery.rs +++ b/zenoh-plugin-ros2dds/src/ros_discovery.rs @@ -235,19 +235,6 @@ impl RosDiscoveryInfoMgr { *has_changed = true; } - pub fn remove_dds_writers(&self, gids: Vec) { - let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state); - let writer_gid_seq = &mut info - .node_entities_info_seq - .get_mut(&self.node_fullname) - .unwrap() - .writer_gid_seq; - for gid in gids { - writer_gid_seq.remove(&gid); - } - *has_changed = true; - } - pub fn add_dds_reader(&self, gid: Gid) { let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state); info.node_entities_info_seq @@ -281,19 +268,6 @@ impl RosDiscoveryInfoMgr { *has_changed = true; } - pub fn remove_dds_readers(&self, gids: Vec) { - let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state); - let reader_gid_seq = &mut info - .node_entities_info_seq - .get_mut(&self.node_fullname) - .unwrap() - .reader_gid_seq; - for gid in gids { - reader_gid_seq.remove(&gid); - } - *has_changed = true; - } - pub fn read(&self) -> Vec { unsafe { let mut zp: *mut ddsi_serdata = std::ptr::null_mut(); diff --git a/zenoh-plugin-ros2dds/src/route_publisher.rs b/zenoh-plugin-ros2dds/src/route_publisher.rs index 68d71ef..4459c06 100644 --- a/zenoh-plugin-ros2dds/src/route_publisher.rs +++ b/zenoh-plugin-ros2dds/src/route_publisher.rs @@ -74,6 +74,12 @@ pub struct RoutePublisher<'a> { impl Drop for RoutePublisher<'_> { fn drop(&mut self) { + // remove reader's GID from ros_discovery_info message + match get_guid(&self.dds_reader) { + Ok(gid) => self.context.ros_discovery_mgr.remove_dds_reader(gid), + Err(e) => log::warn!("{self}: {e}"), + } + if let Err(e) = delete_dds_entity(self.dds_reader) { log::warn!("{}: error deleting DDS Reader: {}", self, e); } @@ -141,6 +147,10 @@ impl RoutePublisher<'_> { read_period, congestion_ctrl, )?; + // add reader's GID in ros_discovery_info message + context + .ros_discovery_mgr + .add_dds_reader(get_guid(&dds_reader)?); Ok(RoutePublisher { ros2_name, diff --git a/zenoh-plugin-ros2dds/src/route_service_cli.rs b/zenoh-plugin-ros2dds/src/route_service_cli.rs index 4a71c95..5b72587 100644 --- a/zenoh-plugin-ros2dds/src/route_service_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_service_cli.rs @@ -69,6 +69,17 @@ pub struct RouteServiceCli<'a> { impl Drop for RouteServiceCli<'_> { fn drop(&mut self) { + // remove reader's GID from ros_discovery_info message + match get_guid(&self.req_reader) { + Ok(gid) => self.context.ros_discovery_mgr.remove_dds_reader(gid), + Err(e) => log::warn!("{self}: {e}"), + } + // remove writer's GID from ros_discovery_info message + match get_guid(&self.rep_writer) { + Ok(gid) => self.context.ros_discovery_mgr.remove_dds_writer(gid), + Err(e) => log::warn!("{self}: {e}"), + } + if let Err(e) = delete_dds_entity(self.req_reader) { log::warn!("{}: error deleting DDS Reader: {}", self, e); } @@ -132,6 +143,10 @@ impl RouteServiceCli<'_> { true, qos.clone(), )?; + // add writer's GID in ros_discovery_info message + context + .ros_discovery_mgr + .add_dds_writer(get_guid(&rep_writer)?); let route_id: String = format!("Route Service Client (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr})",); @@ -153,6 +168,10 @@ impl RouteServiceCli<'_> { do_route_request(&route_id, sample, &zenoh_key_expr2, &zsession2, rep_writer); }, )?; + // add reader's GID in ros_discovery_info message + context + .ros_discovery_mgr + .add_dds_reader(get_guid(&req_reader)?); Ok(RouteServiceCli { ros2_name, diff --git a/zenoh-plugin-ros2dds/src/route_service_srv.rs b/zenoh-plugin-ros2dds/src/route_service_srv.rs index 8429306..94f361f 100644 --- a/zenoh-plugin-ros2dds/src/route_service_srv.rs +++ b/zenoh-plugin-ros2dds/src/route_service_srv.rs @@ -84,6 +84,17 @@ pub struct RouteServiceSrv<'a> { impl Drop for RouteServiceSrv<'_> { fn drop(&mut self) { + // remove writer's GID from ros_discovery_info message + match get_guid(&self.req_writer) { + Ok(gid) => self.context.ros_discovery_mgr.remove_dds_writer(gid), + Err(e) => log::warn!("{self}: {e}"), + } + // remove reader's GID from ros_discovery_info message + match get_guid(&self.rep_reader) { + Ok(gid) => self.context.ros_discovery_mgr.remove_dds_reader(gid), + Err(e) => log::warn!("{self}: {e}"), + } + if let Err(e) = delete_dds_entity(self.req_writer) { log::warn!("{}: error deleting DDS Writer: {}", self, e); } @@ -147,6 +158,10 @@ impl RouteServiceSrv<'_> { true, qos.clone(), )?; + // add writer's GID in ros_discovery_info message + context + .ros_discovery_mgr + .add_dds_writer(get_guid(&req_writer)?); // client_guid used in requests; use dds_instance_handle of writer as rmw_cyclonedds here: // https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/rmw_node.cpp#L4848 @@ -179,6 +194,10 @@ impl RouteServiceSrv<'_> { ); }, )?; + // add reader's GID in ros_discovery_info message + context + .ros_discovery_mgr + .add_dds_reader(get_guid(&rep_reader)?); Ok(RouteServiceSrv { ros2_name, diff --git a/zenoh-plugin-ros2dds/src/route_subscriber.rs b/zenoh-plugin-ros2dds/src/route_subscriber.rs index dd29d0c..3feb753 100644 --- a/zenoh-plugin-ros2dds/src/route_subscriber.rs +++ b/zenoh-plugin-ros2dds/src/route_subscriber.rs @@ -78,6 +78,12 @@ pub struct RouteSubscriber<'a> { impl Drop for RouteSubscriber<'_> { fn drop(&mut self) { + // remove writer's GID from ros_discovery_info message + match get_guid(&self.dds_writer) { + Ok(gid) => self.context.ros_discovery_mgr.remove_dds_writer(gid), + Err(e) => log::warn!("{self}: {e}"), + } + if let Err(e) = delete_dds_entity(self.dds_writer) { log::warn!("{}: error deleting DDS Reader: {}", self, e); } @@ -117,6 +123,10 @@ impl RouteSubscriber<'_> { keyless, writer_qos, )?; + // add writer's GID in ros_discovery_info message + context + .ros_discovery_mgr + .add_dds_writer(get_guid(&dds_writer)?); Ok(RouteSubscriber { ros2_name, diff --git a/zenoh-plugin-ros2dds/src/routes_mgr.rs b/zenoh-plugin-ros2dds/src/routes_mgr.rs index 283bae8..873efdd 100644 --- a/zenoh-plugin-ros2dds/src/routes_mgr.rs +++ b/zenoh-plugin-ros2dds/src/routes_mgr.rs @@ -168,12 +168,6 @@ impl<'a> RoutesMgr<'a> { self.admin_space .remove(&(*KE_PREFIX_ROUTE_PUBLISHER / iface.name_as_keyexpr())); let route = entry.remove(); - // remove reader's GID in ros_discovery_msg - self.context.ros_discovery_mgr.remove_dds_reader( - route.dds_reader_guid().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); log::info!("{route} removed"); } } @@ -214,12 +208,6 @@ impl<'a> RoutesMgr<'a> { self.admin_space .remove(&(*KE_PREFIX_ROUTE_SUBSCRIBER / iface.name_as_keyexpr())); let route = entry.remove(); - // remove writer's GID in ros_discovery_msg - self.context.ros_discovery_mgr.remove_dds_writer( - route.dds_writer_guid().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); log::info!("{route} removed"); } } @@ -241,17 +229,6 @@ impl<'a> RoutesMgr<'a> { self.admin_space .remove(&(*KE_PREFIX_ROUTE_SERVICE_SRV / iface.name_as_keyexpr())); let route = entry.remove(); - // remove reader's and writer's GID in ros_discovery_msg - self.context.ros_discovery_mgr.remove_dds_reader( - route.dds_rep_reader_guid().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); - self.context.ros_discovery_mgr.remove_dds_writer( - route.dds_req_writer_guid().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); log::info!("{route} removed"); } } @@ -273,17 +250,6 @@ impl<'a> RoutesMgr<'a> { self.admin_space .remove(&(*KE_PREFIX_ROUTE_SERVICE_CLI / iface.name_as_keyexpr())); let route = entry.remove(); - // remove reader's and writer's GID in ros_discovery_msg - self.context.ros_discovery_mgr.remove_dds_reader( - route.dds_req_reader_guid().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); - self.context.ros_discovery_mgr.remove_dds_writer( - route.dds_rep_writer_guid().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); log::info!("{route} removed"); } } @@ -304,17 +270,6 @@ impl<'a> RoutesMgr<'a> { self.admin_space .remove(&(*KE_PREFIX_ROUTE_ACTION_SRV / iface.name_as_keyexpr())); let route = entry.remove(); - // remove reader's and writer's GID in ros_discovery_msg - self.context.ros_discovery_mgr.remove_dds_readers( - route.dds_readers_guids().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); - self.context.ros_discovery_mgr.remove_dds_writers( - route.dds_writers_guids().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); log::info!("{route} removed"); } } @@ -335,17 +290,6 @@ impl<'a> RoutesMgr<'a> { self.admin_space .remove(&(*KE_PREFIX_ROUTE_ACTION_CLI / iface.name_as_keyexpr())); let route = entry.remove(); - // remove reader's and writer's GID in ros_discovery_msg - self.context.ros_discovery_mgr.remove_dds_readers( - route.dds_readers_guids().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); - self.context.ros_discovery_mgr.remove_dds_writers( - route.dds_writers_guids().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); log::info!("{route} removed"); } } @@ -394,12 +338,6 @@ impl<'a> RoutesMgr<'a> { self.admin_space .remove(&(*KE_PREFIX_ROUTE_SUBSCRIBER / &zenoh_key_expr)); let route = entry.remove(); - // remove writer's GID in ros_discovery_msg - self.context.ros_discovery_mgr.remove_dds_writer( - route.dds_writer_guid().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); log::info!("{route} removed"); } } @@ -439,12 +377,6 @@ impl<'a> RoutesMgr<'a> { self.admin_space .remove(&(*KE_PREFIX_ROUTE_PUBLISHER / &zenoh_key_expr)); let route = entry.remove(); - // remove reader's GID in ros_discovery_msg - self.context.ros_discovery_mgr.remove_dds_reader( - route.dds_reader_guid().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); log::info!("{route} removed"); } } @@ -476,17 +408,6 @@ impl<'a> RoutesMgr<'a> { self.admin_space .remove(&(*KE_PREFIX_ROUTE_SERVICE_CLI / &zenoh_key_expr)); let route = entry.remove(); - // remove reader's and writer's GID in ros_discovery_msg - self.context.ros_discovery_mgr.remove_dds_reader( - route.dds_req_reader_guid().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); - self.context.ros_discovery_mgr.remove_dds_writer( - route.dds_rep_writer_guid().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); log::info!("{route} removed"); } } @@ -518,17 +439,6 @@ impl<'a> RoutesMgr<'a> { self.admin_space .remove(&(*KE_PREFIX_ROUTE_SERVICE_SRV / &zenoh_key_expr)); let route = entry.remove(); - // remove reader's and writer's GID in ros_discovery_msg - self.context.ros_discovery_mgr.remove_dds_reader( - route.dds_rep_reader_guid().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); - self.context.ros_discovery_mgr.remove_dds_writer( - route.dds_req_writer_guid().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); log::info!("{route} removed"); } } @@ -560,17 +470,6 @@ impl<'a> RoutesMgr<'a> { self.admin_space .remove(&(*KE_PREFIX_ROUTE_SERVICE_CLI / &zenoh_key_expr)); let route = entry.remove(); - // remove reader's and writer's GID in ros_discovery_msg - self.context.ros_discovery_mgr.remove_dds_readers( - route.dds_readers_guids().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); - self.context.ros_discovery_mgr.remove_dds_writers( - route.dds_writers_guids().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); log::info!("{route} removed"); } } @@ -602,17 +501,6 @@ impl<'a> RoutesMgr<'a> { self.admin_space .remove(&(*KE_PREFIX_ROUTE_SERVICE_SRV / &zenoh_key_expr)); let route = entry.remove(); - // remove reader's and writer's GID in ros_discovery_msg - self.context.ros_discovery_mgr.remove_dds_readers( - route.dds_readers_guids().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); - self.context.ros_discovery_mgr.remove_dds_writers( - route.dds_writers_guids().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); log::info!("{route} removed"); } } @@ -739,18 +627,6 @@ impl<'a> RoutesMgr<'a> { .await?; log::info!("{route} created"); - // insert reader's and writer's GID in ros_discovery_msg - self.context.ros_discovery_mgr.add_dds_reader( - route.dds_rep_reader_guid().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); - self.context.ros_discovery_mgr.add_dds_writer( - route.dds_req_writer_guid().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); - if admin_space_ref { // insert reference in admin_space let admin_ke = *KE_PREFIX_ROUTE_SERVICE_SRV / zenoh_key_expr; @@ -785,18 +661,6 @@ impl<'a> RoutesMgr<'a> { .await?; log::info!("{route} created"); - // insert reader's and writer's GID in ros_discovery_msg - self.context.ros_discovery_mgr.add_dds_reader( - route.dds_req_reader_guid().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); - self.context.ros_discovery_mgr.add_dds_writer( - route.dds_rep_writer_guid().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?, - ); - if admin_space_ref { // insert reference in admin_space let admin_ke = *KE_PREFIX_ROUTE_SERVICE_CLI / zenoh_key_expr; From 55706ece7a9465ff660267df7788b6b071f97513 Mon Sep 17 00:00:00 2001 From: Julien Enoch Date: Fri, 6 Oct 2023 17:41:35 +0200 Subject: [PATCH 3/3] RoutePublisher: create Reader only on remote announcement --- zenoh-plugin-ros2dds/src/dds_discovery.rs | 156 +------- zenoh-plugin-ros2dds/src/dds_utils.rs | 17 +- zenoh-plugin-ros2dds/src/liveliness_mgt.rs | 1 - zenoh-plugin-ros2dds/src/ros_discovery.rs | 26 -- zenoh-plugin-ros2dds/src/route_action_cli.rs | 22 +- zenoh-plugin-ros2dds/src/route_action_srv.rs | 24 +- zenoh-plugin-ros2dds/src/route_publisher.rs | 335 ++++++++++-------- zenoh-plugin-ros2dds/src/route_service_cli.rs | 9 - zenoh-plugin-ros2dds/src/route_service_srv.rs | 9 - zenoh-plugin-ros2dds/src/route_subscriber.rs | 6 - zenoh-plugin-ros2dds/src/routes_mgr.rs | 38 -- 11 files changed, 196 insertions(+), 447 deletions(-) diff --git a/zenoh-plugin-ros2dds/src/dds_discovery.rs b/zenoh-plugin-ros2dds/src/dds_discovery.rs index 0e1f265..305c65d 100644 --- a/zenoh-plugin-ros2dds/src/dds_discovery.rs +++ b/zenoh-plugin-ros2dds/src/dds_discovery.rs @@ -11,8 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // -use async_std::task; -use cyclors::qos::{History, HistoryKind, Qos}; +use cyclors::qos::Qos; use cyclors::*; use flume::Sender; use serde::{Deserialize, Serialize}; @@ -21,14 +20,8 @@ use std::fmt; use std::mem::MaybeUninit; use std::os::raw; use std::sync::Arc; -use std::time::Duration; -use zenoh::prelude::*; -use zenoh::publication::CongestionControl; -use zenoh::Session; -use zenoh_core::SyncResolve; -use crate::dds_types::{DDSRawSample, TypeInfo}; -use crate::dds_utils::create_topic; +use crate::dds_types::TypeInfo; use crate::gid::Gid; const MAX_SAMPLES: usize = 32; @@ -295,148 +288,3 @@ pub fn run_discovery(dp: dds_entity_t, tx: Sender) { ); } } - -unsafe extern "C" fn data_forwarder_listener(dr: dds_entity_t, arg: *mut std::os::raw::c_void) { - let pa = arg as *mut (String, KeyExpr, Arc, CongestionControl); - let mut zp: *mut ddsi_serdata = std::ptr::null_mut(); - #[allow(clippy::uninit_assumed_init)] - let mut si = MaybeUninit::<[dds_sample_info_t; 1]>::uninit(); - while dds_takecdr( - dr, - &mut zp, - 1, - si.as_mut_ptr() as *mut dds_sample_info_t, - DDS_ANY_STATE, - ) > 0 - { - let si = si.assume_init(); - if si[0].valid_data { - let raw_sample = DDSRawSample::create(zp); - - if *crate::LOG_PAYLOAD { - log::trace!( - "Route Publisher (DDS:{} -> Zenoh:{}) - routing data - payload: {:02x?}", - &(*pa).0, - &(*pa).1, - raw_sample - ); - } else { - log::trace!( - "Route Publisher (DDS:{} -> Zenoh:{}) - routing data - {} bytes", - &(*pa).0, - &(*pa).1, - raw_sample.len() - ); - } - let _ = (*pa) - .2 - .put(&(*pa).1, &raw_sample) - .congestion_control((*pa).3) - .res_sync(); - } - ddsi_serdata_unref(zp); - } -} - -#[allow(clippy::too_many_arguments)] -pub fn create_forwarding_dds_reader( - dp: dds_entity_t, - topic_name: String, - type_name: String, - type_info: &Option>, - keyless: bool, - mut qos: Qos, - z_key: KeyExpr, - z: Arc, - read_period: Option, - congestion_ctrl: CongestionControl, -) -> Result { - unsafe { - let t = create_topic(dp, &topic_name, &type_name, type_info, keyless); - - match read_period { - None => { - // Use a Listener to route data as soon as it arrives - let arg = Box::new((topic_name, z_key, z, congestion_ctrl)); - let sub_listener = - dds_create_listener(Box::into_raw(arg) as *mut std::os::raw::c_void); - dds_lset_data_available(sub_listener, Some(data_forwarder_listener)); - let qos_native = qos.to_qos_native(); - let reader = dds_create_reader(dp, t, qos_native, sub_listener); - Qos::delete_qos_native(qos_native); - if reader >= 0 { - let res = dds_reader_wait_for_historical_data(reader, qos::DDS_100MS_DURATION); - if res < 0 { - log::error!( - "Error calling dds_reader_wait_for_historical_data(): {}", - CStr::from_ptr(dds_strretcode(-res)) - .to_str() - .unwrap_or("unrecoverable DDS retcode") - ); - } - Ok(reader) - } else { - Err(format!( - "Error creating DDS Reader: {}", - CStr::from_ptr(dds_strretcode(-reader)) - .to_str() - .unwrap_or("unrecoverable DDS retcode") - )) - } - } - Some(period) => { - // Use a periodic task that takes data to route from a Reader with KEEP_LAST 1 - qos.history = Some(History { - kind: HistoryKind::KEEP_LAST, - depth: 1, - }); - let qos_native = qos.to_qos_native(); - let reader = dds_create_reader(dp, t, qos_native, std::ptr::null()); - let z_key = z_key.into_owned(); - task::spawn(async move { - // loop while reader's instance handle remain the same - // (if reader was deleted, its dds_entity_t value might have been - // reused by a new entity... don't trust it! Only trust instance handle) - let mut original_handle: dds_instance_handle_t = 0; - dds_get_instance_handle(reader, &mut original_handle); - let mut handle: dds_instance_handle_t = 0; - while dds_get_instance_handle(reader, &mut handle) == DDS_RETCODE_OK as i32 { - if handle != original_handle { - break; - } - - async_std::task::sleep(period).await; - let mut zp: *mut ddsi_serdata = std::ptr::null_mut(); - #[allow(clippy::uninit_assumed_init)] - let mut si = MaybeUninit::<[dds_sample_info_t; 1]>::uninit(); - while dds_takecdr( - reader, - &mut zp, - 1, - si.as_mut_ptr() as *mut dds_sample_info_t, - DDS_ANY_STATE, - ) > 0 - { - let si = si.assume_init(); - if si[0].valid_data { - log::trace!( - "Route (periodic) data to zenoh resource with rid={}", - z_key - ); - - let raw_sample = DDSRawSample::create(zp); - - let _ = z - .put(&z_key, &raw_sample) - .congestion_control(congestion_ctrl) - .res_sync(); - } - ddsi_serdata_unref(zp); - } - } - }); - Ok(reader) - } - } - } -} diff --git a/zenoh-plugin-ros2dds/src/dds_utils.rs b/zenoh-plugin-ros2dds/src/dds_utils.rs index 1670255..c9655bb 100644 --- a/zenoh-plugin-ros2dds/src/dds_utils.rs +++ b/zenoh-plugin-ros2dds/src/dds_utils.rs @@ -32,10 +32,10 @@ use crate::{ vec_into_raw_parts, }; -const DDS_ENTITY_NULL: dds_entity_t = 0; +pub const DDS_ENTITY_NULL: dds_entity_t = 0; // An atomic dds_entity_t (=i32), for safe concurrent creation/deletion of DDS entities -type AtomicDDSEntity = AtomicI32; +pub type AtomicDDSEntity = AtomicI32; pub fn delete_dds_entity(entity: dds_entity_t) -> Result<(), String> { unsafe { @@ -47,15 +47,6 @@ pub fn delete_dds_entity(entity: dds_entity_t) -> Result<(), String> { } } -pub(crate) fn delete_atomic_dds_entity(entity: &mut AtomicDDSEntity) -> Result<(), String> { - let dds_entity = entity.swap(DDS_ENTITY_NULL, std::sync::atomic::Ordering::Relaxed); - if dds_entity != DDS_ENTITY_NULL { - delete_dds_entity(dds_entity) - } else { - Ok(()) - } -} - pub fn get_guid(entity: &dds_entity_t) -> Result { unsafe { let mut guid = dds_guid_t { v: [0; 16] }; @@ -82,6 +73,10 @@ pub fn serialize_atomic_entity_guid(entity: &AtomicDDSEntity, s: S) -> Result where S: Serializer, { + println!( + "--- serialize_atomic_entity_guid: {}", + entity.load(std::sync::atomic::Ordering::Relaxed) + ); match entity.load(std::sync::atomic::Ordering::Relaxed) { DDS_ENTITY_NULL => s.serialize_str(""), entity => serialize_entity_guid(&entity, s), diff --git a/zenoh-plugin-ros2dds/src/liveliness_mgt.rs b/zenoh-plugin-ros2dds/src/liveliness_mgt.rs index 63e10e8..053145e 100644 --- a/zenoh-plugin-ros2dds/src/liveliness_mgt.rs +++ b/zenoh-plugin-ros2dds/src/liveliness_mgt.rs @@ -184,7 +184,6 @@ pub(crate) fn parse_ke_liveliness_service_cli( Ok((plugin_id, zenoh_key_expr, ros2_type.to_string())) } -///////// pub(crate) fn new_ke_liveliness_action_srv( plugin_id: &keyexpr, zenoh_key_expr: &keyexpr, diff --git a/zenoh-plugin-ros2dds/src/ros_discovery.rs b/zenoh-plugin-ros2dds/src/ros_discovery.rs index e3963bb..aab0214 100644 --- a/zenoh-plugin-ros2dds/src/ros_discovery.rs +++ b/zenoh-plugin-ros2dds/src/ros_discovery.rs @@ -212,19 +212,6 @@ impl RosDiscoveryInfoMgr { *has_changed = true; } - pub fn add_dds_writers(&self, gids: Vec) { - let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state); - let writer_gid_seq = &mut info - .node_entities_info_seq - .get_mut(&self.node_fullname) - .unwrap() - .writer_gid_seq; - for gid in gids { - writer_gid_seq.insert(gid); - } - *has_changed = true; - } - pub fn remove_dds_writer(&self, gid: Gid) { let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state); info.node_entities_info_seq @@ -245,19 +232,6 @@ impl RosDiscoveryInfoMgr { *has_changed = true; } - pub fn add_dds_readers(&self, gids: Vec) { - let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state); - let reader_gid_seq = &mut info - .node_entities_info_seq - .get_mut(&self.node_fullname) - .unwrap() - .reader_gid_seq; - for gid in gids { - reader_gid_seq.insert(gid); - } - *has_changed = true; - } - pub fn remove_dds_reader(&self, gid: Gid) { let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state); info.node_entities_info_seq diff --git a/zenoh-plugin-ros2dds/src/route_action_cli.rs b/zenoh-plugin-ros2dds/src/route_action_cli.rs index 665115d..364fc05 100644 --- a/zenoh-plugin-ros2dds/src/route_action_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_action_cli.rs @@ -17,7 +17,7 @@ use zenoh::{liveliness::LivelinessToken, prelude::*}; use zenoh_core::AsyncResolve; use crate::{ - gid::Gid, liveliness_mgt::new_ke_liveliness_action_cli, ros2_utils::*, + liveliness_mgt::new_ke_liveliness_action_cli, ros2_utils::*, route_action_srv::serialize_action_zenoh_key_expr, route_service_cli::RouteServiceCli, route_subscriber::RouteSubscriber, routes_mgr::Context, }; @@ -171,26 +171,6 @@ impl RouteActionCli<'_> { self.liveliness_token = None; } - pub fn dds_writers_guids(&self) -> Result, String> { - Ok([ - self.route_send_goal.dds_rep_writer_guid()?, - self.route_cancel_goal.dds_rep_writer_guid()?, - self.route_get_result.dds_rep_writer_guid()?, - self.route_feedback.dds_writer_guid()?, - self.route_status.dds_writer_guid()?, - ] - .into()) - } - - pub fn dds_readers_guids(&self) -> Result, String> { - Ok([ - self.route_send_goal.dds_req_reader_guid()?, - self.route_cancel_goal.dds_req_reader_guid()?, - self.route_get_result.dds_req_reader_guid()?, - ] - .into()) - } - #[inline] pub fn add_remote_route(&mut self, plugin_id: &str, zenoh_key_expr_prefix: &keyexpr) { self.route_send_goal.add_remote_route( diff --git a/zenoh-plugin-ros2dds/src/route_action_srv.rs b/zenoh-plugin-ros2dds/src/route_action_srv.rs index 98656f4..b72965a 100644 --- a/zenoh-plugin-ros2dds/src/route_action_srv.rs +++ b/zenoh-plugin-ros2dds/src/route_action_srv.rs @@ -17,8 +17,8 @@ use zenoh::{liveliness::LivelinessToken, prelude::*}; use zenoh_core::AsyncResolve; use crate::{ - gid::Gid, liveliness_mgt::new_ke_liveliness_action_srv, ros2_utils::*, - route_publisher::RoutePublisher, route_service_srv::RouteServiceSrv, routes_mgr::Context, + liveliness_mgt::new_ke_liveliness_action_srv, ros2_utils::*, route_publisher::RoutePublisher, + route_service_srv::RouteServiceSrv, routes_mgr::Context, }; #[derive(Serialize)] @@ -172,26 +172,6 @@ impl RouteActionSrv<'_> { self.liveliness_token = None; } - pub fn dds_writers_guids(&self) -> Result, String> { - Ok([ - self.route_send_goal.dds_req_writer_guid()?, - self.route_cancel_goal.dds_req_writer_guid()?, - self.route_get_result.dds_req_writer_guid()?, - ] - .into()) - } - - pub fn dds_readers_guids(&self) -> Result, String> { - Ok([ - self.route_send_goal.dds_rep_reader_guid()?, - self.route_cancel_goal.dds_rep_reader_guid()?, - self.route_get_result.dds_rep_reader_guid()?, - self.route_feedback.dds_reader_guid()?, - self.route_status.dds_reader_guid()?, - ] - .into()) - } - #[inline] pub fn add_remote_route(&mut self, plugin_id: &str, zenoh_key_expr_prefix: &keyexpr) { self.route_send_goal.add_remote_route( diff --git a/zenoh-plugin-ros2dds/src/route_publisher.rs b/zenoh-plugin-ros2dds/src/route_publisher.rs index 4459c06..b3624d7 100644 --- a/zenoh-plugin-ros2dds/src/route_publisher.rs +++ b/zenoh-plugin-ros2dds/src/route_publisher.rs @@ -13,29 +13,43 @@ // use cyclors::qos::{HistoryKind, Qos}; -use cyclors::{dds_entity_t, DDS_LENGTH_UNLIMITED}; -use serde::Serialize; +use cyclors::DDS_LENGTH_UNLIMITED; +use serde::{Serialize, Serializer}; +use std::ops::Deref; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use std::{collections::HashSet, fmt}; use zenoh::liveliness::LivelinessToken; use zenoh::prelude::r#async::AsyncResolve; use zenoh::prelude::*; +use zenoh::publication::Publisher; +use zenoh_core::SyncResolve; use zenoh_ext::{PublicationCache, SessionExt}; -use crate::dds_discovery::create_forwarding_dds_reader; -use crate::dds_types::TypeInfo; -use crate::dds_utils::{delete_dds_entity, get_guid, serialize_entity_guid}; -use crate::gid::Gid; +use crate::dds_types::{DDSRawSample, TypeInfo}; +use crate::dds_utils::{ + create_dds_reader, delete_dds_entity, get_guid, serialize_atomic_entity_guid, AtomicDDSEntity, + DDS_ENTITY_NULL, +}; use crate::liveliness_mgt::new_ke_liveliness_pub; use crate::ros2_utils::{is_message_for_action, ros2_message_type_to_dds_type}; use crate::routes_mgr::Context; use crate::{qos_helpers::*, Config}; -use crate::{serialize_option_as_bool, KE_PREFIX_PUB_CACHE}; +use crate::{KE_PREFIX_PUB_CACHE, LOG_PAYLOAD}; -enum ZPublisher<'a> { - Publisher(KeyExpr<'a>), - PublicationCache(PublicationCache<'a>), +pub struct ZPublisher<'a> { + publisher: Publisher<'static>, + _cache: Option>, + cache_size: usize, +} + +impl<'a> Deref for ZPublisher<'a> { + type Target = Publisher<'static>; + + fn deref(&self) -> &Self::Target { + &self.publisher + } } // a route from DDS to Zenoh @@ -53,16 +67,25 @@ pub struct RoutePublisher<'a> { context: Context<'a>, // the zenoh publisher used to re-publish to zenoh the data received by the DDS Reader // `None` when route is created on a remote announcement and no local ROS2 Subscriber discovered yet - #[serde(rename = "is_active", serialize_with = "serialize_option_as_bool")] - zenoh_publisher: Option>, + #[serde( + rename = "publication_cache_size", + serialize_with = "serialize_pub_cache" + )] + zenoh_publisher: ZPublisher<'a>, // the local DDS Reader created to serve the route (i.e. re-publish to zenoh data coming from DDS) - #[serde(serialize_with = "serialize_entity_guid")] - dds_reader: dds_entity_t, - // if the Reader is TRANSIENT_LOCAL - transient_local: bool, + #[serde(serialize_with = "serialize_atomic_entity_guid")] + dds_reader: AtomicDDSEntity, + // TypeInfo for Reader creation (if available) + #[serde(skip)] + type_info: Option>, // if the topic is keyless #[serde(skip)] keyless: bool, + // the QoS for the DDS Reader to be created. + // those are either the QoS announced by a remote bridge on a Reader discovery, + // either the QoS adapted from a local disovered Writer + #[serde(skip)] + reader_qos: Qos, // a liveliness token associated to this route, for announcement to other plugins #[serde(skip)] liveliness_token: Option>, @@ -74,15 +97,7 @@ pub struct RoutePublisher<'a> { impl Drop for RoutePublisher<'_> { fn drop(&mut self) { - // remove reader's GID from ros_discovery_info message - match get_guid(&self.dds_reader) { - Ok(gid) => self.context.ros_discovery_mgr.remove_dds_reader(gid), - Err(e) => log::warn!("{self}: {e}"), - } - - if let Err(e) = delete_dds_entity(self.dds_reader) { - log::warn!("{}: error deleting DDS Reader: {}", self, e); - } + self.deactivate_dds_reader(); } } @@ -107,90 +122,20 @@ impl RoutePublisher<'_> { reader_qos: Qos, context: &Context<'a>, ) -> Result, String> { - let transient_local = is_transient_local(&reader_qos); log::debug!( "Route Publisher ({ros2_name} -> {zenoh_key_expr}): creation with type {ros2_type}" ); - // declare the zenoh key expression (for wire optimization) - let declared_ke = context.zsession - .declare_keyexpr(zenoh_key_expr.clone()) - .res() - .await - .map_err(|e| { - format!("Route Publisher ({ros2_name} -> {zenoh_key_expr}): failed to declare KeyExpr: {e}") - })?; - - // CongestionControl to be used when re-publishing over zenoh: Blocking if Writer is RELIABLE (since we don't know what is remote Reader's QoS) - let congestion_ctrl = match ( - context.config.reliable_routes_blocking, - is_reliable(&reader_qos), - ) { - (true, true) => CongestionControl::Block, - _ => CongestionControl::Drop, - }; - - let topic_name = format!("rt{ros2_name}"); - let type_name = ros2_message_type_to_dds_type(&ros2_type); - let read_period = get_read_period(&context.config, &zenoh_key_expr); - - // create matching DDS Reader that forwards data coming from DDS to Zenoh - let dds_reader = create_forwarding_dds_reader( - context.participant, - topic_name, - type_name, - type_info, - keyless, - reader_qos.clone(), - declared_ke.clone(), - context.zsession.clone(), - read_period, - congestion_ctrl, - )?; - // add reader's GID in ros_discovery_info message - context - .ros_discovery_mgr - .add_dds_reader(get_guid(&dds_reader)?); - - Ok(RoutePublisher { - ros2_name, - ros2_type, - dds_reader, - zenoh_key_expr, - context: context.clone(), - zenoh_publisher: None, - transient_local, - keyless, - liveliness_token: None, - remote_routes: HashSet::new(), - local_nodes: HashSet::new(), - }) - } - - async fn activate<'a>(&'a mut self, discovered_writer_qos: &Qos) -> Result<(), String> { - // For lifetime issue, redeclare the zenoh key expression that can't be stored in Self - let declared_ke = self - .context - .zsession - .declare_keyexpr(self.zenoh_key_expr.clone()) - .res() - .await - .map_err(|e| { - format!( - "Route Publisher (ROS:{} -> Zenoh:{}): failed to declare KeyExpr: {e}", - self.ros2_name, self.zenoh_key_expr - ) - })?; - // create the zenoh Publisher - // if Reader is TRANSIENT_LOCAL, use a PublicationCache to store historical data - self.zenoh_publisher = if self.transient_local { + // if Reader shall be TRANSIENT_LOCAL, use a PublicationCache to store historical data + let transient_local = is_transient_local(&reader_qos); + let (cache, cache_size) = if transient_local { #[allow(non_upper_case_globals)] - let history_qos = get_history_or_default(discovered_writer_qos); - let durability_service_qos = get_durability_service_or_default(discovered_writer_qos); + let history_qos = get_history_or_default(&reader_qos); + let durability_service_qos = get_durability_service_or_default(&reader_qos); let mut history = match (history_qos.kind, history_qos.depth) { (HistoryKind::KEEP_LAST, n) => { - if self.keyless { + if keyless { // only 1 instance => history=n n as usize } else if durability_service_qos.max_instances == DDS_LENGTH_UNLIMITED { @@ -208,46 +153,117 @@ impl RoutePublisher<'_> { (HistoryKind::KEEP_ALL, _) => usize::MAX, }; // In case there are several Writers served by this route, increase the cache size - history = history.saturating_mul(self.context.config.transient_local_cache_multiplier); + history = history.saturating_mul(context.config.transient_local_cache_multiplier); log::debug!( - "{self}: caching TRANSIENT_LOCAL publications via a PublicationCache with history={history} (computed from Reader's QoS: history=({:?},{}), durability_service.max_instances={})", + "Route Publisher ({ros2_name} -> {zenoh_key_expr}): caching TRANSIENT_LOCAL publications via a PublicationCache with history={history} (computed from Reader's QoS: history=({:?},{}), durability_service.max_instances={})", history_qos.kind, history_qos.depth, durability_service_qos.max_instances ); - let pub_cache = self - .context - .zsession - .declare_publication_cache(&declared_ke) - .history(history) - .queryable_prefix(*KE_PREFIX_PUB_CACHE / &self.context.plugin_id) - .queryable_allowed_origin(Locality::Remote) // Note: don't reply to queries from local QueryingSubscribers - .res() - .await - .map_err(|e| { - format!( - "Failed create PublicationCache for key {} (rid={}): {e}", - self.zenoh_key_expr, declared_ke - ) - })?; - Some(ZPublisher::PublicationCache(pub_cache)) + ( + Some( + context + .zsession + .declare_publication_cache(&zenoh_key_expr) + .history(history) + .queryable_prefix(*KE_PREFIX_PUB_CACHE / &context.plugin_id) + .queryable_allowed_origin(Locality::Remote) // Note: don't reply to queries from local QueryingSubscribers + .res_async() + .await + .map_err(|e| { + format!("Failed create PublicationCache for key {zenoh_key_expr}: {e}",) + })?, + ), + history, + ) } else { - if let Err(e) = self - .context - .zsession - .declare_publisher(declared_ke.clone()) - .res() - .await - { - log::warn!( - "Failed to declare publisher for key {} (rid={}): {}", - self.zenoh_key_expr, - declared_ke, - e + (None, 0) + }; + + // CongestionControl to be used when re-publishing over zenoh: Blocking if Writer is RELIABLE (since we don't know what is remote Reader's QoS) + let congestion_ctrl = match ( + context.config.reliable_routes_blocking, + is_reliable(&reader_qos), + ) { + (true, true) => CongestionControl::Block, + _ => CongestionControl::Drop, + }; + + let publisher: Publisher<'static> = context + .zsession + .declare_publisher(zenoh_key_expr.clone()) + .congestion_control(congestion_ctrl) + .res_async() + .await + .map_err(|e| format!("Failed create Publisher for key {zenoh_key_expr}: {e}",))?; + + Ok(RoutePublisher { + ros2_name, + ros2_type, + zenoh_key_expr, + context: context.clone(), + zenoh_publisher: ZPublisher { + publisher, + _cache: cache, + cache_size, + }, + dds_reader: DDS_ENTITY_NULL.into(), + type_info: type_info.clone(), + reader_qos, + keyless, + liveliness_token: None, + remote_routes: HashSet::new(), + local_nodes: HashSet::new(), + }) + } + + fn activate_dds_reader(&mut self) -> Result<(), String> { + let topic_name = format!("rt{}", self.ros2_name); + let type_name = ros2_message_type_to_dds_type(&self.ros2_type); + let read_period = get_read_period(&self.context.config, &self.zenoh_key_expr); + let route_id = self.to_string(); + let publisher = self.zenoh_publisher.deref().clone(); + + // create matching DDS Reader that forwards data coming from DDS to Zenoh + let dds_reader = create_dds_reader( + self.context.participant, + topic_name, + type_name, + &self.type_info, + self.keyless, + self.reader_qos.clone(), + read_period, + move |sample: &DDSRawSample| { + do_route_message( + sample, &publisher, // &ke, + &route_id, ); + }, + )?; + self.dds_reader.swap(dds_reader, Ordering::Relaxed); + + // add reader's GID in ros_discovery_info message + self.context + .ros_discovery_mgr + .add_dds_reader(get_guid(&dds_reader)?); + + Ok(()) + } + + fn deactivate_dds_reader(&mut self) { + let dds_reader = self.dds_reader.swap(DDS_ENTITY_NULL, Ordering::Relaxed); + if dds_reader != DDS_ENTITY_NULL { + // remove reader's GID from ros_discovery_info message + match get_guid(&dds_reader) { + Ok(gid) => self.context.ros_discovery_mgr.remove_dds_reader(gid), + Err(e) => log::warn!("{self}: {e}"), } - Some(ZPublisher::Publisher(declared_ke.clone())) - }; + if let Err(e) = delete_dds_entity(dds_reader) { + log::warn!("{}: error deleting DDS Reader: {}", self, e); + } + } + } - // if not for an Action (since actions declare their own liveliness) + async fn announce_route(&mut self, discovered_writer_qos: &Qos) -> Result<(), String> { + // only if not for an Action (since actions declare their own liveliness) if !is_message_for_action(&self.ros2_name) { // create associated LivelinessToken let liveliness_ke = new_ke_liveliness_pub( @@ -261,7 +277,7 @@ impl RoutePublisher<'_> { self.liveliness_token = Some(self.context.zsession .liveliness() .declare_token(liveliness_ke) - .res() + .res_async() .await .map_err(|e| { format!( @@ -273,24 +289,21 @@ impl RoutePublisher<'_> { Ok(()) } - fn deactivate(&mut self) { - log::debug!("{self} deactivate"); - // Drop Zenoh Publisher and Liveliness token - // The DDS Writer remains to be discovered by local ROS nodes - self.zenoh_publisher = None; + fn retire_route(&mut self) { self.liveliness_token = None; } - #[inline] - pub fn dds_reader_guid(&self) -> Result { - get_guid(&self.dds_reader) - } - #[inline] pub fn add_remote_route(&mut self, plugin_id: &str, zenoh_key_expr: &keyexpr) { self.remote_routes .insert(format!("{plugin_id}:{zenoh_key_expr}")); log::debug!("{self} now serving remote routes {:?}", self.remote_routes); + // if 1st remote route added, activate the DDS Reader + if self.remote_routes.len() == 1 { + if let Err(e) = self.activate_dds_reader() { + log::error!("{self} activation of DDS Reader failed: {e}"); + } + } } #[inline] @@ -298,6 +311,10 @@ impl RoutePublisher<'_> { self.remote_routes .remove(&format!("{plugin_id}:{zenoh_key_expr}")); log::debug!("{self} now serving remote routes {:?}", self.remote_routes); + // if last remote route removed, deactivate the DDS Reader + if self.remote_routes.is_empty() { + self.deactivate_dds_reader(); + } } #[inline] @@ -309,10 +326,10 @@ impl RoutePublisher<'_> { pub async fn add_local_node(&mut self, node: String, discovered_writer_qos: &Qos) { self.local_nodes.insert(node); log::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if 1st local node added, activate the route + // if 1st local node added, announce the route if self.local_nodes.len() == 1 { - if let Err(e) = self.activate(discovered_writer_qos).await { - log::error!("{self} activation failed: {e}"); + if let Err(e) = self.announce_route(discovered_writer_qos).await { + log::error!("{self} announcement failed: {e}"); } } } @@ -321,9 +338,9 @@ impl RoutePublisher<'_> { pub fn remove_local_node(&mut self, node: &str) { self.local_nodes.remove(node); log::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if last local node removed, deactivate the route + // if last local node removed, retire the route if self.local_nodes.is_empty() { - self.deactivate(); + self.retire_route(); } } @@ -338,6 +355,13 @@ impl RoutePublisher<'_> { } } +pub fn serialize_pub_cache(zpub: &ZPublisher, s: S) -> Result +where + S: Serializer, +{ + s.serialize_u64(zpub.cache_size as u64) +} + // Return the read period if keyexpr matches one of the "pub_max_frequencies" option fn get_read_period(config: &Config, ke: &keyexpr) -> Option { for (re, freq) in &config.pub_max_frequencies { @@ -347,3 +371,14 @@ fn get_read_period(config: &Config, ke: &keyexpr) -> Option { } None } + +fn do_route_message(sample: &DDSRawSample, publisher: &Publisher, route_id: &str) { + if *LOG_PAYLOAD { + log::trace!("{route_id}: routing message - payload: {:02x?}", sample); + } else { + log::trace!("{route_id}: routing message - {} bytes", sample.len()); + } + if let Err(e) = publisher.put(sample).res_sync() { + log::error!("{route_id}: failed to route message: {e}"); + } +} diff --git a/zenoh-plugin-ros2dds/src/route_service_cli.rs b/zenoh-plugin-ros2dds/src/route_service_cli.rs index 5b72587..514f4a2 100644 --- a/zenoh-plugin-ros2dds/src/route_service_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_service_cli.rs @@ -29,7 +29,6 @@ use crate::dds_utils::serialize_entity_guid; use crate::dds_utils::{ create_dds_reader, create_dds_writer, dds_write, delete_dds_entity, get_guid, }; -use crate::gid::Gid; use crate::liveliness_mgt::new_ke_liveliness_service_cli; use crate::ros2_utils::{ is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type, @@ -222,14 +221,6 @@ impl RouteServiceCli<'_> { self.liveliness_token = None; } - pub fn dds_rep_writer_guid(&self) -> Result { - get_guid(&self.rep_writer) - } - - pub fn dds_req_reader_guid(&self) -> Result { - get_guid(&self.req_reader) - } - #[inline] pub fn add_remote_route(&mut self, plugin_id: &str, zenoh_key_expr: &keyexpr) { self.remote_routes diff --git a/zenoh-plugin-ros2dds/src/route_service_srv.rs b/zenoh-plugin-ros2dds/src/route_service_srv.rs index 94f361f..2148ab1 100644 --- a/zenoh-plugin-ros2dds/src/route_service_srv.rs +++ b/zenoh-plugin-ros2dds/src/route_service_srv.rs @@ -33,7 +33,6 @@ use crate::dds_utils::{ create_dds_reader, create_dds_writer, dds_write, delete_dds_entity, get_guid, get_instance_handle, }; -use crate::gid::Gid; use crate::liveliness_mgt::new_ke_liveliness_service_srv; use crate::ros2_utils::{ is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type, @@ -295,14 +294,6 @@ impl RouteServiceSrv<'_> { self.liveliness_token = None; } - pub fn dds_req_writer_guid(&self) -> Result { - get_guid(&self.req_writer) - } - - pub fn dds_rep_reader_guid(&self) -> Result { - get_guid(&self.rep_reader) - } - #[inline] pub fn add_remote_route(&mut self, plugin_id: &str, zenoh_key_expr: &keyexpr) { self.remote_routes diff --git a/zenoh-plugin-ros2dds/src/route_subscriber.rs b/zenoh-plugin-ros2dds/src/route_subscriber.rs index 3feb753..dd24a8d 100644 --- a/zenoh-plugin-ros2dds/src/route_subscriber.rs +++ b/zenoh-plugin-ros2dds/src/route_subscriber.rs @@ -27,7 +27,6 @@ use zenoh::{prelude::r#async::AsyncResolve, subscriber::Subscriber}; use zenoh_ext::{FetchingSubscriber, SubscriberBuilderExt}; use crate::dds_utils::{create_dds_writer, delete_dds_entity, get_guid}; -use crate::gid::Gid; use crate::liveliness_mgt::new_ke_liveliness_sub; use crate::qos_helpers::is_transient_local; use crate::ros2_utils::{is_message_for_action, ros2_message_type_to_dds_type}; @@ -267,11 +266,6 @@ impl RouteSubscriber<'_> { } } - #[inline] - pub fn dds_writer_guid(&self) -> Result { - get_guid(&self.dds_writer) - } - #[inline] pub fn add_remote_route(&mut self, plugin_id: &str, zenoh_key_expr: &keyexpr) { self.remote_routes diff --git a/zenoh-plugin-ros2dds/src/routes_mgr.rs b/zenoh-plugin-ros2dds/src/routes_mgr.rs index 873efdd..44a9871 100644 --- a/zenoh-plugin-ros2dds/src/routes_mgr.rs +++ b/zenoh-plugin-ros2dds/src/routes_mgr.rs @@ -542,13 +542,6 @@ impl<'a> RoutesMgr<'a> { .await?; log::info!("{route} created"); - // insert reader's GID in ros_discovery_msg - self.context - .ros_discovery_mgr - .add_dds_reader(route.dds_reader_guid().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?); - if admin_space_ref { // insert reference in admin_space let admin_ke = *KE_PREFIX_ROUTE_PUBLISHER / zenoh_key_expr; @@ -586,13 +579,6 @@ impl<'a> RoutesMgr<'a> { .await?; log::info!("{route} created"); - // insert writer's GID in ros_discovery_msg - self.context - .ros_discovery_mgr - .add_dds_writer(route.dds_writer_guid().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?); - if admin_space_ref { // insert reference in admin_space let admin_ke = *KE_PREFIX_ROUTE_SUBSCRIBER / zenoh_key_expr; @@ -693,18 +679,6 @@ impl<'a> RoutesMgr<'a> { .await?; log::info!("{route} created"); - // insert readers' and writes' GID in ros_discovery_msg - self.context - .ros_discovery_mgr - .add_dds_readers(route.dds_readers_guids().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?); - self.context - .ros_discovery_mgr - .add_dds_writers(route.dds_writers_guids().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?); - // insert reference in admin_space let admin_ke = *KE_PREFIX_ROUTE_ACTION_SRV / zenoh_key_expr; self.admin_space @@ -735,18 +709,6 @@ impl<'a> RoutesMgr<'a> { .await?; log::info!("{route} created"); - // insert readers' and writes' GID in ros_discovery_msg - self.context - .ros_discovery_mgr - .add_dds_readers(route.dds_readers_guids().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?); - self.context - .ros_discovery_mgr - .add_dds_writers(route.dds_writers_guids().map_err(|e| { - format!("{route}: failed to update ros_discovery_info message: {e}") - })?); - // insert reference in admin_space let admin_ke = *KE_PREFIX_ROUTE_ACTION_CLI / zenoh_key_expr; self.admin_space