diff --git a/zenoh-plugin-ros2dds/src/route_service_cli.rs b/zenoh-plugin-ros2dds/src/route_service_cli.rs index ba79266..0ac84cc 100644 --- a/zenoh-plugin-ros2dds/src/route_service_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_service_cli.rs @@ -14,10 +14,12 @@ use cyclors::dds_entity_t; use serde::Serialize; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::time::Duration; use std::{collections::HashSet, fmt}; use zenoh::buffers::{ZBuf, ZSlice}; +use zenoh::handlers::{Callback, Dyn}; use zenoh::liveliness::LivelinessToken; use zenoh::prelude::r#async::AsyncResolve; use zenoh::prelude::*; @@ -319,15 +321,34 @@ fn route_dds_request_to_zenoh( ); } - let route_id2 = route_id.to_string(); if let Err(e) = zsession .get(zenoh_key_expr) .with_value(zenoh_req_buf) .with_attachment(request_id.as_attachment()) .allowed_destination(Locality::Remote) .timeout(query_timeout) - .callback(move |reply| { - route_zenoh_reply_to_dds(route_id2.clone(), reply, request_id, rep_writer) + .with({ + let route_id1: String = route_id.to_string(); + let route_id2 = route_id.to_string(); + let reply_received1 = Arc::new(AtomicBool::new(false)); + let reply_received2 = reply_received1.clone(); + CallbackPair { + callback: move |reply| { + if !reply_received1.swap(true, std::sync::atomic::Ordering::Relaxed) { + route_zenoh_reply_to_dds(&route_id1, reply, request_id, rep_writer) + } else { + log::warn!("{route_id1}: received more than 1 reply for request {request_id} - dropping the extra replies"); + } + }, + drop: move || { + if !reply_received2.load(std::sync::atomic::Ordering::Relaxed) { + // There is no way to send an error message as a reply to a ROS Service Client ! + // (sending an invalid message will make it crash...) + // We have no choice but to log the error and let the client hanging without reply, until a timeout (if set by the client) + log::warn!("{route_id2}: received NO reply for request {request_id} - cannot reply to client, it will hang until timeout"); + } + }, + } }) .res_sync() { @@ -335,8 +356,38 @@ fn route_dds_request_to_zenoh( } } +// TODO: remove and replace with Zenoh's CallbackPair when https://github.com/eclipse-zenoh/zenoh/pull/653 is available +struct CallbackPair +where + DropFn: FnMut() + Send + Sync + 'static, +{ + pub callback: Callback, + pub drop: DropFn, +} + +impl Drop for CallbackPair +where + DropCallback: FnMut() + Send + Sync + 'static, +{ + fn drop(&mut self) { + (self.drop)() + } +} + +impl<'a, OnEvent, Event, DropCallback> IntoCallbackReceiverPair<'a, Event> + for CallbackPair +where + OnEvent: Fn(Event) + Send + Sync + 'a, + DropCallback: FnMut() + Send + Sync + 'static, +{ + type Receiver = (); + fn into_cb_receiver_pair(self) -> (Callback<'a, Event>, Self::Receiver) { + (Dyn::from(move |x| (self.callback)(x)), ()) + } +} + fn route_zenoh_reply_to_dds( - route_id: String, + route_id: &str, reply: Reply, request_id: CddsRequestHeader, rep_writer: dds_entity_t,