From b4089efccade451858b3c059cb9ae269a27b7f3e Mon Sep 17 00:00:00 2001 From: Julien Enoch Date: Mon, 9 Oct 2023 11:26:34 +0200 Subject: [PATCH 1/2] Add ignore_local QoS to services/actions --- zenoh-plugin-ros2dds/src/ros2_utils.rs | 32 +- zenoh-plugin-ros2dds/src/route_action_cli.rs | 8 +- zenoh-plugin-ros2dds/src/route_action_srv.rs | 8 +- zenoh-plugin-ros2dds/src/route_service_cli.rs | 15 +- zenoh-plugin-ros2dds/src/route_service_srv.rs | 15 +- 5 files changed, 42 insertions(+), 36 deletions(-) diff --git a/zenoh-plugin-ros2dds/src/ros2_utils.rs b/zenoh-plugin-ros2dds/src/ros2_utils.rs index ba92ec5..b9ccd5a 100644 --- a/zenoh-plugin-ros2dds/src/ros2_utils.rs +++ b/zenoh-plugin-ros2dds/src/ros2_utils.rs @@ -18,7 +18,7 @@ use cyclors::{ dds_entity_t, qos::{ Durability, DurabilityKind, History, HistoryKind, Qos, Reliability, ReliabilityKind, - TypeConsistency, TypeConsistencyKind, WriterDataLifecycle, DDS_INFINITE_TIME, + TypeConsistency, TypeConsistencyKind, WriterDataLifecycle, DDS_INFINITE_TIME, IgnoreLocal, IgnoreLocalKind, }, }; use zenoh::prelude::{keyexpr, KeyExpr}; @@ -35,9 +35,9 @@ lazy_static::lazy_static!( pub static ref KE_SUFFIX_ACTION_FEEDBACK: &'static keyexpr = ke_for_sure!("_action/feedback"); pub static ref KE_SUFFIX_ACTION_STATUS: &'static keyexpr = ke_for_sure!("_action/status"); - pub static ref QOS_ACTION_FEEDBACK: Qos = ros2_action_feedback_default_qos(); - pub static ref QOS_ACTION_STATUS: Qos = ros2_action_status_default_qos(); - + pub static ref QOS_DEFAULT_SERVICE: Qos = ros2_service_default_qos(); + pub static ref QOS_DEFAULT_ACTION_FEEDBACK: Qos = ros2_action_feedback_default_qos(); + pub static ref QOS_DEFAULT_ACTION_STATUS: Qos = ros2_action_status_default_qos(); ); /// Convert DDS Topic type to ROS2 Message type @@ -95,6 +95,24 @@ pub fn dds_type_to_ros2_action_type(dds_topic: &str) -> String { ) } +fn ros2_service_default_qos() -> Qos { + // Default Service QoS copied from: + // https://github.com/ros2/rmw/blob/83445be486deae8c78d275e092eafb4bf380bd49/rmw/include/rmw/qos_profiles.h#L64C44-L64C44 + let mut qos = Qos::default(); + qos.history = Some(History { + kind: HistoryKind::KEEP_LAST, + depth: 10, + }); + qos.reliability = Some(Reliability { + kind: ReliabilityKind::RELIABLE, + max_blocking_time: DDS_INFINITE_TIME, + }); + // Add ignore_local to avoid loops + qos.ignore_local = Some(IgnoreLocal { kind: IgnoreLocalKind::PARTICIPANT}); + qos +} + + fn ros2_action_feedback_default_qos() -> Qos { let mut qos = Qos::default(); qos.history = Some(History { @@ -117,10 +135,14 @@ fn ros2_action_feedback_default_qos() -> Qos { prevent_type_widening: false, force_type_validation: false, }); + // Add ignore_local to avoid loops + qos.ignore_local = Some(IgnoreLocal { kind: IgnoreLocalKind::PARTICIPANT}); qos } fn ros2_action_status_default_qos() -> Qos { + // Default Status topic QoS copied from: + // https://github.com/ros2/rcl/blob/8f7f4f0804a34ee9d9ecd2d7e75a57ce2b7ced5d/rcl_action/include/rcl_action/default_qos.h#L30 let mut qos = Qos::default(); qos.durability = Some(Durability { kind: DurabilityKind::TRANSIENT_LOCAL, @@ -141,6 +163,8 @@ fn ros2_action_status_default_qos() -> Qos { prevent_type_widening: false, force_type_validation: false, }); + // Add ignore_local to avoid loops + qos.ignore_local = 
Some(IgnoreLocal { kind: IgnoreLocalKind::PARTICIPANT}); qos } diff --git a/zenoh-plugin-ros2dds/src/route_action_cli.rs b/zenoh-plugin-ros2dds/src/route_action_cli.rs index 364fc05..d7a86e8 100644 --- a/zenoh-plugin-ros2dds/src/route_action_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_action_cli.rs @@ -107,7 +107,7 @@ impl RouteActionCli<'_> { format!("{ros2_type}_FeedbackMessage"), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_FEEDBACK, true, - QOS_ACTION_FEEDBACK.clone(), + QOS_DEFAULT_ACTION_FEEDBACK.clone(), context, ) .await?; @@ -117,7 +117,7 @@ impl RouteActionCli<'_> { ROS2_ACTION_STATUS_MSG_TYPE.to_string(), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_STATUS, true, - QOS_ACTION_STATUS.clone(), + QOS_DEFAULT_ACTION_STATUS.clone(), context, ) .await?; @@ -232,9 +232,9 @@ impl RouteActionCli<'_> { self.route_cancel_goal.add_local_node(node.clone()), self.route_get_result.add_local_node(node.clone()), self.route_feedback - .add_local_node(node.clone(), &QOS_ACTION_FEEDBACK), + .add_local_node(node.clone(), &QOS_DEFAULT_ACTION_FEEDBACK), self.route_status - .add_local_node(node.clone(), &QOS_ACTION_STATUS), + .add_local_node(node.clone(), &QOS_DEFAULT_ACTION_STATUS), ); self.local_nodes.insert(node); diff --git a/zenoh-plugin-ros2dds/src/route_action_srv.rs b/zenoh-plugin-ros2dds/src/route_action_srv.rs index b72965a..ce7a7ae 100644 --- a/zenoh-plugin-ros2dds/src/route_action_srv.rs +++ b/zenoh-plugin-ros2dds/src/route_action_srv.rs @@ -107,7 +107,7 @@ impl RouteActionSrv<'_> { &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_FEEDBACK, &None, true, - QOS_ACTION_FEEDBACK.clone(), + QOS_DEFAULT_ACTION_FEEDBACK.clone(), context, ) .await?; @@ -118,7 +118,7 @@ impl RouteActionSrv<'_> { &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_STATUS, &None, true, - QOS_ACTION_STATUS.clone(), + QOS_DEFAULT_ACTION_STATUS.clone(), context, ) .await?; @@ -233,9 +233,9 @@ impl RouteActionSrv<'_> { self.route_cancel_goal.add_local_node(node.clone()), self.route_get_result.add_local_node(node.clone()), self.route_feedback - .add_local_node(node.clone(), &QOS_ACTION_FEEDBACK), + .add_local_node(node.clone(), &QOS_DEFAULT_ACTION_FEEDBACK), self.route_status - .add_local_node(node.clone(), &QOS_ACTION_STATUS), + .add_local_node(node.clone(), &QOS_DEFAULT_ACTION_STATUS), ); self.local_nodes.insert(node);
diff --git a/zenoh-plugin-ros2dds/src/route_service_cli.rs b/zenoh-plugin-ros2dds/src/route_service_cli.rs index 514f4a2..8412fe7 100644 --- a/zenoh-plugin-ros2dds/src/route_service_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_service_cli.rs @@ -32,7 +32,7 @@ use crate::dds_utils::{ use crate::liveliness_mgt::new_ke_liveliness_service_cli; use crate::ros2_utils::{ is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type, - ros2_service_type_to_request_dds_type, + ros2_service_type_to_request_dds_type, QOS_DEFAULT_SERVICE, }; use crate::routes_mgr::Context; use crate::LOG_PAYLOAD; @@ -111,17 +111,8 @@ impl RouteServiceCli<'_> { "Route Service Client (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): creation with type {ros2_type}" ); - // Default Service QoS copied from: - // https://github.com/ros2/rmw/blob/83445be486deae8c78d275e092eafb4bf380bd49/rmw/include/rmw/qos_profiles.h#L64C44-L64C44 - let mut qos = Qos::default(); - qos.history = Some(History { - kind: HistoryKind::KEEP_LAST, - depth: 10, - }); - qos.reliability = Some(Reliability { - kind: ReliabilityKind::RELIABLE, - max_blocking_time: DDS_INFINITE_TIME, - }); + // Default Service QoS + let mut qos = QOS_DEFAULT_SERVICE.clone(); // Add DATA_USER QoS similarly to rmw_cyclone_dds here: // https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/rmw_node.cpp#L5028C17-L5028C17 diff --git a/zenoh-plugin-ros2dds/src/route_service_srv.rs b/zenoh-plugin-ros2dds/src/route_service_srv.rs index 2148ab1..fbddd9e 100644 --- a/zenoh-plugin-ros2dds/src/route_service_srv.rs +++ b/zenoh-plugin-ros2dds/src/route_service_srv.rs @@ -36,7 +36,7 @@ use crate::dds_utils::{ use crate::liveliness_mgt::new_ke_liveliness_service_srv; use crate::ros2_utils::{ is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type, - ros2_service_type_to_request_dds_type, + ros2_service_type_to_request_dds_type, QOS_DEFAULT_SERVICE, }; use crate::routes_mgr::Context; use crate::{serialize_option_as_bool, LOG_PAYLOAD}; @@ -126,17 +126,8 @@ impl RouteServiceSrv<'_> { "Route Service Server (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): creation with type {ros2_type}" ); - // Default Service QoS copied from: - // 
https://github.com/ros2/rmw/blob/83445be486deae8c78d275e092eafb4bf380bd49/rmw/include/rmw/qos_profiles.h#L64C44-L64C44 - let mut qos = Qos::default(); - qos.history = Some(History { - kind: HistoryKind::KEEP_LAST, - depth: 10, - }); - qos.reliability = Some(Reliability { - kind: ReliabilityKind::RELIABLE, - max_blocking_time: DDS_INFINITE_TIME, - }); + // Default Service QoS + let mut qos = QOS_DEFAULT_SERVICE.clone(); // Add DATA_USER QoS similarly to rmw_cyclone_dds here: // https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/rmw_node.cpp#L5028C17-L5028C17 From ec7371b3260bf57c98389f263dfbd55b79198875 Mon Sep 17 00:00:00 2001 From: Julien Enoch Date: Mon, 9 Oct 2023 11:31:27 +0200 Subject: [PATCH 2/2] fix formatting --- zenoh-plugin-ros2dds/src/dds_utils.rs | 4 ---- zenoh-plugin-ros2dds/src/ros2_utils.rs | 18 ++++++++++++------ zenoh-plugin-ros2dds/src/route_service_cli.rs | 2 +- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/zenoh-plugin-ros2dds/src/dds_utils.rs b/zenoh-plugin-ros2dds/src/dds_utils.rs index c9655bb..3544ba3 100644 --- a/zenoh-plugin-ros2dds/src/dds_utils.rs +++ b/zenoh-plugin-ros2dds/src/dds_utils.rs @@ -73,10 +73,6 @@ pub fn serialize_atomic_entity_guid(entity: &AtomicDDSEntity, s: S) -> Result where S: Serializer, { - println!( - "--- serialize_atomic_entity_guid: {}", - entity.load(std::sync::atomic::Ordering::Relaxed) - ); match entity.load(std::sync::atomic::Ordering::Relaxed) { DDS_ENTITY_NULL => s.serialize_str(""), entity => serialize_entity_guid(&entity, s), diff --git a/zenoh-plugin-ros2dds/src/ros2_utils.rs b/zenoh-plugin-ros2dds/src/ros2_utils.rs index b9ccd5a..0e0b81a 100644 --- a/zenoh-plugin-ros2dds/src/ros2_utils.rs +++ b/zenoh-plugin-ros2dds/src/ros2_utils.rs @@ -17,8 +17,9 @@ use std::sync::atomic::{AtomicU32, Ordering}; use cyclors::{ dds_entity_t, qos::{ - Durability, DurabilityKind, History, HistoryKind, Qos, Reliability, ReliabilityKind, - TypeConsistency, TypeConsistencyKind, WriterDataLifecycle, DDS_INFINITE_TIME, IgnoreLocal, IgnoreLocalKind, + Durability, DurabilityKind, History, HistoryKind, IgnoreLocal, IgnoreLocalKind, Qos, + Reliability, ReliabilityKind, TypeConsistency, TypeConsistencyKind, WriterDataLifecycle, + DDS_INFINITE_TIME, }, }; use zenoh::prelude::{keyexpr, KeyExpr}; @@ -108,11 +109,12 @@ fn ros2_service_default_qos() -> Qos { max_blocking_time: DDS_INFINITE_TIME, }); // Add ignore_local to avoid loops - qos.ignore_local = Some(IgnoreLocal { kind: IgnoreLocalKind::PARTICIPANT}); + qos.ignore_local = Some(IgnoreLocal { + kind: IgnoreLocalKind::PARTICIPANT, + }); qos } - fn ros2_action_feedback_default_qos() -> Qos { let mut qos = Qos::default(); qos.history = Some(History { @@ -136,7 +138,9 @@ fn ros2_action_feedback_default_qos() -> Qos { force_type_validation: false, }); // Add ignore_local to avoid loops - qos.ignore_local = Some(IgnoreLocal { kind: IgnoreLocalKind::PARTICIPANT}); + qos.ignore_local = Some(IgnoreLocal { + kind: IgnoreLocalKind::PARTICIPANT, + }); qos } @@ -164,7 +168,9 @@ fn ros2_action_status_default_qos() -> Qos { force_type_validation: false, }); // Add ignore_local to avoid loops - qos.ignore_local = Some(IgnoreLocal { kind: IgnoreLocalKind::PARTICIPANT}); + qos.ignore_local = Some(IgnoreLocal { + kind: IgnoreLocalKind::PARTICIPANT, + }); qos } diff --git a/zenoh-plugin-ros2dds/src/route_service_cli.rs b/zenoh-plugin-ros2dds/src/route_service_cli.rs index 8412fe7..953e818 100644 --- a/zenoh-plugin-ros2dds/src/route_service_cli.rs +++ 
b/zenoh-plugin-ros2dds/src/route_service_cli.rs @@ -344,7 +344,7 @@ fn do_route_reply(route_id: String, reply: Reply, request_id: [u8; 16], rep_writ } } Err(val) => { - log::warn!("{route_id}: received error as reply for {request_id:02x?}: {val:?}"); + log::warn!("{route_id}: received error as reply for {request_id:02x?}: {val}"); } } }