Skip to content

Commit

Permalink
Support several Publisher/Subscriber on a same topic per Node (close #27
Browse files Browse the repository at this point in the history
)
  • Loading branch information
JEnoch committed Dec 13, 2023
1 parent 83ba7e4 commit 40e78db
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 86 deletions.
14 changes: 11 additions & 3 deletions zenoh-plugin-ros2dds/src/discovered_entities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,11 +333,15 @@ impl DiscoveredEntities {
// For each declared Reader
for rgid in &ros_node_info.reader_gid_seq {
if let Some(entity) = readers.get(rgid) {
log::debug!(
"ROS Node {ros_node_info} declares Reader on {}",
log::trace!(
"ROS Node {ros_node_info} declares a Reader on {}",
entity.topic_name
);
if let Some(e) = node.update_with_reader(entity) {
log::debug!(
"ROS Node {ros_node_info} declares a new Reader on {}",
entity.topic_name
);
events.push(e)
};
} else {
Expand All @@ -350,11 +354,15 @@ impl DiscoveredEntities {
// For each declared Writer
for wgid in &ros_node_info.writer_gid_seq {
if let Some(entity) = writers.get(wgid) {
log::debug!(
log::trace!(
"ROS Node {ros_node_info} declares Writer on {}",
entity.topic_name
);
if let Some(e) = node.update_with_writer(entity) {
log::debug!(
"ROS Node {ros_node_info} declares a new Writer on {}",
entity.topic_name
);
events.push(e)
};
} else {
Expand Down
96 changes: 62 additions & 34 deletions zenoh-plugin-ros2dds/src/node_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use serde::ser::SerializeSeq;
use serde::{Serialize, Serializer};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::ops::Range;
use zenoh::prelude::{keyexpr, KeyExpr};

Expand All @@ -29,14 +29,24 @@ pub struct MsgPub {
pub name: String,
#[serde(rename = "type")]
pub typ: String,
// List of DDS Writers declared by 1 Node for a same topic
// Issue #27: usually only 1 Writer, but may happen that 1 Node declares several Publishers
// on the same topic. In this case `ros2 node info <node_id>` still shows only 1 Publisher,
// but all the writers can be seen with `ros2 topic info <topic_name> -v`.
// Hence the choice here to aggregate all the Writers in 1 Publisher representation.
// The Publisher is declared undiscovered only when all its Writers are undiscovered.
#[serde(skip)]
pub writer: Gid,
pub writers: HashSet<Gid>,
}

impl MsgPub {
pub fn create(name: String, typ: String, writer: Gid) -> Result<MsgPub, String> {
check_ros_name(&name)?;
Ok(MsgPub { name, typ, writer })
Ok(MsgPub {
name,
typ,
writers: HashSet::from([writer]),
})
}

pub fn name_as_keyexpr(&self) -> &keyexpr {
Expand All @@ -56,14 +66,24 @@ pub struct MsgSub {
pub name: String,
#[serde(rename = "type")]
pub typ: String,
// List of DDS Readers declared by 1 Node for a same topic
// Issue #27: usually only 1 Reader, but may happen that 1 Node declares several Subscribers
// on the same topic. In this case `ros2 node info <node_id>` still shows only 1 Subscriber,
// but all the writers can be seen with `ros2 topic info <topic_name> -v`.
// Hence the choice here to aggregate all the Readers in 1 Subscriber representation.
// The Subscriber is declared undiscovered only when all its Readers are undiscovered.
#[serde(skip)]
pub reader: Gid,
pub readers: HashSet<Gid>,
}

impl MsgSub {
pub fn create(name: String, typ: String, reader: Gid) -> Result<MsgSub, String> {
check_ros_name(&name)?;
Ok(MsgSub { name, typ, reader })
Ok(MsgSub {
name,
typ,
readers: HashSet::from([reader]),
})
}

pub fn name_as_keyexpr(&self) -> &keyexpr {
Expand Down Expand Up @@ -632,7 +652,7 @@ impl NodeInfo {
}
Err(e) => {
log::error!(
"ROS Node {self} declared an incompatible Publisher: {e} - ignored"
"ROS Node {node_fullname} declared an incompatible Publisher: {e} - ignored"
);
None
}
Expand All @@ -641,18 +661,12 @@ impl NodeInfo {
let v = e.get_mut();
let mut result: Option<ROS2DiscoveryEvent> = None;
if v.typ != typ {
log::warn!(
r#"ROS declaration of Publisher "{v}" changed it's type to "{typ}""#
);
v.typ = typ;
result = Some(DiscoveredMsgPub(node_fullname.clone(), v.clone()));
}
if v.writer != *writer {
log::debug!(
r#"ROS declaration of Publisher "{v}" changed it's DDS Writer's GID from {} to {writer}"#,
v.writer
log::error!(
r#"ROS Node {node_fullname} declares 2 Publishers on same topic {name} but with different types: {} vs {typ} - Publisher with 2nd type ignored""#,
v.typ
);
v.writer = *writer;
} else if v.writers.insert(*writer) && v.writers.len() == 1 {
// Send DiscoveredMsgPub event only for the 1st discovered Writer
result = Some(DiscoveredMsgPub(node_fullname, v.clone()));
}
result
Expand All @@ -677,7 +691,7 @@ impl NodeInfo {
}
Err(e) => {
log::error!(
"ROS Node {self} declared an incompatible Subscriber: {e} - ignored"
"ROS Node {node_fullname} declared an incompatible Subscriber: {e} - ignored"
);
None
}
Expand All @@ -686,18 +700,12 @@ impl NodeInfo {
let v = e.get_mut();
let mut result: Option<ROS2DiscoveryEvent> = None;
if v.typ != typ {
log::warn!(
r#"ROS declaration of Subscriber "{v}" changed it's type to "{typ}""#
);
v.typ = typ;
result = Some(DiscoveredMsgSub(node_fullname.clone(), v.clone()));
}
if v.reader != *reader {
log::debug!(
r#"ROS declaration of Subscriber "{v}" changed it's DDS Writer's GID from {} to {reader}"#,
v.reader
log::error!(
r#"ROS Node {node_fullname} declares 2 Subscriber on same topic {name} but with different types: {} vs {typ} - Publisher with 2nd type ignored""#,
v.typ
);
v.reader = *reader;
} else if v.readers.insert(*reader) && v.readers.len() == 1 {
// Send DiscoveredMsgSub event only for the 1st discovered Reader
result = Some(DiscoveredMsgSub(node_fullname, v.clone()));
}
result
Expand Down Expand Up @@ -1732,10 +1740,20 @@ impl NodeInfo {
pub fn remove_reader(&mut self, reader: &Gid) -> Option<ROS2DiscoveryEvent> {
use ROS2DiscoveryEvent::*;
let node_fullname = self.fullname().to_string();
if let Some((name, _)) = self.msg_sub.iter().find(|(_, v)| v.reader == *reader) {
// Search in Subscribers list if one is using the writer
if let Some(name) = self.msg_sub.iter_mut().find_map(|(name, sub)| {
if sub.readers.remove(reader) && sub.readers.is_empty() {
// found Subscriber using the reader: remove the reader from list
// and if the list is empty return the Subscriber name to "undiscover" it
Some(name.clone())
} else {
None
}
}) {
// Return undiscovery event for this Subscriber, since all its DDS Writer have been undiscovered
return Some(UndiscoveredMsgSub(
node_fullname,
self.msg_sub.remove(&name.clone()).unwrap(),
self.msg_sub.remove(&name).unwrap(),
));
}
if let Some((name, _)) = self
Expand Down Expand Up @@ -1785,14 +1803,24 @@ impl NodeInfo {
}

// Remove a DDS Writer possibly used by this node, and returns an UndiscoveredX event if
// this Writer was used by some Subscription, Service or Action
// this Writer was used by some Publication, Service or Action
pub fn remove_writer(&mut self, writer: &Gid) -> Option<ROS2DiscoveryEvent> {
use ROS2DiscoveryEvent::*;
let node_fullname = self.fullname().to_string();
if let Some((name, _)) = self.msg_pub.iter().find(|(_, v)| v.writer == *writer) {
// Search in Publishers list if one is using the writer
if let Some(name) = self.msg_pub.iter_mut().find_map(|(name, publ)| {
if publ.writers.remove(writer) && publ.writers.is_empty() {
// found Publisher using the writer: remove the writer from list
// and if the list is empty return the Publisher name to "undiscover" it
Some(name.clone())
} else {
None
}
}) {
// Return undiscovery event for this Publisher, since all its DDS Writer have been undiscovered
return Some(UndiscoveredMsgPub(
node_fullname,
self.msg_pub.remove(&name.clone()).unwrap(),
self.msg_pub.remove(&name).unwrap(),
));
}
if let Some((name, _)) = self
Expand Down
24 changes: 13 additions & 11 deletions zenoh-plugin-ros2dds/src/route_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,23 +341,25 @@ impl RoutePublisher<'_> {

#[inline]
pub async fn add_local_node(&mut self, node: String, discovered_writer_qos: &Qos) {
self.local_nodes.insert(node);
log::debug!("{self} now serving local nodes {:?}", self.local_nodes);
// if 1st local node added, announce the route
if self.local_nodes.len() == 1 {
if let Err(e) = self.announce_route(discovered_writer_qos).await {
log::error!("{self} announcement failed: {e}");
if self.local_nodes.insert(node) {
log::debug!("{self} now serving local nodes {:?}", self.local_nodes);
// if 1st local node added, announce the route
if self.local_nodes.len() == 1 {
if let Err(e) = self.announce_route(discovered_writer_qos).await {
log::error!("{self} announcement failed: {e}");
}
}
}
}

#[inline]
pub fn remove_local_node(&mut self, node: &str) {
self.local_nodes.remove(node);
log::debug!("{self} now serving local nodes {:?}", self.local_nodes);
// if last local node removed, retire the route
if self.local_nodes.is_empty() {
self.retire_route();
if self.local_nodes.remove(node) {
log::debug!("{self} now serving local nodes {:?}", self.local_nodes);
// if last local node removed, retire the route
if self.local_nodes.is_empty() {
self.retire_route();
}
}
}

Expand Down
92 changes: 54 additions & 38 deletions zenoh-plugin-ros2dds/src/routes_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,28 +137,36 @@ impl<'a> RoutesMgr<'a> {
use ROS2DiscoveryEvent::*;
match event {
DiscoveredMsgPub(node, iface) => {
// Retrieve info on DDS Writer
// Pick 1 discovered Writer amongst the possibly multiple ones listed in MsgPub
let entity = {
let entities = zread!(self.context.discovered_entities);
entities
.get_writer(&iface.writer)
.ok_or(format!(
"Failed to get DDS info for {iface} Writer {}. Already deleted ?",
iface.writer
))?
.clone()
iface
.writers
.iter()
.find_map(|w| entities.get_writer(w))
.map(Clone::clone)
};
// Get route (create it if not yet exists)
let route = self
.get_or_create_route_publisher(
iface.name,
iface.typ,
entity.keyless,
adapt_writer_qos_for_reader(&entity.qos),
true,
)
.await?;
route.add_local_node(node, &entity.qos).await;
match entity {
Some(entity) => {
// Get route (create it if not yet exists)
let route = self
.get_or_create_route_publisher(
iface.name,
iface.typ,
entity.keyless,
adapt_writer_qos_for_reader(&entity.qos),
true,
)
.await?;
route.add_local_node(node, &entity.qos).await;
}
None => {
return Err(format!(
"Failed to get DDS info for any Writer of {iface} ({:?})",
iface.writers
))
}
}
}

UndiscoveredMsgPub(node, iface) => {
Expand All @@ -176,28 +184,36 @@ impl<'a> RoutesMgr<'a> {
}

DiscoveredMsgSub(node, iface) => {
// Retrieve info on DDS Reader
// Pick 1 discovered Reader amongst the possibly multiple ones listed in MsgSub
let entity = {
let entities = zread!(self.context.discovered_entities);
entities
.get_reader(&iface.reader)
.ok_or(format!(
"Failed to get DDS info for {iface} Reader {}. Already deleted ?",
iface.reader
))?
.clone()
iface
.readers
.iter()
.find_map(|r| entities.get_reader(r))
.map(Clone::clone)
};
// Get route (create it if not yet exists)
let route = self
.get_or_create_route_subscriber(
iface.name,
iface.typ,
entity.keyless,
adapt_reader_qos_for_writer(&entity.qos),
true,
)
.await?;
route.add_local_node(node, &entity.qos).await;
match entity {
Some(entity) => {
// Get route (create it if not yet exists)
let route = self
.get_or_create_route_subscriber(
iface.name,
iface.typ,
entity.keyless,
adapt_reader_qos_for_writer(&entity.qos),
true,
)
.await?;
route.add_local_node(node, &entity.qos).await;
}
None => {
return Err(format!(
"Failed to get DDS info for any Reader of {iface} ({:?})",
iface.readers
))
}
}
}

UndiscoveredMsgSub(node, iface) => {
Expand Down

0 comments on commit 40e78db

Please sign in to comment.