Skip to content

Commit

Permalink
Fix routing loop when 2 Subscribers for a same topic exist across 2 b…
Browse files Browse the repository at this point in the history
…ridges (#41)
  • Loading branch information
JEnoch committed Jan 22, 2024
1 parent c56fc62 commit a094d7e
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 25 deletions.
12 changes: 6 additions & 6 deletions zenoh-plugin-ros2dds/src/route_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ pub struct RoutePublisher<'a> {
// the context
#[serde(skip)]
context: Context,
// the zenoh publisher used to re-publish to zenoh the data received by the DDS Reader
// the zenoh publisher used to re-publish to zenoh the message received by the DDS Reader
// `None` when route is created on a remote announcement and no local ROS2 Subscriber discovered yet
#[serde(
rename = "publication_cache_size",
serialize_with = "serialize_pub_cache"
)]
zenoh_publisher: ZPublisher,
// the local DDS Reader created to serve the route (i.e. re-publish to zenoh data coming from DDS)
// the local DDS Reader created to serve the route (i.e. re-publish to zenoh message coming from DDS)
#[serde(serialize_with = "serialize_atomic_entity_guid")]
dds_reader: Arc<AtomicDDSEntity>,
// TypeInfo for Reader creation (if available)
Expand Down Expand Up @@ -129,7 +129,7 @@ impl RoutePublisher<'_> {
);

// create the zenoh Publisher
// if Reader shall be TRANSIENT_LOCAL, use a PublicationCache to store historical data
// if Reader shall be TRANSIENT_LOCAL, use a PublicationCache to store historical messages
let transient_local = is_transient_local(&reader_qos);
let (cache, cache_size): (Option<PublicationCache>, usize) = if transient_local {
#[allow(non_upper_case_globals)]
Expand Down Expand Up @@ -405,7 +405,7 @@ fn activate_dds_reader(
let type_name = ros2_message_type_to_dds_type(ros2_type);
let read_period = get_read_period(&context.config, ros2_name);

// create matching DDS Reader that forwards data coming from DDS to Zenoh
// create matching DDS Reader that forwards message coming from DDS to Zenoh
let reader = create_dds_reader(
context.participant,
topic_name.clone(),
Expand All @@ -418,7 +418,7 @@ fn activate_dds_reader(
let route_id = route_id.to_string();
let publisher = publisher.clone();
move |sample: &DDSRawSample| {
do_route_message(sample, &publisher, &route_id);
route_dds_message_to_zenoh(sample, &publisher, &route_id);
}
},
)?;
Expand Down Expand Up @@ -454,7 +454,7 @@ fn deactivate_dds_reader(
}
}

fn do_route_message(sample: &DDSRawSample, publisher: &Arc<Publisher>, route_id: &str) {
fn route_dds_message_to_zenoh(sample: &DDSRawSample, publisher: &Arc<Publisher>, route_id: &str) {
if *LOG_PAYLOAD {
log::debug!("{route_id}: routing message - payload: {:02x?}", sample);
} else {
Expand Down
14 changes: 9 additions & 5 deletions zenoh-plugin-ros2dds/src/route_service_srv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,6 @@ impl RouteServiceSrv<'_> {
let client_id_str = new_service_id(&context.participant)?;
let user_data = format!("clientid= {client_id_str};");
qos.user_data = Some(user_data.into_bytes());
log::debug!(
"{route_id}: using id '{client_id_str}' => USER_DATA={:?}",
qos.user_data.as_ref().unwrap()
);

// create DDS Writer to send requests coming from Zenoh to the Service
let req_topic_name = format!("rq{ros2_name}Request");
Expand All @@ -156,6 +152,11 @@ impl RouteServiceSrv<'_> {
// https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/rmw_node.cpp#L4848
let client_guid = get_instance_handle(req_writer)?;

log::debug!(
"{route_id}: (local client_guid={client_guid}) id='{client_id_str}' => USER_DATA={:?}",
qos.user_data.as_ref().unwrap()
);

// map of queries in progress
let queries_in_progress: Arc<RwLock<HashMap<u64, Query>>> =
Arc::new(RwLock::new(HashMap::new()));
Expand Down Expand Up @@ -434,7 +435,10 @@ fn do_route_reply(
};

if guid != client_guid {
log::warn!(
// The reply is for another client than this bridge...
// Could happen since the same topic is used for this service replies to all clients!
// Just drop it.
log::trace!(
"{route_id}: received response for another client: {guid:0x?} (me: {client_guid:0x?})"
);
return;
Expand Down
24 changes: 12 additions & 12 deletions zenoh-plugin-ros2dds/src/route_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ pub struct RouteSubscriber<'a> {
// the context
#[serde(skip)]
context: Context,
// the zenoh subscriber receiving data to be re-published by the DDS Writer
// the zenoh subscriber receiving messages to be re-published by the DDS Writer
// `None` when route is created on a remote announcement and no local ROS2 Subscriber discovered yet
#[serde(rename = "is_active", serialize_with = "serialize_option_as_bool")]
zenoh_subscriber: Option<ZSubscriber<'a>>,
// the local DDS Writer created to serve the route (i.e. re-publish to DDS data coming from zenoh)
// the local DDS Writer created to serve the route (i.e. re-publish to DDS message coming from zenoh)
#[serde(serialize_with = "serialize_entity_guid")]
dds_writer: dds_entity_t,
// if the Writer is TRANSIENT_LOCAL
Expand Down Expand Up @@ -162,20 +162,20 @@ impl RouteSubscriber<'_> {

async fn activate(&mut self, discovered_reader_qos: &Qos) -> Result<(), String> {
log::debug!("{self} activate");
// Callback routing data received by Zenoh subscriber to DDS Writer (if set)
// Callback routing message received by Zenoh subscriber to DDS Writer (if set)
let ros2_name = self.ros2_name.clone();
let dds_writer = self.dds_writer;
let subscriber_callback = move |s: Sample| {
do_route_data(s, &ros2_name, dds_writer);
route_zenoh_message_to_dds(s, &ros2_name, dds_writer);
};

// create zenoh subscriber
// if Writer is TRANSIENT_LOCAL, use a QueryingSubscriber to fetch remote historical data to write
// if Writer is TRANSIENT_LOCAL, use a QueryingSubscriber to fetch remote historical messages to write
self.zenoh_subscriber = if self.transient_local {
// query all PublicationCaches on "<KE_PREFIX_PUB_CACHE>/*/<routing_keyexpr>"
let query_selector: Selector =
(*KE_PREFIX_PUB_CACHE / *KE_ANY_1_SEGMENT / &self.zenoh_key_expr).into();
log::debug!("{self}: query historical data from everybody for TRANSIENT_LOCAL Reader on {query_selector}");
log::debug!("{self}: query historical messages from everybody for TRANSIENT_LOCAL Reader on {query_selector}");
let sub = self
.context
.zsession
Expand Down Expand Up @@ -247,7 +247,7 @@ impl RouteSubscriber<'_> {
// query all PublicationCaches on "<KE_PREFIX_PUB_CACHE>/<plugin_id>/<routing_keyexpr>"
let query_selector: Selector =
(*KE_PREFIX_PUB_CACHE / plugin_id / &self.zenoh_key_expr).into();
log::debug!("Route Subscriber (Zenoh:{} -> ROS:{}): query historical data from {plugin_id} for TRANSIENT_LOCAL Reader on {query_selector}",
log::debug!("Route Subscriber (Zenoh:{} -> ROS:{}): query historical messages from {plugin_id} for TRANSIENT_LOCAL Reader on {query_selector}",
self.zenoh_key_expr, self.ros2_name
);

Expand Down Expand Up @@ -333,17 +333,17 @@ impl RouteSubscriber<'_> {
}
}

fn do_route_data(s: Sample, ros2_name: &str, data_writer: dds_entity_t) {
fn route_zenoh_message_to_dds(s: Sample, ros2_name: &str, data_writer: dds_entity_t) {
if *LOG_PAYLOAD {
log::debug!(
"Route Subscriber (Zenoh:{} -> ROS:{}): routing data - payload: {:02x?}",
"Route Subscriber (Zenoh:{} -> ROS:{}): routing message - payload: {:02x?}",
s.key_expr,
&ros2_name,
s.value.payload
);
} else {
log::trace!(
"Route Subscriber (Zenoh:{} -> ROS:{}): routing data - {} bytes",
"Route Subscriber (Zenoh:{} -> ROS:{}): routing message - {} bytes",
s.key_expr,
&ros2_name,
s.value.payload.len()
Expand All @@ -363,7 +363,7 @@ fn do_route_data(s: Sample, ros2_name: &str, data_writer: dds_entity_t) {
Ok(s) => s,
Err(_) => {
log::warn!(
"Route Subscriber (Zenoh:{} -> ROS:{}): can't route data; excessive payload size ({})",
"Route Subscriber (Zenoh:{} -> ROS:{}): can't route message; excessive payload size ({})",
s.key_expr,
ros2_name,
len
Expand All @@ -381,7 +381,7 @@ fn do_route_data(s: Sample, ros2_name: &str, data_writer: dds_entity_t) {
let ret = dds_get_entity_sertype(data_writer, &mut sertype_ptr);
if ret < 0 {
log::warn!(
"Route Subscriber (Zenoh:{} -> ROS:{}): can't route data; sertype lookup failed ({})",
"Route Subscriber (Zenoh:{} -> ROS:{}): can't route message; sertype lookup failed ({})",
s.key_expr,
ros2_name,
CStr::from_ptr(dds_strretcode(ret))
Expand Down
13 changes: 11 additions & 2 deletions zenoh-plugin-ros2dds/src/routes_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::route_service_cli::RouteServiceCli;
use crate::route_service_srv::RouteServiceSrv;
use crate::route_subscriber::RouteSubscriber;
use cyclors::dds_entity_t;
use cyclors::qos::IgnoreLocal;
use cyclors::qos::Qos;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::Entry;
Expand Down Expand Up @@ -329,14 +330,18 @@ impl<'a> RoutesMgr<'a> {
keyless,
writer_qos,
} => {
let mut qos = writer_qos.clone();
qos.ignore_local = Some(IgnoreLocal {
kind: cyclors::qos::IgnoreLocalKind::PARTICIPANT,
});
// On remote Publisher route announcement, prepare a Subscriber route
// with an associated DDS Writer allowing local ROS2 Nodes to discover it
let route = self
.get_or_create_route_subscriber(
key_expr_to_ros2_name(&zenoh_key_expr, &self.context.config),
ros2_type,
keyless,
writer_qos,
qos,
true,
)
.await?;
Expand Down Expand Up @@ -368,14 +373,18 @@ impl<'a> RoutesMgr<'a> {
keyless,
reader_qos,
} => {
let mut qos = reader_qos.clone();
qos.ignore_local = Some(IgnoreLocal {
kind: cyclors::qos::IgnoreLocalKind::PARTICIPANT,
});
// On remote Subscriber route announcement, prepare a Publisher route
// with an associated DDS Reader allowing local ROS2 Nodes to discover it
let route = self
.get_or_create_route_publisher(
key_expr_to_ros2_name(&zenoh_key_expr, &self.context.config),
ros2_type,
keyless,
reader_qos,
qos,
true,
)
.await?;
Expand Down

0 comments on commit a094d7e

Please sign in to comment.