From 40e78db3461d90f3f756252f7a6d883c19a09708 Mon Sep 17 00:00:00 2001 From: Julien Enoch Date: Wed, 13 Dec 2023 18:44:10 +0100 Subject: [PATCH] Support several Publisher/Subscriber on a same topic per Node (close #27) --- .../src/discovered_entities.rs | 14 ++- zenoh-plugin-ros2dds/src/node_info.rs | 96 ++++++++++++------- zenoh-plugin-ros2dds/src/route_publisher.rs | 24 ++--- zenoh-plugin-ros2dds/src/routes_mgr.rs | 92 ++++++++++-------- 4 files changed, 140 insertions(+), 86 deletions(-) diff --git a/zenoh-plugin-ros2dds/src/discovered_entities.rs b/zenoh-plugin-ros2dds/src/discovered_entities.rs index 4ea6906..fbc7a30 100644 --- a/zenoh-plugin-ros2dds/src/discovered_entities.rs +++ b/zenoh-plugin-ros2dds/src/discovered_entities.rs @@ -333,11 +333,15 @@ impl DiscoveredEntities { // For each declared Reader for rgid in &ros_node_info.reader_gid_seq { if let Some(entity) = readers.get(rgid) { - log::debug!( - "ROS Node {ros_node_info} declares Reader on {}", + log::trace!( + "ROS Node {ros_node_info} declares a Reader on {}", entity.topic_name ); if let Some(e) = node.update_with_reader(entity) { + log::debug!( + "ROS Node {ros_node_info} declares a new Reader on {}", + entity.topic_name + ); events.push(e) }; } else { @@ -350,11 +354,15 @@ impl DiscoveredEntities { // For each declared Writer for wgid in &ros_node_info.writer_gid_seq { if let Some(entity) = writers.get(wgid) { - log::debug!( + log::trace!( "ROS Node {ros_node_info} declares Writer on {}", entity.topic_name ); if let Some(e) = node.update_with_writer(entity) { + log::debug!( + "ROS Node {ros_node_info} declares a new Writer on {}", + entity.topic_name + ); events.push(e) }; } else { diff --git a/zenoh-plugin-ros2dds/src/node_info.rs b/zenoh-plugin-ros2dds/src/node_info.rs index c0aeffc..76e944b 100644 --- a/zenoh-plugin-ros2dds/src/node_info.rs +++ b/zenoh-plugin-ros2dds/src/node_info.rs @@ -14,7 +14,7 @@ use serde::ser::SerializeSeq; use serde::{Serialize, Serializer}; use std::collections::hash_map::Entry; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::ops::Range; use zenoh::prelude::{keyexpr, KeyExpr}; @@ -29,14 +29,24 @@ pub struct MsgPub { pub name: String, #[serde(rename = "type")] pub typ: String, + // List of DDS Writers declared by 1 Node for a same topic + // Issue #27: usually only 1 Writer, but may happen that 1 Node declares several Publishers + // on the same topic. In this case `ros2 node info ` still shows only 1 Publisher, + // but all the writers can be seen with `ros2 topic info -v`. + // Hence the choice here to aggregate all the Writers in 1 Publisher representation. + // The Publisher is declared undiscovered only when all its Writers are undiscovered. #[serde(skip)] - pub writer: Gid, + pub writers: HashSet, } impl MsgPub { pub fn create(name: String, typ: String, writer: Gid) -> Result { check_ros_name(&name)?; - Ok(MsgPub { name, typ, writer }) + Ok(MsgPub { + name, + typ, + writers: HashSet::from([writer]), + }) } pub fn name_as_keyexpr(&self) -> &keyexpr { @@ -56,14 +66,24 @@ pub struct MsgSub { pub name: String, #[serde(rename = "type")] pub typ: String, + // List of DDS Readers declared by 1 Node for a same topic + // Issue #27: usually only 1 Reader, but may happen that 1 Node declares several Subscribers + // on the same topic. In this case `ros2 node info ` still shows only 1 Subscriber, + // but all the writers can be seen with `ros2 topic info -v`. + // Hence the choice here to aggregate all the Readers in 1 Subscriber representation. + // The Subscriber is declared undiscovered only when all its Readers are undiscovered. #[serde(skip)] - pub reader: Gid, + pub readers: HashSet, } impl MsgSub { pub fn create(name: String, typ: String, reader: Gid) -> Result { check_ros_name(&name)?; - Ok(MsgSub { name, typ, reader }) + Ok(MsgSub { + name, + typ, + readers: HashSet::from([reader]), + }) } pub fn name_as_keyexpr(&self) -> &keyexpr { @@ -632,7 +652,7 @@ impl NodeInfo { } Err(e) => { log::error!( - "ROS Node {self} declared an incompatible Publisher: {e} - ignored" + "ROS Node {node_fullname} declared an incompatible Publisher: {e} - ignored" ); None } @@ -641,18 +661,12 @@ impl NodeInfo { let v = e.get_mut(); let mut result: Option = None; if v.typ != typ { - log::warn!( - r#"ROS declaration of Publisher "{v}" changed it's type to "{typ}""# - ); - v.typ = typ; - result = Some(DiscoveredMsgPub(node_fullname.clone(), v.clone())); - } - if v.writer != *writer { - log::debug!( - r#"ROS declaration of Publisher "{v}" changed it's DDS Writer's GID from {} to {writer}"#, - v.writer + log::error!( + r#"ROS Node {node_fullname} declares 2 Publishers on same topic {name} but with different types: {} vs {typ} - Publisher with 2nd type ignored""#, + v.typ ); - v.writer = *writer; + } else if v.writers.insert(*writer) && v.writers.len() == 1 { + // Send DiscoveredMsgPub event only for the 1st discovered Writer result = Some(DiscoveredMsgPub(node_fullname, v.clone())); } result @@ -677,7 +691,7 @@ impl NodeInfo { } Err(e) => { log::error!( - "ROS Node {self} declared an incompatible Subscriber: {e} - ignored" + "ROS Node {node_fullname} declared an incompatible Subscriber: {e} - ignored" ); None } @@ -686,18 +700,12 @@ impl NodeInfo { let v = e.get_mut(); let mut result: Option = None; if v.typ != typ { - log::warn!( - r#"ROS declaration of Subscriber "{v}" changed it's type to "{typ}""# - ); - v.typ = typ; - result = Some(DiscoveredMsgSub(node_fullname.clone(), v.clone())); - } - if v.reader != *reader { - log::debug!( - r#"ROS declaration of Subscriber "{v}" changed it's DDS Writer's GID from {} to {reader}"#, - v.reader + log::error!( + r#"ROS Node {node_fullname} declares 2 Subscriber on same topic {name} but with different types: {} vs {typ} - Publisher with 2nd type ignored""#, + v.typ ); - v.reader = *reader; + } else if v.readers.insert(*reader) && v.readers.len() == 1 { + // Send DiscoveredMsgSub event only for the 1st discovered Reader result = Some(DiscoveredMsgSub(node_fullname, v.clone())); } result @@ -1732,10 +1740,20 @@ impl NodeInfo { pub fn remove_reader(&mut self, reader: &Gid) -> Option { use ROS2DiscoveryEvent::*; let node_fullname = self.fullname().to_string(); - if let Some((name, _)) = self.msg_sub.iter().find(|(_, v)| v.reader == *reader) { + // Search in Subscribers list if one is using the writer + if let Some(name) = self.msg_sub.iter_mut().find_map(|(name, sub)| { + if sub.readers.remove(reader) && sub.readers.is_empty() { + // found Subscriber using the reader: remove the reader from list + // and if the list is empty return the Subscriber name to "undiscover" it + Some(name.clone()) + } else { + None + } + }) { + // Return undiscovery event for this Subscriber, since all its DDS Writer have been undiscovered return Some(UndiscoveredMsgSub( node_fullname, - self.msg_sub.remove(&name.clone()).unwrap(), + self.msg_sub.remove(&name).unwrap(), )); } if let Some((name, _)) = self @@ -1785,14 +1803,24 @@ impl NodeInfo { } // Remove a DDS Writer possibly used by this node, and returns an UndiscoveredX event if - // this Writer was used by some Subscription, Service or Action + // this Writer was used by some Publication, Service or Action pub fn remove_writer(&mut self, writer: &Gid) -> Option { use ROS2DiscoveryEvent::*; let node_fullname = self.fullname().to_string(); - if let Some((name, _)) = self.msg_pub.iter().find(|(_, v)| v.writer == *writer) { + // Search in Publishers list if one is using the writer + if let Some(name) = self.msg_pub.iter_mut().find_map(|(name, publ)| { + if publ.writers.remove(writer) && publ.writers.is_empty() { + // found Publisher using the writer: remove the writer from list + // and if the list is empty return the Publisher name to "undiscover" it + Some(name.clone()) + } else { + None + } + }) { + // Return undiscovery event for this Publisher, since all its DDS Writer have been undiscovered return Some(UndiscoveredMsgPub( node_fullname, - self.msg_pub.remove(&name.clone()).unwrap(), + self.msg_pub.remove(&name).unwrap(), )); } if let Some((name, _)) = self diff --git a/zenoh-plugin-ros2dds/src/route_publisher.rs b/zenoh-plugin-ros2dds/src/route_publisher.rs index 1066e4e..e34da1f 100644 --- a/zenoh-plugin-ros2dds/src/route_publisher.rs +++ b/zenoh-plugin-ros2dds/src/route_publisher.rs @@ -341,23 +341,25 @@ impl RoutePublisher<'_> { #[inline] pub async fn add_local_node(&mut self, node: String, discovered_writer_qos: &Qos) { - self.local_nodes.insert(node); - log::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if 1st local node added, announce the route - if self.local_nodes.len() == 1 { - if let Err(e) = self.announce_route(discovered_writer_qos).await { - log::error!("{self} announcement failed: {e}"); + if self.local_nodes.insert(node) { + log::debug!("{self} now serving local nodes {:?}", self.local_nodes); + // if 1st local node added, announce the route + if self.local_nodes.len() == 1 { + if let Err(e) = self.announce_route(discovered_writer_qos).await { + log::error!("{self} announcement failed: {e}"); + } } } } #[inline] pub fn remove_local_node(&mut self, node: &str) { - self.local_nodes.remove(node); - log::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if last local node removed, retire the route - if self.local_nodes.is_empty() { - self.retire_route(); + if self.local_nodes.remove(node) { + log::debug!("{self} now serving local nodes {:?}", self.local_nodes); + // if last local node removed, retire the route + if self.local_nodes.is_empty() { + self.retire_route(); + } } } diff --git a/zenoh-plugin-ros2dds/src/routes_mgr.rs b/zenoh-plugin-ros2dds/src/routes_mgr.rs index 3f3e744..2354436 100644 --- a/zenoh-plugin-ros2dds/src/routes_mgr.rs +++ b/zenoh-plugin-ros2dds/src/routes_mgr.rs @@ -137,28 +137,36 @@ impl<'a> RoutesMgr<'a> { use ROS2DiscoveryEvent::*; match event { DiscoveredMsgPub(node, iface) => { - // Retrieve info on DDS Writer + // Pick 1 discovered Writer amongst the possibly multiple ones listed in MsgPub let entity = { let entities = zread!(self.context.discovered_entities); - entities - .get_writer(&iface.writer) - .ok_or(format!( - "Failed to get DDS info for {iface} Writer {}. Already deleted ?", - iface.writer - ))? - .clone() + iface + .writers + .iter() + .find_map(|w| entities.get_writer(w)) + .map(Clone::clone) }; - // Get route (create it if not yet exists) - let route = self - .get_or_create_route_publisher( - iface.name, - iface.typ, - entity.keyless, - adapt_writer_qos_for_reader(&entity.qos), - true, - ) - .await?; - route.add_local_node(node, &entity.qos).await; + match entity { + Some(entity) => { + // Get route (create it if not yet exists) + let route = self + .get_or_create_route_publisher( + iface.name, + iface.typ, + entity.keyless, + adapt_writer_qos_for_reader(&entity.qos), + true, + ) + .await?; + route.add_local_node(node, &entity.qos).await; + } + None => { + return Err(format!( + "Failed to get DDS info for any Writer of {iface} ({:?})", + iface.writers + )) + } + } } UndiscoveredMsgPub(node, iface) => { @@ -176,28 +184,36 @@ impl<'a> RoutesMgr<'a> { } DiscoveredMsgSub(node, iface) => { - // Retrieve info on DDS Reader + // Pick 1 discovered Reader amongst the possibly multiple ones listed in MsgSub let entity = { let entities = zread!(self.context.discovered_entities); - entities - .get_reader(&iface.reader) - .ok_or(format!( - "Failed to get DDS info for {iface} Reader {}. Already deleted ?", - iface.reader - ))? - .clone() + iface + .readers + .iter() + .find_map(|r| entities.get_reader(r)) + .map(Clone::clone) }; - // Get route (create it if not yet exists) - let route = self - .get_or_create_route_subscriber( - iface.name, - iface.typ, - entity.keyless, - adapt_reader_qos_for_writer(&entity.qos), - true, - ) - .await?; - route.add_local_node(node, &entity.qos).await; + match entity { + Some(entity) => { + // Get route (create it if not yet exists) + let route = self + .get_or_create_route_subscriber( + iface.name, + iface.typ, + entity.keyless, + adapt_reader_qos_for_writer(&entity.qos), + true, + ) + .await?; + route.add_local_node(node, &entity.qos).await; + } + None => { + return Err(format!( + "Failed to get DDS info for any Reader of {iface} ({:?})", + iface.readers + )) + } + } } UndiscoveredMsgSub(node, iface) => {