diff --git a/DEFAULT_CONFIG.json5 b/DEFAULT_CONFIG.json5 index 5b6f767..68b8f25 100644 --- a/DEFAULT_CONFIG.json5 +++ b/DEFAULT_CONFIG.json5 @@ -97,12 +97,24 @@ // reliable_routes_blocking: true, //// - //// queries_timeout: A duration in seconds (default: 5.0 sec) that will be used as a timeout when the bridge - //// queries any other remote bridge for discovery information and for historical data for TRANSIENT_LOCAL DDS Readers it serves - //// (i.e. if the query to the remote bridge exceed the timeout, some historical samples might be not routed to the Readers, - //// but the route will not be blocked forever). - //// - // queries_timeout: 5.0, + //// queries_timeout: Timeouts configuration for various Zenoh queries. + //// Each field is optional. If not set, A default value of 5.0 seconds applies. + //// Each value can be either a float in seconds that will apply as a timeout to all queries, + //// either a list of strings with format "=" where: + //// - "regex" is a regular expression matching an interface name + //// - "float" is the timeout in seconds + // queries_timeout: { + // //// timeouts for TRANSIENT_LOCAL subscriber when querying publishers for historical publications + // transient_local_subscribers: 1.0, + // //// timeouts for Service clients calling a Service server + // services: ["add_two_ints=0.5", ".*=1.0"], + // //// timeouts for Action clients calling an Action server (send_goal, cancel_goal and get_result services) + // actions: { + // send_goal: 1.0, + // cancel_goal: 1.0, + // get_result: [".*long_mission=3600", ".*short_action=10.0"], + // } + // } }, //// diff --git a/zenoh-plugin-ros2dds/src/config.rs b/zenoh-plugin-ros2dds/src/config.rs index b8ce98c..77adb47 100644 --- a/zenoh-plugin-ros2dds/src/config.rs +++ b/zenoh-plugin-ros2dds/src/config.rs @@ -23,9 +23,12 @@ pub const DEFAULT_NODENAME: &str = "zenoh_bridge_ros2dds"; pub const DEFAULT_DOMAIN: u32 = 0; pub const DEFAULT_RELIABLE_ROUTES_BLOCKING: bool = true; pub const DEFAULT_TRANSIENT_LOCAL_CACHE_MULTIPLIER: usize = 10; -pub const DEFAULT_QUERIES_TIMEOUT: f32 = 5.0; pub const DEFAULT_DDS_LOCALHOST_ONLY: bool = false; +lazy_static::lazy_static!( + pub static ref DEFAULT_QUERIES_TIMEOUT: Duration = Duration::from_secs_f32(5.0); +); + #[derive(Deserialize, Debug, Serialize)] #[serde(deny_unknown_fields)] pub struct Config { @@ -43,8 +46,8 @@ pub struct Config { pub allowance: Option, #[serde( default, - deserialize_with = "deserialize_max_frequencies", - serialize_with = "serialize_max_frequencies" + deserialize_with = "deserialize_vec_regex_f32", + serialize_with = "serialize_vec_regex_f32" )] pub pub_max_frequencies: Vec<(Regex, f32)>, #[serde(default)] @@ -52,11 +55,8 @@ pub struct Config { pub shm_enabled: bool, #[serde(default = "default_transient_local_cache_multiplier")] pub transient_local_cache_multiplier: usize, - #[serde( - default = "default_queries_timeout", - deserialize_with = "deserialize_duration" - )] - pub queries_timeout: Duration, + #[serde(default)] + pub queries_timeout: Option, #[serde(default = "default_reliable_routes_blocking")] pub reliable_routes_blocking: bool, #[serde(default)] @@ -65,6 +65,123 @@ pub struct Config { __path__: Vec, } +impl Config { + pub fn get_pub_max_frequencies(&self, ros2_name: &str) -> Option { + for (re, freq) in &self.pub_max_frequencies { + if re.is_match(ros2_name) { + return Some(*freq); + } + } + None + } + + pub fn get_queries_timeout_tl_sub(&self, ros2_name: &str) -> Duration { + if let Some(qt) = &self.queries_timeout { + for (re, secs) in &qt.transient_local_subscribers { + if re.is_match(ros2_name) { + return Duration::from_secs_f32(*secs); + } + } + } + *DEFAULT_QUERIES_TIMEOUT + } + + pub fn get_queries_timeout_service(&self, ros2_name: &str) -> Duration { + if let Some(qt) = &self.queries_timeout { + for (re, secs) in &qt.services { + if re.is_match(ros2_name) { + return Duration::from_secs_f32(*secs); + } + } + } + *DEFAULT_QUERIES_TIMEOUT + } + + pub fn get_queries_timeout_action_send_goal(&self, ros2_name: &str) -> Duration { + if let Some(QueriesTimeouts { + actions: Some(at), .. + }) = &self.queries_timeout + { + for (re, secs) in &at.send_goal { + if re.is_match(ros2_name) { + return Duration::from_secs_f32(*secs); + } + } + } + *DEFAULT_QUERIES_TIMEOUT + } + + pub fn get_queries_timeout_action_cancel_goal(&self, ros2_name: &str) -> Duration { + if let Some(QueriesTimeouts { + actions: Some(at), .. + }) = &self.queries_timeout + { + for (re, secs) in &at.cancel_goal { + if re.is_match(ros2_name) { + return Duration::from_secs_f32(*secs); + } + } + } + *DEFAULT_QUERIES_TIMEOUT + } + + pub fn get_queries_timeout_action_get_result(&self, ros2_name: &str) -> Duration { + if let Some(QueriesTimeouts { + actions: Some(at), .. + }) = &self.queries_timeout + { + for (re, secs) in &at.get_result { + if re.is_match(ros2_name) { + return Duration::from_secs_f32(*secs); + } + } + } + *DEFAULT_QUERIES_TIMEOUT + } +} + +#[derive(Deserialize, Debug, Serialize)] +#[serde(deny_unknown_fields)] +pub struct QueriesTimeouts { + #[serde( + default, + deserialize_with = "deserialize_vec_regex_f32", + serialize_with = "serialize_vec_regex_f32" + )] + transient_local_subscribers: Vec<(Regex, f32)>, + #[serde( + default, + deserialize_with = "deserialize_vec_regex_f32", + serialize_with = "serialize_vec_regex_f32" + )] + services: Vec<(Regex, f32)>, + #[serde(default)] + actions: Option, +} + +#[derive(Deserialize, Debug, Serialize)] +#[serde(deny_unknown_fields)] +pub struct ActionsTimeouts { + #[serde( + default, + deserialize_with = "deserialize_vec_regex_f32", + serialize_with = "serialize_vec_regex_f32" + )] + send_goal: Vec<(Regex, f32)>, + #[serde( + default, + deserialize_with = "deserialize_vec_regex_f32", + serialize_with = "serialize_vec_regex_f32" + )] + cancel_goal: Vec<(Regex, f32)>, + #[serde( + default, + deserialize_with = "deserialize_vec_regex_f32", + serialize_with = "serialize_vec_regex_f32" + )] + get_result: Vec<(Regex, f32)>, +} + #[derive(Deserialize, Debug, Serialize)] pub enum Allowance { #[serde(rename = "allow")] @@ -279,18 +396,6 @@ fn default_transient_local_cache_multiplier() -> usize { DEFAULT_TRANSIENT_LOCAL_CACHE_MULTIPLIER } -fn default_queries_timeout() -> Duration { - Duration::from_secs_f32(DEFAULT_QUERIES_TIMEOUT) -} - -fn deserialize_duration<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - let seconds: f32 = Deserialize::deserialize(deserializer)?; - Ok(Duration::from_secs_f32(seconds)) -} - fn serialize_regex(r: &Option, serializer: S) -> Result where S: Serializer, @@ -347,30 +452,44 @@ impl<'de> Visitor<'de> for RegexVisitor { } } -fn deserialize_max_frequencies<'de, D>(deserializer: D) -> Result, D::Error> +fn deserialize_vec_regex_f32<'de, D>(deserializer: D) -> Result, D::Error> where D: Deserializer<'de>, { - let strs: Vec = Deserialize::deserialize(deserializer)?; - let mut result: Vec<(Regex, f32)> = Vec::with_capacity(strs.len()); - for s in strs { - let i = s - .find('=') - .ok_or_else(|| de::Error::custom(format!("Invalid 'max_frequency': {s}")))?; - let regex = Regex::new(&s[0..i]).map_err(|e| { - de::Error::custom(format!("Invalid regex for 'max_frequency': '{s}': {e}")) - })?; - let frequency: f32 = s[i + 1..].parse().map_err(|e| { - de::Error::custom(format!( - "Invalid float value for 'max_frequency': '{s}': {e}" - )) - })?; - result.push((regex, frequency)); + #[derive(Deserialize)] + #[serde(untagged)] + enum AcceptedValues { + Float(f32), + List(Vec), + } + + let values: AcceptedValues = Deserialize::deserialize(deserializer).unwrap(); + match values { + AcceptedValues::Float(f) => { + // same float for any string (i.e. matching ".*") + Ok(vec![(Regex::new(".*").unwrap(), f)]) + } + AcceptedValues::List(strs) => { + let mut result: Vec<(Regex, f32)> = Vec::with_capacity(strs.len()); + for s in strs { + let i = s.find('=').ok_or_else(|| { + de::Error::custom(format!( + r#"Invalid list of "=" elements": {s}"# + )) + })?; + let regex = Regex::new(&s[0..i]) + .map_err(|e| de::Error::custom(format!("Invalid regex in '{s}': {e}")))?; + let frequency: f32 = s[i + 1..] + .parse() + .map_err(|e| de::Error::custom(format!("Invalid float value in '{s}': {e}")))?; + result.push((regex, frequency)); + } + Ok(result) + } } - Ok(result) } -fn serialize_max_frequencies(v: &Vec<(Regex, f32)>, serializer: S) -> Result +fn serialize_vec_regex_f32(v: &Vec<(Regex, f32)>, serializer: S) -> Result where S: Serializer, { @@ -382,6 +501,13 @@ where seq.end() } +pub fn serialize_duration_as_f32(d: &Duration, serializer: S) -> Result +where + S: Serializer, +{ + serializer.serialize_f32(d.as_secs_f32()) +} + mod tests { #[test] diff --git a/zenoh-plugin-ros2dds/src/lib.rs b/zenoh-plugin-ros2dds/src/lib.rs index 529a457..bbf3ac8 100644 --- a/zenoh-plugin-ros2dds/src/lib.rs +++ b/zenoh-plugin-ros2dds/src/lib.rs @@ -395,8 +395,8 @@ impl<'a> ROS2PluginRuntime<'a> { // New remote bridge detected (None, SampleKind::Put) => { log::info!("New ROS 2 bridge detected: {}", plugin_id); - // make all routes for a TRANSIENT_LOCAL Subscriber to query historical publications from this new plugin - routes_mgr.query_historical_all_publications(plugin_id).await; + // make each routes for a TRANSIENT_LOCAL Subscriber to query historical publications from this new plugin + routes_mgr.query_all_historical_publications(plugin_id).await; } // New remote bridge left (None, SampleKind::Delete) => log::info!("Remote ROS 2 bridge left: {}", plugin_id), diff --git a/zenoh-plugin-ros2dds/src/route_action_cli.rs b/zenoh-plugin-ros2dds/src/route_action_cli.rs index 27c7def..7d5eb58 100644 --- a/zenoh-plugin-ros2dds/src/route_action_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_action_cli.rs @@ -75,29 +75,44 @@ impl RouteActionCli<'_> { zenoh_key_expr_prefix: OwnedKeyExpr, context: Context, ) -> Result, String> { + // configured queries timeout for calls to send_goal service + let send_goal_queries_timeout = context + .config + .get_queries_timeout_action_send_goal(&ros2_name); let route_send_goal = RouteServiceCli::create( format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_SEND_GOAL), format!("{ros2_type}_SendGoal"), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_SEND_GOAL, &None, + send_goal_queries_timeout, context.clone(), ) .await?; + // configured queries timeout for calls to cancel_goal service + let cancel_goal_queries_timeout = context + .config + .get_queries_timeout_action_cancel_goal(&ros2_name); let route_cancel_goal = RouteServiceCli::create( format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_CANCEL_GOAL), ROS2_ACTION_CANCEL_GOAL_SRV_TYPE.to_string(), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_CANCEL_GOAL, &None, + cancel_goal_queries_timeout, context.clone(), ) .await?; + // configured queries timeout for calls to get_result service + let get_result_queries_timeout = context + .config + .get_queries_timeout_action_get_result(&ros2_name); let route_get_result = RouteServiceCli::create( format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_GET_RESULT), format!("{ros2_type}_GetResult"), &zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_GET_RESULT, &None, + get_result_queries_timeout, context.clone(), ) .await?; diff --git a/zenoh-plugin-ros2dds/src/route_publisher.rs b/zenoh-plugin-ros2dds/src/route_publisher.rs index 3916426..8116b09 100644 --- a/zenoh-plugin-ros2dds/src/route_publisher.rs +++ b/zenoh-plugin-ros2dds/src/route_publisher.rs @@ -224,7 +224,6 @@ impl RoutePublisher<'_> { &dds_reader, &ros2_name, &ros2_type, - &zenoh_key_expr, &route_id, &context, keyless, @@ -379,14 +378,11 @@ where s.serialize_u64(zpub.cache_size as u64) } -// Return the read period if keyexpr matches one of the "pub_max_frequencies" option -fn get_read_period(config: &Config, ke: &keyexpr) -> Option { - for (re, freq) in &config.pub_max_frequencies { - if re.is_match(ke) { - return Some(Duration::from_secs_f32(1f32 / freq)); - } - } - None +// Return the read period if name matches one of the "pub_max_frequencies" option +fn get_read_period(config: &Config, ros2_name: &str) -> Option { + config + .get_pub_max_frequencies(ros2_name) + .map(|f| Duration::from_secs_f32(1f32 / f)) } #[allow(clippy::too_many_arguments)] @@ -394,7 +390,6 @@ fn activate_dds_reader( dds_reader: &Arc, ros2_name: &str, ros2_type: &str, - zenoh_key_expr: &OwnedKeyExpr, route_id: &str, context: &Context, keyless: bool, @@ -404,7 +399,7 @@ fn activate_dds_reader( ) -> Result<(), String> { let topic_name: String = format!("rt{}", ros2_name); let type_name = ros2_message_type_to_dds_type(ros2_type); - let read_period = get_read_period(&context.config, zenoh_key_expr); + let read_period = get_read_period(&context.config, ros2_name); // create matching DDS Reader that forwards data coming from DDS to Zenoh let reader = create_dds_reader( diff --git a/zenoh-plugin-ros2dds/src/route_service_cli.rs b/zenoh-plugin-ros2dds/src/route_service_cli.rs index 168799f..05d3366 100644 --- a/zenoh-plugin-ros2dds/src/route_service_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_service_cli.rs @@ -15,6 +15,7 @@ use cyclors::dds_entity_t; use serde::Serialize; use std::sync::Arc; +use std::time::Duration; use std::{collections::HashSet, fmt}; use zenoh::buffers::{ZBuf, ZSlice}; use zenoh::liveliness::LivelinessToken; @@ -49,6 +50,8 @@ pub struct RouteServiceCli<'a> { // the context #[serde(skip)] context: Context, + #[serde(serialize_with = "crate::config::serialize_duration_as_f32")] + queries_timeout: Duration, is_active: bool, // the local DDS Reader receiving client's requests and routing them to Zenoh #[serde(serialize_with = "serialize_entity_guid")] @@ -104,6 +107,7 @@ impl RouteServiceCli<'_> { ros2_type: String, zenoh_key_expr: OwnedKeyExpr, type_info: &Option>, + queries_timeout: Duration, context: Context, ) -> Result, String> { log::debug!( @@ -154,7 +158,14 @@ impl RouteServiceCli<'_> { qos, None, move |sample| { - do_route_request(&route_id, sample, &zenoh_key_expr2, &zsession2, rep_writer); + do_route_request( + &route_id, + sample, + &zenoh_key_expr2, + &zsession2, + queries_timeout, + rep_writer, + ); }, )?; // add reader's GID in ros_discovery_info message @@ -167,6 +178,7 @@ impl RouteServiceCli<'_> { ros2_type, zenoh_key_expr, context, + queries_timeout, is_active: false, rep_writer, req_reader, @@ -268,6 +280,7 @@ fn do_route_request( sample: &DDSRawSample, zenoh_key_expr: &OwnedKeyExpr, zsession: &Arc, + queries_timeout: Duration, rep_writer: dds_entity_t, ) { // request payload is expected to be the Request type encoded as CDR, including a 4 bytes header, @@ -303,6 +316,7 @@ fn do_route_request( if let Err(e) = zsession .get(zenoh_key_expr) .with_value(zenoh_req_buf) + .timeout(queries_timeout) .callback(move |reply| do_route_reply(route_id2.clone(), reply, request_id, rep_writer)) .res_sync() { diff --git a/zenoh-plugin-ros2dds/src/route_subscriber.rs b/zenoh-plugin-ros2dds/src/route_subscriber.rs index 87c1861..6ae477b 100644 --- a/zenoh-plugin-ros2dds/src/route_subscriber.rs +++ b/zenoh-plugin-ros2dds/src/route_subscriber.rs @@ -62,6 +62,8 @@ pub struct RouteSubscriber<'a> { dds_writer: dds_entity_t, // if the Writer is TRANSIENT_LOCAL transient_local: bool, + // queries timeout for historical publication (if TRANSIENT_LOCAL) + queries_timeout: Duration, // if the topic is keyless #[serde(skip)] keyless: bool, @@ -113,6 +115,7 @@ impl RouteSubscriber<'_> { let topic_name = format!("rt{ros2_name}"); let type_name = ros2_message_type_to_dds_type(&ros2_type); + let queries_timeout = context.config.get_queries_timeout_tl_sub(&ros2_name); let dds_writer = create_dds_writer( context.participant, @@ -134,6 +137,7 @@ impl RouteSubscriber<'_> { zenoh_subscriber: None, dds_writer, transient_local, + queries_timeout, keyless, liveliness_token: None, remote_routes: HashSet::new(), @@ -165,7 +169,7 @@ impl RouteSubscriber<'_> { .allowed_origin(Locality::Remote) // Allow only remote publications to avoid loops .reliable() .querying() - .query_timeout(self.context.config.queries_timeout) + .query_timeout(self.queries_timeout) .query_selector(query_selector) .query_accept_replies(ReplyKeyExpr::Any) .res() @@ -223,11 +227,7 @@ impl RouteSubscriber<'_> { /// If this route uses a FetchingSubscriber, query for historical publications /// using the specified Selector. Otherwise, do nothing. - pub async fn query_historical_publications<'a>( - &mut self, - plugin_id: &keyexpr, - query_timeout: Duration, - ) { + pub async fn query_historical_publications<'a>(&mut self, plugin_id: &keyexpr) { if let Some(ZSubscriber::FetchingSubscriber(sub)) = &mut self.zenoh_subscriber { // query all PublicationCaches on "//" let query_selector: Selector = @@ -240,6 +240,7 @@ impl RouteSubscriber<'_> { .fetch({ let session = &self.context.zsession; let query_selector = query_selector.clone(); + let queries_timeout = self.queries_timeout; move |cb| { use zenoh_core::SyncResolve; session @@ -247,7 +248,7 @@ impl RouteSubscriber<'_> { .target(QueryTarget::All) .consolidation(ConsolidationMode::None) .accept_replies(ReplyKeyExpr::Any) - .timeout(query_timeout) + .timeout(queries_timeout) .callback(cb) .res_sync() } diff --git a/zenoh-plugin-ros2dds/src/routes_mgr.rs b/zenoh-plugin-ros2dds/src/routes_mgr.rs index bed8b06..3f3e744 100644 --- a/zenoh-plugin-ros2dds/src/routes_mgr.rs +++ b/zenoh-plugin-ros2dds/src/routes_mgr.rs @@ -525,11 +525,9 @@ impl<'a> RoutesMgr<'a> { Ok(()) } - pub async fn query_historical_all_publications(&mut self, plugin_id: &keyexpr) { + pub async fn query_all_historical_publications(&mut self, plugin_id: &keyexpr) { for route in self.routes_subscribers.values_mut() { - route - .query_historical_publications(plugin_id, self.context.config.queries_timeout) - .await; + route.query_historical_publications(plugin_id).await; } } @@ -652,12 +650,15 @@ impl<'a> RoutesMgr<'a> { Entry::Vacant(entry) => { // ROS2 topic name => Zenoh key expr : strip '/' prefix let zenoh_key_expr = ros2_name_to_key_expr(&ros2_name, &self.context.config); + // configured queries timeout for services calls + let queries_timeout = self.context.config.get_queries_timeout_service(&ros2_name); // create route let route = RouteServiceCli::create( ros2_name.clone(), ros2_type, zenoh_key_expr.clone(), &None, + queries_timeout, self.context.clone(), ) .await?;