diff --git a/zenoh-plugin-ros2dds/src/discovered_entities.rs b/zenoh-plugin-ros2dds/src/discovered_entities.rs index e1f4adc..348ac98 100644 --- a/zenoh-plugin-ros2dds/src/discovered_entities.rs +++ b/zenoh-plugin-ros2dds/src/discovered_entities.rs @@ -436,7 +436,7 @@ impl DiscoveredEntities { ) { match self.get_entity_json_value(entity_ref) { Ok(Some(v)) => { - let admin_keyexpr = admin_keyexpr_prefix / &key_expr; + let admin_keyexpr = admin_keyexpr_prefix / key_expr; if let Err(e) = query .reply(Ok(Sample::new(admin_keyexpr, v))) .res_async() diff --git a/zenoh-plugin-ros2dds/src/gid.rs b/zenoh-plugin-ros2dds/src/gid.rs index 9978631..b0006f9 100644 --- a/zenoh-plugin-ros2dds/src/gid.rs +++ b/zenoh-plugin-ros2dds/src/gid.rs @@ -43,7 +43,7 @@ impl From<[u8; 16]> for Gid { impl From<&[u8; 16]> for Gid { fn from(key: &[u8; 16]) -> Self { - Self(key.clone()) + Self(*key) } } @@ -54,7 +54,7 @@ impl Serialize for Gid { { if serializer.is_human_readable() { // serialize as an hexadecimal String - Serialize::serialize(&hex::encode(&self.0), serializer) + Serialize::serialize(&hex::encode(self.0), serializer) } else { // serialize as a little-endian [u8; 16] Serialize::serialize(&self.0, serializer) diff --git a/zenoh-plugin-ros2dds/src/lib.rs b/zenoh-plugin-ros2dds/src/lib.rs index 04bcd3d..ae8f053 100644 --- a/zenoh-plugin-ros2dds/src/lib.rs +++ b/zenoh-plugin-ros2dds/src/lib.rs @@ -47,6 +47,8 @@ mod node_info; mod qos_helpers; mod ros2_utils; mod ros_discovery; +mod route_action_cli; +mod route_action_srv; mod route_publisher; mod route_service_cli; mod route_service_srv; @@ -57,10 +59,7 @@ use config::Config; use crate::dds_utils::get_guid; use crate::discovery_mgr::DiscoveryMgr; use crate::events::ROS2DiscoveryEvent; -use crate::liveliness_mgt::{ - ke_liveliness_all, ke_liveliness_plugin, parse_ke_liveliness_pub, - parse_ke_liveliness_service_cli, parse_ke_liveliness_service_srv, parse_ke_liveliness_sub, -}; +use crate::liveliness_mgt::*; use crate::ros_discovery::RosDiscoveryInfoMgr; use crate::routes_mgr::RoutesMgr; @@ -387,7 +386,7 @@ impl<'a> ROS2PluginRuntime<'a> { log::warn!("Error updating route: {e}"); } } else { - log::info!("{evt} - Denied per config"); + log::debug!("{evt} - Denied per config"); } } Err(e) => log::error!("Internal Error: received from DiscoveryMgr: {e}") @@ -524,6 +523,36 @@ impl<'a> ROS2PluginRuntime<'a> { plugin_id, zenoh_key_expr, }), + ("AS/", SampleKind::Put) => parse_ke_liveliness_action_srv(liveliness_ke) + .map_err(|e| format!("Received invalid liveliness token: {e}")) + .map( + |(plugin_id, zenoh_key_expr, ros2_type)| AnnouncedActionSrv { + plugin_id, + zenoh_key_expr, + ros2_type, + }, + ), + ("AS/", SampleKind::Delete) => parse_ke_liveliness_action_srv(liveliness_ke) + .map_err(|e| format!("Received invalid liveliness token: {e}")) + .map(|(plugin_id, zenoh_key_expr, ..)| RetiredActionSrv { + plugin_id, + zenoh_key_expr, + }), + ("AC/", SampleKind::Put) => parse_ke_liveliness_action_cli(liveliness_ke) + .map_err(|e| format!("Received invalid liveliness token: {e}")) + .map( + |(plugin_id, zenoh_key_expr, ros2_type)| AnnouncedActionCli { + plugin_id, + zenoh_key_expr, + ros2_type, + }, + ), + ("AC/", SampleKind::Delete) => parse_ke_liveliness_action_cli(liveliness_ke) + .map_err(|e| format!("Received invalid liveliness token: {e}")) + .map(|(plugin_id, zenoh_key_expr, ..)| RetiredActionCli { + plugin_id, + zenoh_key_expr, + }), _ => Err(format!("invalid ROS2 interface kind: {iface_kind}")), } } diff --git a/zenoh-plugin-ros2dds/src/liveliness_mgt.rs b/zenoh-plugin-ros2dds/src/liveliness_mgt.rs index de6af84..63e10e8 100644 --- a/zenoh-plugin-ros2dds/src/liveliness_mgt.rs +++ b/zenoh-plugin-ros2dds/src/liveliness_mgt.rs @@ -28,6 +28,8 @@ zenoh::kedefine!( pub(crate) ke_liveliness_sub: "@ros2_lv/${plugin_id:*}/MS/${ke:*}/${typ:*}/${qos_ke:*}", pub(crate) ke_liveliness_service_srv: "@ros2_lv/${plugin_id:*}/SS/${ke:*}/${typ:*}", pub(crate) ke_liveliness_service_cli: "@ros2_lv/${plugin_id:*}/SC/${ke:*}/${typ:*}", + pub(crate) ke_liveliness_action_srv: "@ros2_lv/${plugin_id:*}/AS/${ke:*}/${typ:*}", + pub(crate) ke_liveliness_action_cli: "@ros2_lv/${plugin_id:*}/AC/${ke:*}/${typ:*}", ); pub(crate) fn new_ke_liveliness_pub( @@ -55,16 +57,16 @@ pub(crate) fn parse_ke_liveliness_pub( .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no plugin_id"))?; let zenoh_key_expr = parsed .ke() - .map(|ke| unescape_slashes(ke)) + .map(unescape_slashes) .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no ke"))?; let ros2_type = parsed .typ() - .map(|ke| unescape_slashes(ke)) + .map(unescape_slashes) .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no typ"))?; let (keyless, qos) = parsed .qos_ke() .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no typ")) - .and_then(|ke| key_expr_to_qos(ke)) + .and_then(key_expr_to_qos) .map_err(|e| format!("failed to parse liveliness keyexpr {ke}: {e}"))?; Ok(( plugin_id, @@ -100,16 +102,16 @@ pub(crate) fn parse_ke_liveliness_sub( .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no plugin_id"))?; let zenoh_key_expr = parsed .ke() - .map(|ke| unescape_slashes(ke)) + .map(unescape_slashes) .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no ke"))?; let ros2_type = parsed .typ() - .map(|ke| unescape_slashes(ke)) + .map(unescape_slashes) .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no typ"))?; let (keyless, qos) = parsed .qos_ke() .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no typ")) - .and_then(|ke| key_expr_to_qos(ke)) + .and_then(key_expr_to_qos) .map_err(|e| format!("failed to parse liveliness keyexpr {ke}: {e}"))?; Ok(( plugin_id, @@ -142,11 +144,11 @@ pub(crate) fn parse_ke_liveliness_service_srv( .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no plugin_id"))?; let zenoh_key_expr = parsed .ke() - .map(|ke| unescape_slashes(ke)) + .map(unescape_slashes) .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no ke"))?; let ros2_type = parsed .typ() - .map(|ke| unescape_slashes(ke)) + .map(unescape_slashes) .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no typ"))?; Ok((plugin_id, zenoh_key_expr, ros2_type.to_string())) } @@ -173,11 +175,74 @@ pub(crate) fn parse_ke_liveliness_service_cli( .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no plugin_id"))?; let zenoh_key_expr = parsed .ke() - .map(|ke| unescape_slashes(ke)) + .map(unescape_slashes) .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no ke"))?; let ros2_type = parsed .typ() - .map(|ke| unescape_slashes(ke)) + .map(unescape_slashes) + .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no typ"))?; + Ok((plugin_id, zenoh_key_expr, ros2_type.to_string())) +} + +///////// +pub(crate) fn new_ke_liveliness_action_srv( + plugin_id: &keyexpr, + zenoh_key_expr: &keyexpr, + ros2_type: &str, +) -> Result { + let ke = escape_slashes(zenoh_key_expr); + let typ = escape_slashes(ros2_type); + zenoh::keformat!(ke_liveliness_action_srv::formatter(), plugin_id, ke, typ) + .map_err(|e| e.to_string()) +} + +pub(crate) fn parse_ke_liveliness_action_srv( + ke: &keyexpr, +) -> Result<(OwnedKeyExpr, OwnedKeyExpr, String), String> { + let parsed = ke_liveliness_action_srv::parse(ke) + .map_err(|e| format!("failed to parse liveliness keyexpr {ke}: {e}"))?; + let plugin_id = parsed + .plugin_id() + .map(ToOwned::to_owned) + .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no plugin_id"))?; + let zenoh_key_expr = parsed + .ke() + .map(unescape_slashes) + .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no ke"))?; + let ros2_type = parsed + .typ() + .map(unescape_slashes) + .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no typ"))?; + Ok((plugin_id, zenoh_key_expr, ros2_type.to_string())) +} + +pub(crate) fn new_ke_liveliness_action_cli( + plugin_id: &keyexpr, + zenoh_key_expr: &keyexpr, + ros2_type: &str, +) -> Result { + let ke = escape_slashes(zenoh_key_expr); + let typ = escape_slashes(ros2_type); + zenoh::keformat!(ke_liveliness_action_cli::formatter(), plugin_id, ke, typ) + .map_err(|e| e.to_string()) +} + +pub(crate) fn parse_ke_liveliness_action_cli( + ke: &keyexpr, +) -> Result<(OwnedKeyExpr, OwnedKeyExpr, String), String> { + let parsed = ke_liveliness_action_cli::parse(ke) + .map_err(|e| format!("failed to parse liveliness keyexpr {ke}: {e}"))?; + let plugin_id = parsed + .plugin_id() + .map(ToOwned::to_owned) + .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no plugin_id"))?; + let zenoh_key_expr = parsed + .ke() + .map(unescape_slashes) + .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no ke"))?; + let ros2_type = parsed + .typ() + .map(unescape_slashes) .ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no typ"))?; Ok((plugin_id, zenoh_key_expr, ros2_type.to_string())) } diff --git a/zenoh-plugin-ros2dds/src/node_info.rs b/zenoh-plugin-ros2dds/src/node_info.rs index 427f133..c0aeffc 100644 --- a/zenoh-plugin-ros2dds/src/node_info.rs +++ b/zenoh-plugin-ros2dds/src/node_info.rs @@ -447,11 +447,6 @@ impl NodeInfo { }) } - #[inline] - pub fn id(&self) -> &str { - &self.id - } - #[inline] pub fn fullname(&self) -> &str { &self.id[self.fullname.clone()] @@ -472,12 +467,6 @@ impl NodeInfo { ke_for_sure!(&self.id) } - #[inline] - pub fn fullname_as_keyexpr(&self) -> &keyexpr { - // fullname always start with '/' - remove it - ke_for_sure!(&self.fullname()[1..]) - } - pub fn update_with_reader(&mut self, entity: &DdsEntity) -> Option { let topic_prefix = &entity.topic_name[..3]; let topic_suffix = &entity.topic_name[2..]; diff --git a/zenoh-plugin-ros2dds/src/qos_helpers.rs b/zenoh-plugin-ros2dds/src/qos_helpers.rs index 81b5ce8..f359c74 100644 --- a/zenoh-plugin-ros2dds/src/qos_helpers.rs +++ b/zenoh-plugin-ros2dds/src/qos_helpers.rs @@ -27,24 +27,6 @@ pub fn get_durability_service_or_default(qos: &Qos) -> DurabilityService { } } -pub fn partition_is_empty(partition: &Option>) -> bool { - partition - .as_ref() - .map_or(true, |partition| partition.is_empty()) -} - -pub fn partition_contains(partition: &Option>, name: &String) -> bool { - partition - .as_ref() - .map_or(false, |partition| partition.contains(name)) -} - -pub fn is_writer_reliable(reliability: &Option) -> bool { - reliability.as_ref().map_or(true, |reliability| { - reliability.kind == ReliabilityKind::RELIABLE - }) -} - pub fn is_reader_reliable(reliability: &Option) -> bool { reliability.as_ref().map_or(false, |reliability| { reliability.kind == ReliabilityKind::RELIABLE diff --git a/zenoh-plugin-ros2dds/src/ros2_utils.rs b/zenoh-plugin-ros2dds/src/ros2_utils.rs index b1aec16..ba92ec5 100644 --- a/zenoh-plugin-ros2dds/src/ros2_utils.rs +++ b/zenoh-plugin-ros2dds/src/ros2_utils.rs @@ -14,10 +14,31 @@ use std::sync::atomic::{AtomicU32, Ordering}; -use cyclors::dds_entity_t; -use zenoh::prelude::KeyExpr; +use cyclors::{ + dds_entity_t, + qos::{ + Durability, DurabilityKind, History, HistoryKind, Qos, Reliability, ReliabilityKind, + TypeConsistency, TypeConsistencyKind, WriterDataLifecycle, DDS_INFINITE_TIME, + }, +}; +use zenoh::prelude::{keyexpr, KeyExpr}; -use crate::dds_utils::get_guid; +use crate::{dds_utils::get_guid, ke_for_sure}; + +pub const ROS2_ACTION_CANCEL_GOAL_SRV_TYPE: &str = "action_msgs/srv/CancelGoal"; +pub const ROS2_ACTION_STATUS_MSG_TYPE: &str = "action_msgs/msg/GoalStatusArray"; + +lazy_static::lazy_static!( + pub static ref KE_SUFFIX_ACTION_SEND_GOAL: &'static keyexpr = ke_for_sure!("_action/send_goal"); + pub static ref KE_SUFFIX_ACTION_CANCEL_GOAL: &'static keyexpr = ke_for_sure!("_action/cancel_goal"); + pub static ref KE_SUFFIX_ACTION_GET_RESULT: &'static keyexpr = ke_for_sure!("_action/get_result"); + pub static ref KE_SUFFIX_ACTION_FEEDBACK: &'static keyexpr = ke_for_sure!("_action/feedback"); + pub static ref KE_SUFFIX_ACTION_STATUS: &'static keyexpr = ke_for_sure!("_action/status"); + + pub static ref QOS_ACTION_FEEDBACK: Qos = ros2_action_feedback_default_qos(); + pub static ref QOS_ACTION_STATUS: Qos = ros2_action_status_default_qos(); + +); /// Convert DDS Topic type to ROS2 Message type pub fn dds_type_to_ros2_message_type(dds_topic: &str) -> String { @@ -31,10 +52,10 @@ pub fn dds_type_to_ros2_message_type(dds_topic: &str) -> String { /// Convert ROS2 Message type to DDS Topic type pub fn ros2_message_type_to_dds_type(ros_topic: &str) -> String { - let mut result = ros_topic.replace("/", "::"); - result - .rfind(':') - .map(|pos| result.insert_str(pos + 1, "dds_::")); + let mut result = ros_topic.replace('/', "::"); + if let Some(pos) = result.rfind(':') { + result.insert_str(pos + 1, "dds_::") + } result.push('_'); result } @@ -51,12 +72,12 @@ pub fn dds_type_to_ros2_service_type(dds_topic: &str) -> String { /// Convert ROS2 Service type to DDS Topic type for Request pub fn ros2_service_type_to_request_dds_type(ros_service: &str) -> String { - format!("{}Request_", ros2_message_type_to_dds_type(ros_service)) + ros2_message_type_to_dds_type(&format!("{ros_service}_Request")) } /// Convert ROS2 Service type to DDS Topic type for Reply pub fn ros2_service_type_to_reply_dds_type(ros_service: &str) -> String { - format!("{}Response_", ros2_message_type_to_dds_type(ros_service)) + ros2_message_type_to_dds_type(&format!("{ros_service}_Response")) } /// Convert DDS Topic type for ROS2 Action to ROS2 Action type @@ -74,6 +95,66 @@ pub fn dds_type_to_ros2_action_type(dds_topic: &str) -> String { ) } +fn ros2_action_feedback_default_qos() -> Qos { + let mut qos = Qos::default(); + qos.history = Some(History { + kind: HistoryKind::KEEP_LAST, + depth: 10, + }); + qos.reliability = Some(Reliability { + kind: ReliabilityKind::RELIABLE, + max_blocking_time: DDS_INFINITE_TIME, + }); + qos.data_representation = Some([0].into()); + qos.writer_data_lifecycle = Some(WriterDataLifecycle { + autodispose_unregistered_instances: false, + }); + qos.type_consistency = Some(TypeConsistency { + kind: TypeConsistencyKind::ALLOW_TYPE_COERCION, + ignore_sequence_bounds: true, + ignore_string_bounds: true, + ignore_member_names: false, + prevent_type_widening: false, + force_type_validation: false, + }); + qos +} + +fn ros2_action_status_default_qos() -> Qos { + let mut qos = Qos::default(); + qos.durability = Some(Durability { + kind: DurabilityKind::TRANSIENT_LOCAL, + }); + qos.reliability = Some(Reliability { + kind: ReliabilityKind::RELIABLE, + max_blocking_time: DDS_INFINITE_TIME, + }); + qos.data_representation = Some([0].into()); + qos.writer_data_lifecycle = Some(WriterDataLifecycle { + autodispose_unregistered_instances: false, + }); + qos.type_consistency = Some(TypeConsistency { + kind: TypeConsistencyKind::ALLOW_TYPE_COERCION, + ignore_sequence_bounds: true, + ignore_string_bounds: true, + ignore_member_names: false, + prevent_type_widening: false, + force_type_validation: false, + }); + qos +} + +pub fn is_service_for_action(ros2_service_name: &str) -> bool { + ros2_service_name.ends_with(KE_SUFFIX_ACTION_SEND_GOAL.as_str()) + || ros2_service_name.ends_with(KE_SUFFIX_ACTION_CANCEL_GOAL.as_str()) + || ros2_service_name.ends_with(KE_SUFFIX_ACTION_GET_RESULT.as_str()) +} + +pub fn is_message_for_action(ros2_message_name: &str) -> bool { + ros2_message_name.ends_with(KE_SUFFIX_ACTION_FEEDBACK.as_str()) + || ros2_message_name.ends_with(KE_SUFFIX_ACTION_STATUS.as_str()) +} + /// Check if name is a ROS name: starting with '/' and useable as a key expression (removing 1st '/') #[inline] pub fn check_ros_name(name: &str) -> Result<(), String> { @@ -95,7 +176,7 @@ lazy_static::lazy_static!( pub fn new_service_id(participant: &dds_entity_t) -> Result { // Service client or server id (16 bytes) generated in the same way than rmw_cyclone_dds here: // https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/rmw_node.cpp#L4908 - let mut id: [u8; 16] = *get_guid(&participant)?; + let mut id: [u8; 16] = *get_guid(participant)?; let counter_be = CLIENT_ID_COUNTER .fetch_add(1, Ordering::Relaxed) .to_be_bytes(); diff --git a/zenoh-plugin-ros2dds/src/ros_discovery.rs b/zenoh-plugin-ros2dds/src/ros_discovery.rs index 756e0b5..1c7a4c3 100644 --- a/zenoh-plugin-ros2dds/src/ros_discovery.rs +++ b/zenoh-plugin-ros2dds/src/ros_discovery.rs @@ -212,6 +212,19 @@ impl RosDiscoveryInfoMgr { *has_changed = true; } + pub fn add_dds_writers(&self, gids: Vec) { + let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state); + let writer_gid_seq = &mut info + .node_entities_info_seq + .get_mut(&self.node_fullname) + .unwrap() + .writer_gid_seq; + for gid in gids { + writer_gid_seq.insert(gid); + } + *has_changed = true; + } + pub fn remove_dds_writer(&self, gid: Gid) { let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state); info.node_entities_info_seq @@ -222,6 +235,19 @@ impl RosDiscoveryInfoMgr { *has_changed = true; } + pub fn remove_dds_writers(&self, gids: Vec) { + let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state); + let writer_gid_seq = &mut info + .node_entities_info_seq + .get_mut(&self.node_fullname) + .unwrap() + .writer_gid_seq; + for gid in gids { + writer_gid_seq.remove(&gid); + } + *has_changed = true; + } + pub fn add_dds_reader(&self, gid: Gid) { let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state); info.node_entities_info_seq @@ -232,6 +258,19 @@ impl RosDiscoveryInfoMgr { *has_changed = true; } + pub fn add_dds_readers(&self, gids: Vec) { + let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state); + let reader_gid_seq = &mut info + .node_entities_info_seq + .get_mut(&self.node_fullname) + .unwrap() + .reader_gid_seq; + for gid in gids { + reader_gid_seq.insert(gid); + } + *has_changed = true; + } + pub fn remove_dds_reader(&self, gid: Gid) { let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state); info.node_entities_info_seq @@ -242,6 +281,19 @@ impl RosDiscoveryInfoMgr { *has_changed = true; } + pub fn remove_dds_readers(&self, gids: Vec) { + let (ref mut info, ref mut has_changed) = *zwrite!(self.participant_entities_state); + let reader_gid_seq = &mut info + .node_entities_info_seq + .get_mut(&self.node_fullname) + .unwrap() + .reader_gid_seq; + for gid in gids { + reader_gid_seq.remove(&gid); + } + *has_changed = true; + } + pub fn read(&self) -> Vec { unsafe { let mut zp: *mut ddsi_serdata = std::ptr::null_mut(); @@ -271,7 +323,6 @@ impl RosDiscoveryInfoMgr { } map.values() - .into_iter() .filter_map(|sample| { log::trace!("Deserialize ParticipantEntitiesInfo: {:?}", sample); match cdr::deserialize_from::<_, ParticipantEntitiesInfo, _>( diff --git a/zenoh-plugin-ros2dds/src/route_action_cli.rs b/zenoh-plugin-ros2dds/src/route_action_cli.rs new file mode 100644 index 0000000..b640a00 --- /dev/null +++ b/zenoh-plugin-ros2dds/src/route_action_cli.rs @@ -0,0 +1,309 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use cyclors::dds_entity_t; +use serde::Serialize; +use std::{collections::HashSet, fmt, sync::Arc}; +use zenoh::{liveliness::LivelinessToken, prelude::*}; +use zenoh_core::AsyncResolve; + +use crate::{ + config::Config, gid::Gid, liveliness_mgt::new_ke_liveliness_action_cli, ros2_utils::*, + route_action_srv::serialize_action_zenoh_key_expr, route_service_cli::RouteServiceCli, + route_subscriber::RouteSubscriber, +}; + +#[derive(Serialize)] +pub struct RouteActionCli<'a> { + // the ROS2 Action name + ros2_name: String, + // the ROS2 type + ros2_type: String, + // the Zenoh key expression prefix used for services/messages routing + #[serde( + rename = "zenoh_key_expr", + serialize_with = "serialize_action_zenoh_key_expr" + )] + zenoh_key_expr_prefix: OwnedKeyExpr, + // the zenoh session + #[serde(skip)] + zsession: &'a Arc, + // the config + #[serde(skip)] + _config: Arc, + is_active: bool, + #[serde(skip)] + route_send_goal: RouteServiceCli<'a>, + #[serde(skip)] + route_cancel_goal: RouteServiceCli<'a>, + #[serde(skip)] + route_get_result: RouteServiceCli<'a>, + #[serde(skip)] + route_feedback: RouteSubscriber<'a>, + #[serde(skip)] + route_status: RouteSubscriber<'a>, + // a liveliness token associated to this route, for announcement to other plugins + #[serde(skip)] + liveliness_token: Option>, + // the list of remote routes served by this route (":"") + remote_routes: HashSet, + // the list of nodes served by this route + local_nodes: HashSet, +} + +impl fmt::Display for RouteActionCli<'_> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "Route Action Client (ROS:{} <-> Zenoh:{}/*)", + self.ros2_name, self.zenoh_key_expr_prefix + ) + } +} + +impl RouteActionCli<'_> { + #[allow(clippy::too_many_arguments)] + pub async fn create( + config: Arc, + zsession: &Arc, + participant: dds_entity_t, + ros2_name: String, + ros2_type: String, + zenoh_key_expr_prefix: OwnedKeyExpr, + ) -> Result, String> { + let route_send_goal = RouteServiceCli::create( + config.clone(), + zsession, + participant, + format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_SEND_GOAL), + format!("{ros2_type}_SendGoal"), + &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_SEND_GOAL, + &None, + ) + .await?; + + let route_cancel_goal = RouteServiceCli::create( + config.clone(), + zsession, + participant, + format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_CANCEL_GOAL), + ROS2_ACTION_CANCEL_GOAL_SRV_TYPE.to_string(), + &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_CANCEL_GOAL, + &None, + ) + .await?; + + let route_get_result = RouteServiceCli::create( + config.clone(), + zsession, + participant, + format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_GET_RESULT), + format!("{ros2_type}_GetResult"), + &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_GET_RESULT, + &None, + ) + .await?; + + let route_feedback = RouteSubscriber::create( + config.clone(), + zsession, + participant, + format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_FEEDBACK), + format!("{ros2_type}_FeedbackMessage"), + &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_FEEDBACK, + true, + QOS_ACTION_FEEDBACK.clone(), + ) + .await?; + + let route_status = RouteSubscriber::create( + config.clone(), + zsession, + participant, + format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_STATUS), + ROS2_ACTION_STATUS_MSG_TYPE.to_string(), + &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_STATUS, + true, + QOS_ACTION_STATUS.clone(), + ) + .await?; + + Ok(RouteActionCli { + ros2_name, + ros2_type, + zenoh_key_expr_prefix, + zsession, + _config: config, + is_active: false, + route_send_goal, + route_cancel_goal, + route_get_result, + route_feedback, + route_status, + liveliness_token: None, + remote_routes: HashSet::new(), + local_nodes: HashSet::new(), + }) + } + + async fn activate<'a>(&'a mut self, plugin_id: &keyexpr) -> Result<(), String> { + self.is_active = true; + + // create associated LivelinessToken + let liveliness_ke = + new_ke_liveliness_action_cli(plugin_id, &self.zenoh_key_expr_prefix, &self.ros2_type)?; + let ros2_name = self.ros2_name.clone(); + self.liveliness_token = Some(self.zsession + .liveliness() + .declare_token(liveliness_ke) + .res_async() + .await + .map_err(|e| { + format!( + "Failed create LivelinessToken associated to route for Action Client {ros2_name}: {e}" + ) + })? + ); + Ok(()) + } + + fn deactivate(&mut self) { + log::debug!("{self} deactivate"); + // Drop Zenoh Publisher and Liveliness token + // The DDS Writer remains to be discovered by local ROS nodes + self.is_active = false; + self.liveliness_token = None; + } + + pub fn dds_writers_guids(&self) -> Result, String> { + Ok([ + self.route_send_goal.dds_rep_writer_guid()?, + self.route_cancel_goal.dds_rep_writer_guid()?, + self.route_get_result.dds_rep_writer_guid()?, + self.route_feedback.dds_writer_guid()?, + self.route_status.dds_writer_guid()?, + ] + .into()) + } + + pub fn dds_readers_guids(&self) -> Result, String> { + Ok([ + self.route_send_goal.dds_req_reader_guid()?, + self.route_cancel_goal.dds_req_reader_guid()?, + self.route_get_result.dds_req_reader_guid()?, + ] + .into()) + } + + #[inline] + pub fn add_remote_route(&mut self, plugin_id: &str, zenoh_key_expr_prefix: &keyexpr) { + self.route_send_goal.add_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_SEND_GOAL), + ); + self.route_cancel_goal.add_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_CANCEL_GOAL), + ); + self.route_get_result.add_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_GET_RESULT), + ); + self.route_feedback.add_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_FEEDBACK), + ); + self.route_status.add_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_STATUS), + ); + self.remote_routes + .insert(format!("{plugin_id}:{zenoh_key_expr_prefix}")); + log::debug!("{self} now serving remote routes {:?}", self.remote_routes); + } + + #[inline] + pub fn remove_remote_route(&mut self, plugin_id: &str, zenoh_key_expr_prefix: &keyexpr) { + self.route_send_goal.remove_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_SEND_GOAL), + ); + self.route_cancel_goal.remove_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_CANCEL_GOAL), + ); + self.route_get_result.remove_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_GET_RESULT), + ); + self.route_feedback.remove_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_FEEDBACK), + ); + self.route_status.remove_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_STATUS), + ); + self.remote_routes + .remove(&format!("{plugin_id}:{zenoh_key_expr_prefix}")); + log::debug!("{self} now serving remote routes {:?}", self.remote_routes); + } + + #[inline] + pub async fn add_local_node(&mut self, node: String, plugin_id: &keyexpr) { + futures::join!( + self.route_send_goal.add_local_node(node.clone(), plugin_id), + self.route_cancel_goal + .add_local_node(node.clone(), plugin_id), + self.route_get_result + .add_local_node(node.clone(), plugin_id), + self.route_feedback + .add_local_node(node.clone(), plugin_id, &QOS_ACTION_FEEDBACK), + self.route_status + .add_local_node(node.clone(), plugin_id, &QOS_ACTION_STATUS), + ); + + self.local_nodes.insert(node); + log::debug!("{self} now serving local nodes {:?}", self.local_nodes); + // if 1st local node added, activate the route + if self.local_nodes.len() == 1 { + if let Err(e) = self.activate(plugin_id).await { + log::error!("{self} activation failed: {e}"); + } + } + } + + #[inline] + pub fn remove_local_node(&mut self, node: &str) { + self.route_send_goal.remove_local_node(node); + self.route_cancel_goal.remove_local_node(node); + self.route_get_result.remove_local_node(node); + self.route_feedback.remove_local_node(node); + self.route_status.remove_local_node(node); + + self.local_nodes.remove(node); + log::debug!("{self} now serving local nodes {:?}", self.local_nodes); + // if last local node removed, deactivate the route + if self.local_nodes.is_empty() { + self.deactivate(); + } + } + + pub fn is_unused(&self) -> bool { + self.route_send_goal.is_unused() + && self.route_cancel_goal.is_unused() + && self.route_get_result.is_unused() + && self.route_status.is_unused() + && self.route_feedback.is_unused() + } +} diff --git a/zenoh-plugin-ros2dds/src/route_action_srv.rs b/zenoh-plugin-ros2dds/src/route_action_srv.rs new file mode 100644 index 0000000..d0cc5bb --- /dev/null +++ b/zenoh-plugin-ros2dds/src/route_action_srv.rs @@ -0,0 +1,321 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +use cyclors::dds_entity_t; +use serde::{Serialize, Serializer}; +use std::{collections::HashSet, fmt, sync::Arc}; +use zenoh::{liveliness::LivelinessToken, prelude::*}; +use zenoh_core::AsyncResolve; + +use crate::{ + config::Config, gid::Gid, liveliness_mgt::new_ke_liveliness_action_srv, ros2_utils::*, + route_publisher::RoutePublisher, route_service_srv::RouteServiceSrv, +}; + +#[derive(Serialize)] +pub struct RouteActionSrv<'a> { + // the ROS2 Action name + ros2_name: String, + // the ROS2 type + ros2_type: String, + // the Zenoh key expression prefix used for services/messages routing + #[serde( + rename = "zenoh_key_expr", + serialize_with = "serialize_action_zenoh_key_expr" + )] + zenoh_key_expr_prefix: OwnedKeyExpr, + // the zenoh session + #[serde(skip)] + zsession: &'a Arc, + // the config + #[serde(skip)] + _config: Arc, + is_active: bool, + #[serde(skip)] + route_send_goal: RouteServiceSrv<'a>, + #[serde(skip)] + route_cancel_goal: RouteServiceSrv<'a>, + #[serde(skip)] + route_get_result: RouteServiceSrv<'a>, + #[serde(skip)] + route_feedback: RoutePublisher<'a>, + #[serde(skip)] + route_status: RoutePublisher<'a>, + // a liveliness token associated to this route, for announcement to other plugins + #[serde(skip)] + liveliness_token: Option>, + // the list of remote routes served by this route (":"") + remote_routes: HashSet, + // the list of nodes served by this route + local_nodes: HashSet, +} + +impl fmt::Display for RouteActionSrv<'_> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "Route Action Server (ROS:{} <-> Zenoh:{}/*)", + self.ros2_name, self.zenoh_key_expr_prefix + ) + } +} + +impl RouteActionSrv<'_> { + #[allow(clippy::too_many_arguments)] + pub async fn create( + config: Arc, + zsession: &Arc, + participant: dds_entity_t, + ros2_name: String, + ros2_type: String, + zenoh_key_expr_prefix: OwnedKeyExpr, + ) -> Result, String> { + let route_send_goal = RouteServiceSrv::create( + config.clone(), + zsession, + participant, + format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_SEND_GOAL), + format!("{ros2_type}_SendGoal"), + &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_SEND_GOAL, + &None, + ) + .await?; + + let route_cancel_goal = RouteServiceSrv::create( + config.clone(), + zsession, + participant, + format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_CANCEL_GOAL), + ROS2_ACTION_CANCEL_GOAL_SRV_TYPE.to_string(), + &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_CANCEL_GOAL, + &None, + ) + .await?; + + let route_get_result = RouteServiceSrv::create( + config.clone(), + zsession, + participant, + format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_GET_RESULT), + format!("{ros2_type}_GetResult"), + &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_GET_RESULT, + &None, + ) + .await?; + + let route_feedback = RoutePublisher::create( + config.clone(), + zsession, + participant, + format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_FEEDBACK), + format!("{ros2_type}_FeedbackMessage"), + &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_FEEDBACK, + &None, + true, + QOS_ACTION_FEEDBACK.clone(), + ) + .await?; + + let route_status = RoutePublisher::create( + config.clone(), + zsession, + participant, + format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_STATUS), + ROS2_ACTION_STATUS_MSG_TYPE.to_string(), + &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_STATUS, + &None, + true, + QOS_ACTION_STATUS.clone(), + ) + .await?; + + Ok(RouteActionSrv { + ros2_name, + ros2_type, + zenoh_key_expr_prefix, + zsession, + _config: config, + is_active: false, + route_send_goal, + route_cancel_goal, + route_get_result, + route_feedback, + route_status, + liveliness_token: None, + remote_routes: HashSet::new(), + local_nodes: HashSet::new(), + }) + } + + async fn activate<'a>(&'a mut self, plugin_id: &keyexpr) -> Result<(), String> { + self.is_active = true; + + // create associated LivelinessToken + let liveliness_ke = + new_ke_liveliness_action_srv(plugin_id, &self.zenoh_key_expr_prefix, &self.ros2_type)?; + let ros2_name = self.ros2_name.clone(); + self.liveliness_token = Some(self.zsession + .liveliness() + .declare_token(liveliness_ke) + .res_async() + .await + .map_err(|e| { + format!( + "Failed create LivelinessToken associated to route for Action Service {ros2_name}: {e}" + ) + })? + ); + Ok(()) + } + + fn deactivate(&mut self) { + log::debug!("{self} deactivate"); + // Drop Zenoh Publisher and Liveliness token + // The DDS Writer remains to be discovered by local ROS nodes + self.is_active = false; + self.liveliness_token = None; + } + + pub fn dds_writers_guids(&self) -> Result, String> { + Ok([ + self.route_send_goal.dds_req_writer_guid()?, + self.route_cancel_goal.dds_req_writer_guid()?, + self.route_get_result.dds_req_writer_guid()?, + ] + .into()) + } + + pub fn dds_readers_guids(&self) -> Result, String> { + Ok([ + self.route_send_goal.dds_rep_reader_guid()?, + self.route_cancel_goal.dds_rep_reader_guid()?, + self.route_get_result.dds_rep_reader_guid()?, + self.route_feedback.dds_reader_guid()?, + self.route_status.dds_reader_guid()?, + ] + .into()) + } + + #[inline] + pub fn add_remote_route(&mut self, plugin_id: &str, zenoh_key_expr_prefix: &keyexpr) { + self.route_send_goal.add_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_SEND_GOAL), + ); + self.route_cancel_goal.add_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_CANCEL_GOAL), + ); + self.route_get_result.add_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_GET_RESULT), + ); + self.route_feedback.add_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_FEEDBACK), + ); + self.route_status.add_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_STATUS), + ); + self.remote_routes + .insert(format!("{plugin_id}:{zenoh_key_expr_prefix}")); + log::debug!("{self} now serving remote routes {:?}", self.remote_routes); + } + + #[inline] + pub fn remove_remote_route(&mut self, plugin_id: &str, zenoh_key_expr_prefix: &keyexpr) { + self.route_send_goal.remove_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_SEND_GOAL), + ); + self.route_cancel_goal.remove_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_CANCEL_GOAL), + ); + self.route_get_result.remove_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_GET_RESULT), + ); + self.route_feedback.remove_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_FEEDBACK), + ); + self.route_status.remove_remote_route( + plugin_id, + &(zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_STATUS), + ); + self.remote_routes + .remove(&format!("{plugin_id}:{zenoh_key_expr_prefix}")); + log::debug!("{self} now serving remote routes {:?}", self.remote_routes); + } + + #[inline] + pub async fn add_local_node(&mut self, node: String, plugin_id: &keyexpr) { + futures::join!( + self.route_send_goal.add_local_node(node.clone(), plugin_id), + self.route_cancel_goal + .add_local_node(node.clone(), plugin_id), + self.route_get_result + .add_local_node(node.clone(), plugin_id), + self.route_feedback + .add_local_node(node.clone(), plugin_id, &QOS_ACTION_FEEDBACK), + self.route_status + .add_local_node(node.clone(), plugin_id, &QOS_ACTION_STATUS), + ); + + self.local_nodes.insert(node); + log::debug!("{self} now serving local nodes {:?}", self.local_nodes); + // if 1st local node added, activate the route + if self.local_nodes.len() == 1 { + if let Err(e) = self.activate(plugin_id).await { + log::error!("{self} activation failed: {e}"); + } + } + } + + #[inline] + pub fn remove_local_node(&mut self, node: &str) { + self.route_send_goal.remove_local_node(node); + self.route_cancel_goal.remove_local_node(node); + self.route_get_result.remove_local_node(node); + self.route_feedback.remove_local_node(node); + self.route_status.remove_local_node(node); + + self.local_nodes.remove(node); + log::debug!("{self} now serving local nodes {:?}", self.local_nodes); + // if last local node removed, deactivate the route + if self.local_nodes.is_empty() { + self.deactivate(); + } + } + + pub fn is_unused(&self) -> bool { + self.route_send_goal.is_unused() + && self.route_cancel_goal.is_unused() + && self.route_get_result.is_unused() + && self.route_status.is_unused() + && self.route_feedback.is_unused() + } +} + +pub fn serialize_action_zenoh_key_expr( + zenoh_key_expr_prefix: &OwnedKeyExpr, + ser: S, +) -> Result +where + S: Serializer, +{ + let str = format!("{zenoh_key_expr_prefix}/*"); + ser.serialize_str(&str) +} diff --git a/zenoh-plugin-ros2dds/src/route_publisher.rs b/zenoh-plugin-ros2dds/src/route_publisher.rs index f42a94d..816925c 100644 --- a/zenoh-plugin-ros2dds/src/route_publisher.rs +++ b/zenoh-plugin-ros2dds/src/route_publisher.rs @@ -28,7 +28,7 @@ use crate::dds_types::TypeInfo; use crate::dds_utils::{delete_dds_entity, get_guid, serialize_entity_guid}; use crate::gid::Gid; use crate::liveliness_mgt::new_ke_liveliness_pub; -use crate::ros2_utils::ros2_message_type_to_dds_type; +use crate::ros2_utils::{is_message_for_action, ros2_message_type_to_dds_type}; use crate::{qos_helpers::*, Config}; use crate::{serialize_option_as_bool, KE_PREFIX_PUB_CACHE}; @@ -243,26 +243,29 @@ impl RoutePublisher<'_> { Some(ZPublisher::Publisher(declared_ke.clone())) }; - // create associated LivelinessToken - let liveliness_ke = new_ke_liveliness_pub( - plugin_id, - &self.zenoh_key_expr, - &self.ros2_type, - self.keyless, - &discovered_writer_qos, - )?; - let ros2_name = self.ros2_name.clone(); - self.liveliness_token = Some(self.zsession - .liveliness() - .declare_token(liveliness_ke) - .res() - .await - .map_err(|e| { - format!( - "Failed create LivelinessToken associated to route for Publisher {ros2_name}: {e}" - ) - })? - ); + // if not for an Action (since actions declare their own liveliness) + if !is_message_for_action(&self.ros2_name) { + // create associated LivelinessToken + let liveliness_ke = new_ke_liveliness_pub( + plugin_id, + &self.zenoh_key_expr, + &self.ros2_type, + self.keyless, + discovered_writer_qos, + )?; + let ros2_name = self.ros2_name.clone(); + self.liveliness_token = Some(self.zsession + .liveliness() + .declare_token(liveliness_ke) + .res() + .await + .map_err(|e| { + format!( + "Failed create LivelinessToken associated to route for Publisher {ros2_name}: {e}" + ) + })? + ); + } Ok(()) } diff --git a/zenoh-plugin-ros2dds/src/route_service_cli.rs b/zenoh-plugin-ros2dds/src/route_service_cli.rs index c6ff168..e6c2b47 100644 --- a/zenoh-plugin-ros2dds/src/route_service_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_service_cli.rs @@ -32,7 +32,8 @@ use crate::dds_utils::{ use crate::gid::Gid; use crate::liveliness_mgt::new_ke_liveliness_service_cli; use crate::ros2_utils::{ - new_service_id, ros2_service_type_to_reply_dds_type, ros2_service_type_to_request_dds_type, + is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type, + ros2_service_type_to_request_dds_type, }; use crate::{Config, LOG_PAYLOAD}; @@ -83,7 +84,7 @@ impl fmt::Display for RouteServiceCli<'_> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, - "Route Service Client (ROS:{} -> Zenoh:{})", + "Route Service Client (ROS:{} <-> Zenoh:{})", self.ros2_name, self.zenoh_key_expr ) } @@ -101,7 +102,7 @@ impl RouteServiceCli<'_> { type_info: &Option>, ) -> Result, String> { log::debug!( - "Route Service Client ({ros2_name} -> {zenoh_key_expr}): creation with type {ros2_type}" + "Route Service Client (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): creation with type {ros2_type}" ); // Default Service QoS copied from: @@ -122,7 +123,7 @@ impl RouteServiceCli<'_> { let user_data = format!("serviceid= {server_id_str};"); qos.user_data = Some(user_data.into_bytes()); log::debug!( - "Route Service Client ({ros2_name} -> {zenoh_key_expr}): using id '{server_id_str}' => USER_DATA={:?}", qos.user_data.as_ref().unwrap() + "Route Service Client (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): using id '{server_id_str}' => USER_DATA={:?}", qos.user_data.as_ref().unwrap() ); // create DDS Writer to send replies coming from Zenoh to the Client @@ -137,7 +138,7 @@ impl RouteServiceCli<'_> { )?; let route_id: String = - format!("Route Service Client (ROS:{ros2_name} -> Zenoh:{zenoh_key_expr})",); + format!("Route Service Client (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr})",); // create DDS Reader to receive requests and route them to Zenoh let req_topic_name = format!("rq{ros2_name}Request"); @@ -181,21 +182,24 @@ impl RouteServiceCli<'_> { async fn activate<'a>(&'a mut self, plugin_id: &keyexpr) -> Result<(), String> { self.is_active = true; - // create associated LivelinessToken - let liveliness_ke = - new_ke_liveliness_service_cli(plugin_id, &self.zenoh_key_expr, &self.ros2_type)?; - let ros2_name = self.ros2_name.clone(); - self.liveliness_token = Some(self.zsession - .liveliness() - .declare_token(liveliness_ke) - .res_async() - .await - .map_err(|e| { - format!( - "Failed create LivelinessToken associated to route for Service Client {ros2_name}: {e}" - ) - })? - ); + // if not for an Action (since actions declare their own liveliness) + if !is_service_for_action(&self.ros2_name) { + // create associated LivelinessToken + let liveliness_ke = + new_ke_liveliness_service_cli(plugin_id, &self.zenoh_key_expr, &self.ros2_type)?; + let ros2_name = self.ros2_name.clone(); + self.liveliness_token = Some(self.zsession + .liveliness() + .declare_token(liveliness_ke) + .res_async() + .await + .map_err(|e| { + format!( + "Failed create LivelinessToken associated to route for Service Client {ros2_name}: {e}" + ) + })? + ); + } Ok(()) } @@ -267,14 +271,13 @@ impl RouteServiceCli<'_> { } } -fn do_route_request<'a>( +fn do_route_request( route_id: &str, sample: &DDSRawSample, zenoh_key_expr: OwnedKeyExpr, - zsession: &'a Arc, + zsession: &Arc, rep_writer: dds_entity_t, ) { - println!("-- {route_id} Routing request..."); // request payload is expected to be the Request type encoded as CDR, including a 4 bytes header, // the client guid (8 bytes) and a sequence_number (8 bytes). As per rmw_cyclonedds here: // https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/serdata.hpp#L73 @@ -284,11 +287,8 @@ fn do_route_request<'a>( } let zbuf: ZBuf = sample.into(); - println!("--- {zbuf:?}"); let dds_req_buf = zbuf.contiguous(); - println!("--- {dds_req_buf:02x?}"); let request_id: [u8; 16] = dds_req_buf[4..20].try_into().unwrap(); - println!("--- {request_id:02x?}"); // route request buffer stripped from request_id (client_id + sequence_number) let mut zenoh_req_buf = ZBuf::empty(); diff --git a/zenoh-plugin-ros2dds/src/route_service_srv.rs b/zenoh-plugin-ros2dds/src/route_service_srv.rs index 98e583e..31be52a 100644 --- a/zenoh-plugin-ros2dds/src/route_service_srv.rs +++ b/zenoh-plugin-ros2dds/src/route_service_srv.rs @@ -36,13 +36,13 @@ use crate::dds_utils::{ use crate::gid::Gid; use crate::liveliness_mgt::new_ke_liveliness_service_srv; use crate::ros2_utils::{ - new_service_id, ros2_service_type_to_reply_dds_type, ros2_service_type_to_request_dds_type, + is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type, + ros2_service_type_to_request_dds_type, }; use crate::Config; use crate::{serialize_option_as_bool, LOG_PAYLOAD}; // a route for a Service Server exposed in Zenoh as a Queryable -#[allow(clippy::upper_case_acronyms)] #[derive(Serialize)] pub struct RouteServiceSrv<'a> { // the ROS2 Service name @@ -100,7 +100,7 @@ impl fmt::Display for RouteServiceSrv<'_> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, - "Route Service Server (ROS:{} -> Zenoh:{})", + "Route Service Server (ROS:{} <-> Zenoh:{})", self.ros2_name, self.zenoh_key_expr ) } @@ -118,7 +118,7 @@ impl RouteServiceSrv<'_> { type_info: &Option>, ) -> Result, String> { log::debug!( - "Route Service Server ({ros2_name} -> {zenoh_key_expr}): creation with type {ros2_type}" + "Route Service Server (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): creation with type {ros2_type}" ); // Default Service QoS copied from: @@ -139,7 +139,7 @@ impl RouteServiceSrv<'_> { let user_data = format!("clientid= {client_id_str};"); qos.user_data = Some(user_data.into_bytes()); log::debug!( - "Route Service Server ({ros2_name} -> {zenoh_key_expr}): using id '{client_id_str}' => USER_DATA={:?}", qos.user_data.as_ref().unwrap() + "Route Service Server (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): using id '{client_id_str}' => USER_DATA={:?}", qos.user_data.as_ref().unwrap() ); // create DDS Writer to send requests coming from Zenoh to the Service @@ -178,7 +178,7 @@ impl RouteServiceSrv<'_> { do_route_reply( sample, zenoh_key_expr2.clone(), - &mut *zwrite!(queries_in_progress2), + &mut zwrite!(queries_in_progress2), "", client_guid, ); @@ -231,7 +231,7 @@ impl RouteServiceSrv<'_> { .callback(move |query| { do_route_request( query, - &mut *zwrite!(queries_in_progress), + &mut zwrite!(queries_in_progress), &sequence_number, &route_id, client_guid, @@ -248,21 +248,24 @@ impl RouteServiceSrv<'_> { })?, ); - // create associated LivelinessToken - let liveliness_ke = - new_ke_liveliness_service_srv(plugin_id, &self.zenoh_key_expr, &self.ros2_type)?; - let ros2_name = self.ros2_name.clone(); - self.liveliness_token = Some(self.zsession - .liveliness() - .declare_token(liveliness_ke) - .res() - .await - .map_err(|e| { - format!( - "Failed create LivelinessToken associated to route for Service Server {ros2_name}: {e}" - ) - })? - ); + // if not for an Action (since actions declare their own liveliness) + if !is_service_for_action(&self.ros2_name) { + // create associated LivelinessToken + let liveliness_ke = + new_ke_liveliness_service_srv(plugin_id, &self.zenoh_key_expr, &self.ros2_type)?; + let ros2_name = self.ros2_name.clone(); + self.liveliness_token = Some(self.zsession + .liveliness() + .declare_token(liveliness_ke) + .res() + .await + .map_err(|e| { + format!( + "Failed create LivelinessToken associated to route for Service Server {ros2_name}: {e}" + ) + })? + ); + } Ok(()) } @@ -377,7 +380,7 @@ fn do_route_request( dds_req_buf } else { // No query payload - send a request containing just client_guid + sequence_number - let mut dds_req_buf: Vec = CDR_HEADER_LE.clone().into(); + let mut dds_req_buf: Vec = CDR_HEADER_LE.into(); dds_req_buf.extend_from_slice(&client_guid.to_le_bytes()); dds_req_buf.extend_from_slice(&n.to_le_bytes()); dds_req_buf diff --git a/zenoh-plugin-ros2dds/src/route_subscriber.rs b/zenoh-plugin-ros2dds/src/route_subscriber.rs index c709da7..c9b2209 100644 --- a/zenoh-plugin-ros2dds/src/route_subscriber.rs +++ b/zenoh-plugin-ros2dds/src/route_subscriber.rs @@ -31,7 +31,7 @@ use crate::dds_utils::{create_dds_writer, delete_dds_entity, get_guid}; use crate::gid::Gid; use crate::liveliness_mgt::new_ke_liveliness_sub; use crate::qos_helpers::is_transient_local; -use crate::ros2_utils::ros2_message_type_to_dds_type; +use crate::ros2_utils::{is_message_for_action, ros2_message_type_to_dds_type}; use crate::{ dds_utils::serialize_entity_guid, qos::Qos, vec_into_raw_parts, Config, KE_ANY_1_SEGMENT, LOG_PAYLOAD, @@ -58,7 +58,7 @@ pub struct RouteSubscriber<'a> { zsession: &'a Arc, // the config #[serde(skip)] - _config: Arc, + config: Arc, // the zenoh subscriber receiving data to be re-published by the DDS Writer // `None` when route is created on a remote announcement and no local ROS2 Subscriber discovered yet #[serde(rename = "is_active", serialize_with = "serialize_option_as_bool")] @@ -100,16 +100,16 @@ impl fmt::Display for RouteSubscriber<'_> { impl RouteSubscriber<'_> { #[allow(clippy::too_many_arguments)] - pub async fn create<'a, 'b>( + pub async fn create<'b>( config: Arc, - zsession: &'a Arc, + zsession: &Arc, participant: dds_entity_t, ros2_name: String, ros2_type: String, zenoh_key_expr: OwnedKeyExpr, keyless: bool, writer_qos: Qos, - ) -> Result, String> { + ) -> Result, String> { let transient_local = is_transient_local(&writer_qos); log::debug!("Route Subscriber ({zenoh_key_expr} -> {ros2_name}): creation with type {ros2_type} (transient_local:{transient_local})"); @@ -124,7 +124,7 @@ impl RouteSubscriber<'_> { ros2_type, zenoh_key_expr, zsession, - _config: config, + config: config, zenoh_subscriber: None, dds_writer, transient_local, @@ -137,7 +137,6 @@ impl RouteSubscriber<'_> { async fn activate( &mut self, - config: &Config, plugin_id: &keyexpr, discovered_reader_qos: &Qos, ) -> Result<(), String> { @@ -155,34 +154,7 @@ impl RouteSubscriber<'_> { // query all PublicationCaches on "/*/" let query_selector: Selector = (*KE_PREFIX_PUB_CACHE / *KE_ANY_1_SEGMENT / &self.zenoh_key_expr).into(); - log::error!("{self}: query historical data from everybody for TRANSIENT_LOCAL Reader on {query_selector}"); - { - use zenoh_core::SyncResolve; - // - println!("********* QUERY FROM {query_selector}"); - let rep = self - .zsession - .get(&query_selector) - .target(QueryTarget::All) - .consolidation(ConsolidationMode::None) - .accept_replies(ReplyKeyExpr::Any) - .res_sync() - .unwrap(); - while let Ok(reply) = rep.recv() { - match reply.sample { - Ok(sample) => println!( - ">>>>>> Received ('{}': '{:02x?}')", - sample.key_expr.as_str(), - sample.value.payload.contiguous(), - ), - Err(err) => { - println!(">> Received (ERROR: '{}')", String::try_from(&err).unwrap()) - } - } - } - // - } - + log::debug!("{self}: query historical data from everybody for TRANSIENT_LOCAL Reader on {query_selector}"); let sub = self .zsession .declare_subscriber(&self.zenoh_key_expr) @@ -190,7 +162,7 @@ impl RouteSubscriber<'_> { .allowed_origin(Locality::Remote) // Allow only remote publications to avoid loops .reliable() .querying() - .query_timeout(config.queries_timeout) + .query_timeout(self.config.queries_timeout) .query_selector(query_selector) .query_accept_replies(ReplyKeyExpr::Any) .res() @@ -210,27 +182,30 @@ impl RouteSubscriber<'_> { Some(ZSubscriber::Subscriber(sub)) }; - // create associated LivelinessToken - let liveliness_ke = new_ke_liveliness_sub( - plugin_id, - &self.zenoh_key_expr, - &self.ros2_type, - self.keyless, - &discovered_reader_qos, - )?; - let ros2_name = self.ros2_name.clone(); - self.liveliness_token = Some( - self.zsession - .liveliness() - .declare_token(liveliness_ke) - .res() - .await - .map_err(|e| { - format!( - "Failed create LivelinessToken associated to route for Subscriber {ros2_name} : {e}" - ) - })?, - ); + // if not for an Action (since actions declare their own liveliness) + if !is_message_for_action(&self.ros2_name) { + // create associated LivelinessToken + let liveliness_ke = new_ke_liveliness_sub( + plugin_id, + &self.zenoh_key_expr, + &self.ros2_type, + self.keyless, + discovered_reader_qos, + )?; + let ros2_name = self.ros2_name.clone(); + self.liveliness_token = Some( + self.zsession + .liveliness() + .declare_token(liveliness_ke) + .res() + .await + .map_err(|e| { + format!( + "Failed create LivelinessToken associated to route for Subscriber {ros2_name} : {e}" + ) + })?, + ); + } Ok(()) } @@ -261,33 +236,6 @@ impl RouteSubscriber<'_> { .fetch({ let session = &self.zsession; let query_selector = query_selector.clone(); - { - use zenoh_core::SyncResolve; - // - println!("********* FETCH FROM {query_selector}"); - let rep = session - .get(&query_selector) - .target(QueryTarget::All) - .consolidation(ConsolidationMode::None) - .accept_replies(ReplyKeyExpr::Any) - .res_sync() - .unwrap(); - while let Ok(reply) = rep.recv() { - match reply.sample { - Ok(sample) => println!( - ">>>>>> Received ('{}': '{:02x?}')", - sample.key_expr.as_str(), - sample.value.payload.contiguous(), - ), - Err(err) => println!( - ">> Received (ERROR: '{}')", - String::try_from(&err).unwrap() - ), - } - } - // - } - move |cb| { use zenoh_core::SyncResolve; session @@ -341,7 +289,6 @@ impl RouteSubscriber<'_> { pub async fn add_local_node( &mut self, entity_key: String, - config: &Config, plugin_id: &keyexpr, discovered_reader_qos: &Qos, ) { @@ -349,10 +296,7 @@ impl RouteSubscriber<'_> { log::debug!("{self} now serving local nodes {:?}", self.local_nodes); // if 1st local node added, activate the route if self.local_nodes.len() == 1 { - if let Err(e) = self - .activate(config, plugin_id, discovered_reader_qos) - .await - { + if let Err(e) = self.activate(plugin_id, discovered_reader_qos).await { log::error!("{self} activation failed: {e}"); } } diff --git a/zenoh-plugin-ros2dds/src/routes_mgr.rs b/zenoh-plugin-ros2dds/src/routes_mgr.rs index 92f3c70..1279ecc 100644 --- a/zenoh-plugin-ros2dds/src/routes_mgr.rs +++ b/zenoh-plugin-ros2dds/src/routes_mgr.rs @@ -18,6 +18,8 @@ use crate::events::ROS2DiscoveryEvent; use crate::qos_helpers::adapt_reader_qos_for_writer; use crate::qos_helpers::adapt_writer_qos_for_reader; use crate::ros_discovery::RosDiscoveryInfoMgr; +use crate::route_action_cli::RouteActionCli; +use crate::route_action_srv::RouteActionSrv; use crate::route_publisher::RoutePublisher; use crate::route_service_cli::RouteServiceCli; use crate::route_service_srv::RouteServiceSrv; @@ -44,6 +46,8 @@ lazy_static::lazy_static!( static ref KE_PREFIX_ROUTE_SUBSCRIBER: &'static keyexpr = ke_for_sure!("route/topic/sub"); static ref KE_PREFIX_ROUTE_SERVICE_SRV: &'static keyexpr = ke_for_sure!("route/service/srv"); static ref KE_PREFIX_ROUTE_SERVICE_CLI: &'static keyexpr = ke_for_sure!("route/service/cli"); + static ref KE_PREFIX_ROUTE_ACTION_SRV: &'static keyexpr = ke_for_sure!("route/action/srv"); + static ref KE_PREFIX_ROUTE_ACTION_CLI: &'static keyexpr = ke_for_sure!("route/action/cli"); ); #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -60,6 +64,8 @@ enum RouteRef { SubscriberRoute(String), ServiceSrvRoute(String), ServiceCliRoute(String), + ActionSrvRoute(String), + ActionCliRoute(String), } pub struct RoutesMgr<'a> { @@ -73,6 +79,8 @@ pub struct RoutesMgr<'a> { routes_subscribers: HashMap>, routes_service_srv: HashMap>, routes_service_cli: HashMap>, + routes_action_srv: HashMap>, + routes_action_cli: HashMap>, // ros_discovery_info read/write manager ros_discovery_mgr: Arc, admin_prefix: OwnedKeyExpr, @@ -100,6 +108,8 @@ impl<'a> RoutesMgr<'a> { routes_subscribers: HashMap::new(), routes_service_srv: HashMap::new(), routes_service_cli: HashMap::new(), + routes_action_srv: HashMap::new(), + routes_action_cli: HashMap::new(), ros_discovery_mgr, admin_prefix, admin_space: HashMap::new(), @@ -132,11 +142,10 @@ impl<'a> RoutesMgr<'a> { iface.typ, entity.keyless, adapt_writer_qos_for_reader(&entity.qos), + true, ) .await?; - route - .add_local_node(node.into(), &plugin_id, &entity.qos) - .await; + route.add_local_node(node, &plugin_id, &entity.qos).await; } UndiscoveredMsgPub(node, iface) => { @@ -171,7 +180,6 @@ impl<'a> RoutesMgr<'a> { .clone() }; let plugin_id = self.plugin_id.clone(); - let config = self.config.clone(); // Get route (create it if not yet exists) let route = self .get_or_create_route_subscriber( @@ -179,11 +187,10 @@ impl<'a> RoutesMgr<'a> { iface.typ, entity.keyless, adapt_reader_qos_for_writer(&entity.qos), + true, ) .await?; - route - .add_local_node(node.into(), &config, &plugin_id, &entity.qos) - .await; + route.add_local_node(node, &plugin_id, &entity.qos).await; } UndiscoveredMsgSub(node, iface) => { @@ -209,9 +216,9 @@ impl<'a> RoutesMgr<'a> { let plugin_id = self.plugin_id.clone(); // Get route (create it if not yet exists) let route = self - .get_or_create_route_service_srv(iface.name, iface.typ) + .get_or_create_route_service_srv(iface.name, iface.typ, true) .await?; - route.add_local_node(node.into(), &plugin_id).await; + route.add_local_node(node, &plugin_id).await; } UndiscoveredServiceSrv(node, iface) => { if let Entry::Occupied(mut entry) = @@ -242,9 +249,9 @@ impl<'a> RoutesMgr<'a> { let plugin_id = self.plugin_id.clone(); // Get route (create it if not yet exists) let route = self - .get_or_create_route_service_cli(iface.name, iface.typ) + .get_or_create_route_service_cli(iface.name, iface.typ, true) .await?; - route.add_local_node(node.into(), &plugin_id).await; + route.add_local_node(node, &plugin_id).await; } UndiscoveredServiceCli(node, iface) => { if let Entry::Occupied(mut entry) = @@ -271,17 +278,69 @@ impl<'a> RoutesMgr<'a> { } } } - DiscoveredActionSrv(_node, iface) => { - log::info!("... TODO: create Action Server route for {}", iface.name); + DiscoveredActionSrv(node, iface) => { + let plugin_id = self.plugin_id.clone(); + // Get route (create it if not yet exists) + let route = self + .get_or_create_route_action_srv(iface.name, iface.typ) + .await?; + route.add_local_node(node, &plugin_id).await; } - UndiscoveredActionSrv(_node, iface) => { - log::info!("... TODO: delete Action Server route for {}", iface.name); + UndiscoveredActionSrv(node, iface) => { + if let Entry::Occupied(mut entry) = self.routes_action_srv.entry(iface.name.clone()) + { + let route = entry.get_mut(); + route.remove_local_node(&node); + if route.is_unused() { + self.admin_space + .remove(&(*KE_PREFIX_ROUTE_ACTION_SRV / iface.name_as_keyexpr())); + let route = entry.remove(); + // remove reader's and writer's GID in ros_discovery_msg + self.ros_discovery_mgr.remove_dds_readers( + route.dds_readers_guids().map_err(|e| { + format!("Failed to update ros_discovery_info message: {e}") + })?, + ); + self.ros_discovery_mgr.remove_dds_writers( + route.dds_writers_guids().map_err(|e| { + format!("Failed to update ros_discovery_info message: {e}") + })?, + ); + log::info!("{route} removed"); + } + } } - DiscoveredActionCli(_node, iface) => { - log::info!("... TODO: create Action Client route for {}", iface.name); + DiscoveredActionCli(node, iface) => { + let plugin_id = self.plugin_id.clone(); + // Get route (create it if not yet exists) + let route = self + .get_or_create_route_action_cli(iface.name, iface.typ) + .await?; + route.add_local_node(node, &plugin_id).await; } - UndiscoveredActionCli(_node, iface) => { - log::info!("... TODO: delete Action Client route for {}", iface.name); + UndiscoveredActionCli(node, iface) => { + if let Entry::Occupied(mut entry) = self.routes_action_cli.entry(iface.name.clone()) + { + let route = entry.get_mut(); + route.remove_local_node(&node); + if route.is_unused() { + self.admin_space + .remove(&(*KE_PREFIX_ROUTE_ACTION_CLI / iface.name_as_keyexpr())); + let route = entry.remove(); + // remove reader's and writer's GID in ros_discovery_msg + self.ros_discovery_mgr.remove_dds_readers( + route.dds_readers_guids().map_err(|e| { + format!("Failed to update ros_discovery_info message: {e}") + })?, + ); + self.ros_discovery_mgr.remove_dds_writers( + route.dds_writers_guids().map_err(|e| { + format!("Failed to update ros_discovery_info message: {e}") + })?, + ); + log::info!("{route} removed"); + } + } } } Ok(()) @@ -308,6 +367,7 @@ impl<'a> RoutesMgr<'a> { ros2_type, keyless, writer_qos, + true, ) .await?; route.add_remote_route(&plugin_id, &zenoh_key_expr); @@ -351,6 +411,7 @@ impl<'a> RoutesMgr<'a> { ros2_type, keyless, reader_qos, + true, ) .await?; route.add_remote_route(&plugin_id, &zenoh_key_expr); @@ -387,7 +448,7 @@ impl<'a> RoutesMgr<'a> { // On remote Service Server route announcement, prepare a Service Client route // with a associated DDS Reader/Writer allowing local ROS2 Nodes to discover it let route = self - .get_or_create_route_service_cli(format!("/{zenoh_key_expr}"), ros2_type) + .get_or_create_route_service_cli(format!("/{zenoh_key_expr}"), ros2_type, true) .await?; route.add_remote_route(&plugin_id, &zenoh_key_expr); } @@ -429,7 +490,7 @@ impl<'a> RoutesMgr<'a> { // On remote Service Client route announcement, prepare a Service Server route // with a associated DDS Reader/Writer allowing local ROS2 Nodes to discover it let route = self - .get_or_create_route_service_srv(format!("/{zenoh_key_expr}"), ros2_type) + .get_or_create_route_service_srv(format!("/{zenoh_key_expr}"), ros2_type, true) .await?; route.add_remote_route(&plugin_id, &zenoh_key_expr); } @@ -463,7 +524,89 @@ impl<'a> RoutesMgr<'a> { } } - _ => log::info!("... TODO: manage {event:?}"), + AnnouncedActionSrv { + plugin_id, + zenoh_key_expr, + ros2_type, + } => { + // On remote Action Server route announcement, prepare a Action Client route + // with a associated DDS Reader/Writer allowing local ROS2 Nodes to discover it + let route = self + .get_or_create_route_action_cli(format!("/{zenoh_key_expr}"), ros2_type) + .await?; + route.add_remote_route(&plugin_id, &zenoh_key_expr); + } + + RetiredActionSrv { + plugin_id, + zenoh_key_expr, + } => { + if let Entry::Occupied(mut entry) = + self.routes_action_cli.entry(format!("/{zenoh_key_expr}")) + { + let route = entry.get_mut(); + route.remove_remote_route(&plugin_id, &zenoh_key_expr); + if route.is_unused() { + self.admin_space + .remove(&(*KE_PREFIX_ROUTE_SERVICE_CLI / &zenoh_key_expr)); + let route = entry.remove(); + // remove reader's and writer's GID in ros_discovery_msg + self.ros_discovery_mgr.remove_dds_readers( + route.dds_readers_guids().map_err(|e| { + format!("Failed to update ros_discovery_info message: {e}") + })?, + ); + self.ros_discovery_mgr.remove_dds_writers( + route.dds_writers_guids().map_err(|e| { + format!("Failed to update ros_discovery_info message: {e}") + })?, + ); + log::info!("{route} removed"); + } + } + } + + AnnouncedActionCli { + plugin_id, + zenoh_key_expr, + ros2_type, + } => { + // On remote Action Client route announcement, prepare a Action Server route + // with a associated DDS Reader/Writer allowing local ROS2 Nodes to discover it + let route = self + .get_or_create_route_action_srv(format!("/{zenoh_key_expr}"), ros2_type) + .await?; + route.add_remote_route(&plugin_id, &zenoh_key_expr); + } + + RetiredActionCli { + plugin_id, + zenoh_key_expr, + } => { + if let Entry::Occupied(mut entry) = + self.routes_action_srv.entry(format!("/{zenoh_key_expr}")) + { + let route = entry.get_mut(); + route.remove_remote_route(&plugin_id, &zenoh_key_expr); + if route.is_unused() { + self.admin_space + .remove(&(*KE_PREFIX_ROUTE_SERVICE_SRV / &zenoh_key_expr)); + let route = entry.remove(); + // remove reader's and writer's GID in ros_discovery_msg + self.ros_discovery_mgr.remove_dds_readers( + route.dds_readers_guids().map_err(|e| { + format!("Failed to update ros_discovery_info message: {e}") + })?, + ); + self.ros_discovery_mgr.remove_dds_writers( + route.dds_writers_guids().map_err(|e| { + format!("Failed to update ros_discovery_info message: {e}") + })?, + ); + log::info!("{route} removed"); + } + } + } } Ok(()) } @@ -471,7 +614,7 @@ impl<'a> RoutesMgr<'a> { pub async fn query_historical_all_publications(&mut self, plugin_id: &keyexpr) { for route in self.routes_subscribers.values_mut() { route - .query_historical_publications(&plugin_id, self.config.queries_timeout) + .query_historical_publications(plugin_id, self.config.queries_timeout) .await; } } @@ -482,6 +625,7 @@ impl<'a> RoutesMgr<'a> { ros2_type: String, keyless: bool, reader_qos: Qos, + admin_space_ref: bool, ) -> Result<&mut RoutePublisher<'a>, String> { match self.routes_publishers.entry(ros2_name.clone()) { Entry::Vacant(entry) => { @@ -490,7 +634,7 @@ impl<'a> RoutesMgr<'a> { // create route let route = RoutePublisher::create( self.config.clone(), - &self.zsession, + self.zsession, self.participant, ros2_name.clone(), ros2_type, @@ -502,11 +646,6 @@ impl<'a> RoutesMgr<'a> { .await?; log::info!("{route} created"); - // insert reference in admin_space - let admin_ke = *KE_PREFIX_ROUTE_PUBLISHER / zenoh_key_expr; - self.admin_space - .insert(admin_ke, RouteRef::PublisherRoute(ros2_name)); - // insert reader's GID in ros_discovery_msg self.ros_discovery_mgr.add_dds_reader( route @@ -514,6 +653,13 @@ impl<'a> RoutesMgr<'a> { .map_err(|e| format!("Failed to update ros_discovery_info message: {e}"))?, ); + if admin_space_ref { + // insert reference in admin_space + let admin_ke = *KE_PREFIX_ROUTE_PUBLISHER / zenoh_key_expr; + self.admin_space + .insert(admin_ke, RouteRef::PublisherRoute(ros2_name)); + } + Ok(entry.insert(route)) } Entry::Occupied(entry) => Ok(entry.into_mut()), @@ -526,6 +672,7 @@ impl<'a> RoutesMgr<'a> { ros2_type: String, keyless: bool, writer_qos: Qos, + admin_space_ref: bool, ) -> Result<&mut RouteSubscriber<'a>, String> { match self.routes_subscribers.entry(ros2_name.clone()) { Entry::Vacant(entry) => { @@ -534,7 +681,7 @@ impl<'a> RoutesMgr<'a> { // create route let route = RouteSubscriber::create( self.config.clone(), - &self.zsession, + self.zsession, self.participant, ros2_name.clone(), ros2_type, @@ -545,11 +692,6 @@ impl<'a> RoutesMgr<'a> { .await?; log::info!("{route} created"); - // insert reference in admin_space - let admin_ke = *KE_PREFIX_ROUTE_SUBSCRIBER / zenoh_key_expr; - self.admin_space - .insert(admin_ke, RouteRef::SubscriberRoute(ros2_name)); - // insert writer's GID in ros_discovery_msg self.ros_discovery_mgr.add_dds_writer( route @@ -557,6 +699,13 @@ impl<'a> RoutesMgr<'a> { .map_err(|e| format!("Failed to update ros_discovery_info message: {e}"))?, ); + if admin_space_ref { + // insert reference in admin_space + let admin_ke = *KE_PREFIX_ROUTE_SUBSCRIBER / zenoh_key_expr; + self.admin_space + .insert(admin_ke, RouteRef::SubscriberRoute(ros2_name)); + } + Ok(entry.insert(route)) } Entry::Occupied(entry) => Ok(entry.into_mut()), @@ -567,6 +716,7 @@ impl<'a> RoutesMgr<'a> { &mut self, ros2_name: String, ros2_type: String, + admin_space_ref: bool, ) -> Result<&mut RouteServiceSrv<'a>, String> { match self.routes_service_srv.entry(ros2_name.clone()) { Entry::Vacant(entry) => { @@ -575,7 +725,7 @@ impl<'a> RoutesMgr<'a> { // create route let route = RouteServiceSrv::create( self.config.clone(), - &self.zsession, + self.zsession, self.participant, ros2_name.clone(), ros2_type, @@ -585,11 +735,6 @@ impl<'a> RoutesMgr<'a> { .await?; log::info!("{route} created"); - // insert reference in admin_space - let admin_ke = *KE_PREFIX_ROUTE_SERVICE_SRV / zenoh_key_expr; - self.admin_space - .insert(admin_ke, RouteRef::ServiceSrvRoute(ros2_name)); - // insert reader's and writer's GID in ros_discovery_msg self.ros_discovery_mgr.add_dds_reader( route @@ -602,6 +747,13 @@ impl<'a> RoutesMgr<'a> { .map_err(|e| format!("Failed to update ros_discovery_info message: {e}"))?, ); + if admin_space_ref { + // insert reference in admin_space + let admin_ke = *KE_PREFIX_ROUTE_SERVICE_SRV / zenoh_key_expr; + self.admin_space + .insert(admin_ke, RouteRef::ServiceSrvRoute(ros2_name)); + } + Ok(entry.insert(route)) } Entry::Occupied(entry) => Ok(entry.into_mut()), @@ -612,6 +764,7 @@ impl<'a> RoutesMgr<'a> { &mut self, ros2_name: String, ros2_type: String, + admin_space_ref: bool, ) -> Result<&mut RouteServiceCli<'a>, String> { match self.routes_service_cli.entry(ros2_name.clone()) { Entry::Vacant(entry) => { @@ -620,7 +773,7 @@ impl<'a> RoutesMgr<'a> { // create route let route = RouteServiceCli::create( self.config.clone(), - &self.zsession, + self.zsession, self.participant, ros2_name.clone(), ros2_type, @@ -630,11 +783,6 @@ impl<'a> RoutesMgr<'a> { .await?; log::info!("{route} created"); - // insert reference in admin_space - let admin_ke = *KE_PREFIX_ROUTE_SERVICE_CLI / zenoh_key_expr; - self.admin_space - .insert(admin_ke, RouteRef::ServiceCliRoute(ros2_name)); - // insert reader's and writer's GID in ros_discovery_msg self.ros_discovery_mgr.add_dds_reader( route @@ -647,6 +795,101 @@ impl<'a> RoutesMgr<'a> { .map_err(|e| format!("Failed to update ros_discovery_info message: {e}"))?, ); + if admin_space_ref { + // insert reference in admin_space + let admin_ke = *KE_PREFIX_ROUTE_SERVICE_CLI / zenoh_key_expr; + self.admin_space + .insert(admin_ke, RouteRef::ServiceCliRoute(ros2_name)); + } + + Ok(entry.insert(route)) + } + Entry::Occupied(entry) => Ok(entry.into_mut()), + } + } + + async fn get_or_create_route_action_srv( + &mut self, + ros2_name: String, + ros2_type: String, + ) -> Result<&mut RouteActionSrv<'a>, String> { + match self.routes_action_srv.entry(ros2_name.clone()) { + Entry::Vacant(entry) => { + // ROS2 topic name => Zenoh key expr : strip '/' prefix + let zenoh_key_expr = ke_for_sure!(&ros2_name[1..]); + // create route + let route = RouteActionSrv::create( + self.config.clone(), + self.zsession, + self.participant, + ros2_name.clone(), + ros2_type, + zenoh_key_expr.to_owned(), + ) + .await?; + log::info!("{route} created"); + + // insert readers' and writes' GID in ros_discovery_msg + self.ros_discovery_mgr.add_dds_readers( + route + .dds_readers_guids() + .map_err(|e| format!("Failed to update ros_discovery_info message: {e}"))?, + ); + self.ros_discovery_mgr.add_dds_writers( + route + .dds_writers_guids() + .map_err(|e| format!("Failed to update ros_discovery_info message: {e}"))?, + ); + + // insert reference in admin_space + let admin_ke = *KE_PREFIX_ROUTE_ACTION_SRV / zenoh_key_expr; + self.admin_space + .insert(admin_ke, RouteRef::ActionSrvRoute(ros2_name)); + + Ok(entry.insert(route)) + } + Entry::Occupied(entry) => Ok(entry.into_mut()), + } + } + + async fn get_or_create_route_action_cli( + &mut self, + ros2_name: String, + ros2_type: String, + ) -> Result<&mut RouteActionCli<'a>, String> { + match self.routes_action_cli.entry(ros2_name.clone()) { + Entry::Vacant(entry) => { + // ROS2 topic name => Zenoh key expr : strip '/' prefix + let zenoh_key_expr = ke_for_sure!(&ros2_name[1..]); + // create route + let route = RouteActionCli::create( + self.config.clone(), + self.zsession, + self.participant, + ros2_name.clone(), + ros2_type, + zenoh_key_expr.to_owned(), + ) + .await?; + log::info!("{route} created"); + + // insert readers' and writes' GID in ros_discovery_msg + self.ros_discovery_mgr.add_dds_readers( + route + .dds_readers_guids() + .map_err(|e| format!("Failed to update ros_discovery_info message: {e}"))?, + ); + self.ros_discovery_mgr.add_dds_writers( + route + .dds_writers_guids() + .map_err(|e| format!("Failed to update ros_discovery_info message: {e}"))?, + ); + + // insert reference in admin_space + let admin_ke = *KE_PREFIX_ROUTE_ACTION_CLI / zenoh_key_expr; + self.admin_space + .insert(admin_ke, RouteRef::ActionCliRoute(ros2_name)); + Ok(entry.insert(route)) } Entry::Occupied(entry) => Ok(entry.into_mut()), @@ -685,7 +928,7 @@ impl<'a> RoutesMgr<'a> { async fn send_admin_reply(&self, query: &Query, key_expr: &keyexpr, route_ref: &RouteRef) { match self.get_entity_json_value(route_ref) { Ok(Some(v)) => { - let admin_keyexpr = &self.admin_prefix / &key_expr; + let admin_keyexpr = &self.admin_prefix / key_expr; if let Err(e) = query .reply(Ok(Sample::new(admin_keyexpr, v))) .res_async() @@ -726,6 +969,16 @@ impl<'a> RoutesMgr<'a> { .get(ke) .map(serde_json::to_value) .transpose(), + RouteRef::ActionSrvRoute(ke) => self + .routes_action_srv + .get(ke) + .map(serde_json::to_value) + .transpose(), + RouteRef::ActionCliRoute(ke) => self + .routes_action_cli + .get(ke) + .map(serde_json::to_value) + .transpose(), } } }