Skip to content

Commit

Permalink
route_service_cli: use Querier::matching_listener to activate/deactiv…
Browse files Browse the repository at this point in the history
…ate the route, rather than Liveliness Tokens
  • Loading branch information
JEnoch committed Dec 10, 2024
1 parent 2719ac0 commit 6c015fc
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 127 deletions.
296 changes: 175 additions & 121 deletions zenoh-plugin-ros2dds/src/route_service_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::{
is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type,
ros2_service_type_to_request_dds_type, CddsRequestHeader, QOS_DEFAULT_SERVICE,
},
ros_discovery::RosDiscoveryInfoMgr,
routes_mgr::Context,
LOG_PAYLOAD,
};
Expand All @@ -60,17 +61,15 @@ pub struct RouteServiceCli {
ros2_type: String,
// the Zenoh key expression used for routing
zenoh_key_expr: OwnedKeyExpr,
// the DDS type info (if available)
#[serde(skip)]
type_info: Option<Arc<TypeInfo>>,
// the context
#[serde(skip)]
context: Context,
#[serde(skip)]
zenoh_querier: Arc<Querier<'static>>,
_zenoh_querier: Arc<Querier<'static>>,
#[serde(serialize_with = "crate::config::serialize_duration_as_f32")]
queries_timeout: Duration,
is_active: bool,
#[serde(skip)]
_matching_listener: zenoh::matching::MatchingListener<()>,
// the local DDS Reader receiving client's requests and routing them to Zenoh
#[serde(serialize_with = "serialize_atomic_entity_guid")]
req_reader: Arc<AtomicDDSEntity>,
Expand Down Expand Up @@ -127,17 +126,61 @@ impl RouteServiceCli {
.map_err(|e| format!("Failed create Querier for key {zenoh_key_expr}: {e}",))?,
);

let route_id = format!("Route Service Client (ROS:{ros2_name} -> Zenoh:{zenoh_key_expr}");

// activate/deactivate DDS Reader/Writer on detection/undetection of matching Subscribers
// (copy/move all required args for the callback)
let rep_writer: Arc<AtomicDDSEntity> = Arc::new(DDS_ENTITY_NULL.into());
let req_reader: Arc<AtomicDDSEntity> = Arc::new(DDS_ENTITY_NULL.into());

let matching_listener = zenoh_querier
.matching_listener()
.callback({
let rep_writer = rep_writer.clone();
let req_reader = req_reader.clone();
let ros2_name = ros2_name.clone();
let ros2_type = ros2_type.clone();
let context = context.clone();
let zquerier = zenoh_querier.clone();

move |status| {
tracing::debug!("{route_id} MatchingStatus changed: {status:?}");
if status.matching() {
if let Err(e) = activate(
&rep_writer,
&req_reader,
&ros2_name,
&ros2_type,
&route_id,
&context,
&type_info,
&zquerier,
) {
tracing::error!("{route_id}: failed to activate DDS Reader: {e}");
}
} else {
deactivate(
&rep_writer,
&req_reader,
&route_id,
&context.ros_discovery_mgr,
)
}
}
})
.await
.map_err(|e| format!("Route Service Client (ROS:{ros2_name} <-> Zenoh:{zenoh_key_expr}): failed to listen of matching status changes: {e}",))?;

Ok(RouteServiceCli {
ros2_name,
ros2_type,
zenoh_key_expr,
type_info,
context,
zenoh_querier,
_zenoh_querier: zenoh_querier,
queries_timeout,
is_active: false,
rep_writer: Arc::new(DDS_ENTITY_NULL.into()),
req_reader: Arc::new(DDS_ENTITY_NULL.into()),
_matching_listener: matching_listener,
rep_writer,
req_reader,
liveliness_token: None,
remote_routes: HashSet::new(),
local_nodes: HashSet::new(),
Expand Down Expand Up @@ -178,125 +221,21 @@ impl RouteServiceCli {
self.liveliness_token = None;
}

fn activate(&mut self) -> Result<(), String> {
tracing::debug!("{self}: activate");
// Default Service QoS
let mut qos = QOS_DEFAULT_SERVICE.clone();

// Add DATA_USER QoS similarly to rmw_cyclone_dds here:
// https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/rmw_node.cpp#L5028C17-L5028C17
let server_id_str = new_service_id(&self.context.participant)?;
let user_data = format!("serviceid= {server_id_str};");
qos.user_data = Some(user_data.into_bytes());
tracing::debug!(
"{self}: using id '{server_id_str}' => USER_DATA={:?}",
qos.user_data.as_ref().unwrap()
);

// create DDS Writer to send replies coming from Zenoh to the Client
let rep_topic_name = format!("rr{}Reply", self.ros2_name);
let rep_type_name = ros2_service_type_to_reply_dds_type(&self.ros2_type);
let rep_writer = create_dds_writer(
self.context.participant,
rep_topic_name,
rep_type_name,
true,
qos.clone(),
)?;
let old = self.rep_writer.swap(rep_writer, Ordering::Relaxed);
if old != DDS_ENTITY_NULL {
tracing::warn!(
"{self}: on activation their was already a DDS Reply Writer - overwrite it"
);
if let Err(e) = delete_dds_entity(old) {
tracing::warn!("{self}: failed to delete overwritten DDS Reply Writer: {e}");
}
}

// add writer's GID in ros_discovery_info message
self.context
.ros_discovery_mgr
.add_dds_writer(get_guid(&rep_writer)?);

// create DDS Reader to receive requests and route them to Zenoh
let route_id: String = self.to_string();
let req_topic_name = format!("rq{}Request", self.ros2_name);
let req_type_name = ros2_service_type_to_request_dds_type(&self.ros2_type);
let zquerier = self.zenoh_querier.clone();
let req_reader = create_dds_reader(
self.context.participant,
req_topic_name,
req_type_name,
&self.type_info,
true,
qos,
None,
move |sample| {
route_dds_request_to_zenoh(&route_id, sample, &zquerier, rep_writer);
},
)?;
let old = self.req_reader.swap(req_reader, Ordering::Relaxed);
if old != DDS_ENTITY_NULL {
tracing::warn!(
"{self}: on activation their was already a DDS Request Reader - overwrite it"
);
if let Err(e) = delete_dds_entity(old) {
tracing::warn!("{self}: failed to delete overwritten DDS Request Reader: {e}");
}
}

// add reader's GID in ros_discovery_info message
self.context
.ros_discovery_mgr
.add_dds_reader(get_guid(&req_reader)?);

self.is_active = true;
Ok(())
}

fn deactivate(&mut self) {
tracing::debug!("{self}: Deactivate");
let req_reader = self.req_reader.swap(DDS_ENTITY_NULL, Ordering::Relaxed);
if req_reader != DDS_ENTITY_NULL {
// remove reader's GID from ros_discovery_info message
match get_guid(&req_reader) {
Ok(gid) => self.context.ros_discovery_mgr.remove_dds_reader(gid),
Err(e) => tracing::warn!("{self}: {e}"),
}
if let Err(e) = delete_dds_entity(req_reader) {
tracing::warn!("{}: error deleting DDS Reader: {}", self, e);
}
}
let rep_writer = self.rep_writer.swap(DDS_ENTITY_NULL, Ordering::Relaxed);
if rep_writer != DDS_ENTITY_NULL {
// remove writer's GID from ros_discovery_info message
match get_guid(&rep_writer) {
Ok(gid) => self.context.ros_discovery_mgr.remove_dds_writer(gid),
Err(e) => tracing::warn!("{self}: {e}"),
}
if let Err(e) = delete_dds_entity(rep_writer) {
tracing::warn!("{}: error deleting DDS Writer: {}", self, e);
}
}
self.is_active = false;
let route_id = self.to_string();
deactivate(
&self.rep_writer,
&self.req_reader,
&route_id,
&self.context.ros_discovery_mgr,
);
}

#[inline]
pub fn add_remote_route(&mut self, zenoh_id: &str, zenoh_key_expr: &keyexpr) {
self.remote_routes
.insert(format!("{zenoh_id}:{zenoh_key_expr}"));
tracing::debug!("{self}: now serving remote routes {:?}", self.remote_routes);
// if 1st remote node added (i.e. a Server has been announced), activate the route
// NOTE: The route shall not be active if a remote Service Server have not been detected.
// Otherwise, the Client will send a request to this route that will get no reply
// and will drop it, leading the Client to hang (see #62).
// TODO: rather rely on a Querier MatchingStatus (in the same way that it's done for RoutePublisher)
// when available in zenoh...
if self.remote_routes.len() == 1 {
if let Err(e) = self.activate() {
tracing::error!("{self}: activation failed: {e}");
}
}
}

#[inline]
Expand Down Expand Up @@ -348,6 +287,121 @@ impl RouteServiceCli {
}
}

fn activate(
rep_writer: &Arc<AtomicDDSEntity>,
req_reader: &Arc<AtomicDDSEntity>,
ros2_name: &str,
ros2_type: &str,
route_id: &str,
context: &Context,
type_info: &Option<Arc<TypeInfo>>,
zenoh_querier: &Arc<Querier<'static>>,
) -> Result<(), String> {
tracing::debug!("{route_id}: activate");
// Default Service QoS
let mut qos = QOS_DEFAULT_SERVICE.clone();

// Add DATA_USER QoS similarly to rmw_cyclone_dds here:
// https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/rmw_node.cpp#L5028C17-L5028C17
let server_id_str = new_service_id(&context.participant)?;
let user_data = format!("serviceid= {server_id_str};");
qos.user_data = Some(user_data.into_bytes());
tracing::debug!(
"{route_id}: using id '{server_id_str}' => USER_DATA={:?}",
qos.user_data.as_ref().unwrap()
);

// create DDS Writer to send replies coming from Zenoh to the Client
let rep_topic_name = format!("rr{}Reply", ros2_name);
let rep_type_name = ros2_service_type_to_reply_dds_type(&ros2_type);
let dds_writer = create_dds_writer(
context.participant,
rep_topic_name,
rep_type_name,
true,
qos.clone(),
)?;
let old = rep_writer.swap(dds_writer, Ordering::Relaxed);
if old != DDS_ENTITY_NULL {
tracing::warn!(
"{route_id}: on activation their was already a DDS Reply Writer - overwrite it"
);
if let Err(e) = delete_dds_entity(old) {
tracing::warn!("{route_id}: failed to delete overwritten DDS Reply Writer: {e}");
}
}

// add writer's GID in ros_discovery_info message
context
.ros_discovery_mgr
.add_dds_writer(get_guid(&dds_writer)?);

// create DDS Reader to receive requests and route them to Zenoh
let req_topic_name = format!("rq{}Request", ros2_name);
let req_type_name = ros2_service_type_to_request_dds_type(&ros2_type);
let zquerier = zenoh_querier.clone();
let route_id2 = route_id.to_owned();
let dds_reader = create_dds_reader(
context.participant,
req_topic_name,
req_type_name,
&type_info,
true,
qos,
None,
move |sample| {
route_dds_request_to_zenoh(&route_id2, sample, &zquerier, dds_writer);
},
)?;
let old = req_reader.swap(dds_reader, Ordering::Relaxed);
if old != DDS_ENTITY_NULL {
tracing::warn!(
"{route_id}: on activation their was already a DDS Request Reader - overwrite it"
);
if let Err(e) = delete_dds_entity(old) {
tracing::warn!("{route_id}: failed to delete overwritten DDS Request Reader: {e}");
}
}

// add reader's GID in ros_discovery_info message
context
.ros_discovery_mgr
.add_dds_reader(get_guid(&dds_reader)?);

Ok(())
}

fn deactivate(
rep_writer: &Arc<AtomicDDSEntity>,
req_reader: &Arc<AtomicDDSEntity>,
route_id: &str,
ros_discovery_mgr: &Arc<RosDiscoveryInfoMgr>,
) {
tracing::debug!("{route_id}: Deactivate");
let req_reader = req_reader.swap(DDS_ENTITY_NULL, Ordering::Relaxed);
if req_reader != DDS_ENTITY_NULL {
// remove reader's GID from ros_discovery_info message
match get_guid(&req_reader) {
Ok(gid) => ros_discovery_mgr.remove_dds_reader(gid),
Err(e) => tracing::warn!("{route_id}: {e}"),
}
if let Err(e) = delete_dds_entity(req_reader) {
tracing::warn!("{route_id}: error deleting DDS Reader: {e}");
}
}
let rep_writer = rep_writer.swap(DDS_ENTITY_NULL, Ordering::Relaxed);
if rep_writer != DDS_ENTITY_NULL {
// remove writer's GID from ros_discovery_info message
match get_guid(&rep_writer) {
Ok(gid) => ros_discovery_mgr.remove_dds_writer(gid),
Err(e) => tracing::warn!("{route_id}: {e}"),
}
if let Err(e) = delete_dds_entity(rep_writer) {
tracing::warn!("{route_id}: error deleting DDS Writer: {e}");
}
}
}

fn route_dds_request_to_zenoh(
route_id: &str,
sample: &DDSRawSample,
Expand Down
7 changes: 1 addition & 6 deletions zenoh-plugin-ros2dds/src/route_service_srv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,7 @@ impl RouteServiceSrv {
.zsession
.declare_keyexpr(self.zenoh_key_expr.clone())
.await
.map_err(|e| {
format!(
"Route Publisher (ROS:{} -> Zenoh:{}): failed to declare KeyExpr: {e}",
self.ros2_name, self.zenoh_key_expr
)
})?;
.map_err(|e| format!("{self}: failed to declare KeyExpr: {e}"))?;

// create the zenoh Queryable
// if Reader is TRANSIENT_LOCAL, use a PublicationCache to store historical data
Expand Down

0 comments on commit 6c015fc

Please sign in to comment.