Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve queries_timeout configuration #19

Merged
merged 1 commit into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,24 @@
// reliable_routes_blocking: true,

////
//// queries_timeout: A duration in seconds (default: 5.0 sec) that will be used as a timeout when the bridge
//// queries any other remote bridge for discovery information and for historical data for TRANSIENT_LOCAL DDS Readers it serves
//// (i.e. if the query to the remote bridge exceed the timeout, some historical samples might be not routed to the Readers,
//// but the route will not be blocked forever).
////
// queries_timeout: 5.0,
//// queries_timeout: Timeouts configuration for various Zenoh queries.
//// Each field is optional. If not set, A default value of 5.0 seconds applies.
//// Each value can be either a float in seconds that will apply as a timeout to all queries,
//// either a list of strings with format "<regex>=<float>" where:
//// - "regex" is a regular expression matching an interface name
//// - "float" is the timeout in seconds
// queries_timeout: {
// //// timeouts for TRANSIENT_LOCAL subscriber when querying publishers for historical publications
// transient_local_subscribers: 1.0,
// //// timeouts for Service clients calling a Service server
// services: ["add_two_ints=0.5", ".*=1.0"],
// //// timeouts for Action clients calling an Action server (send_goal, cancel_goal and get_result services)
// actions: {
// send_goal: 1.0,
// cancel_goal: 1.0,
// get_result: [".*long_mission=3600", ".*short_action=10.0"],
// }
// }
},

////
Expand Down
202 changes: 164 additions & 38 deletions zenoh-plugin-ros2dds/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ pub const DEFAULT_NODENAME: &str = "zenoh_bridge_ros2dds";
pub const DEFAULT_DOMAIN: u32 = 0;
pub const DEFAULT_RELIABLE_ROUTES_BLOCKING: bool = true;
pub const DEFAULT_TRANSIENT_LOCAL_CACHE_MULTIPLIER: usize = 10;
pub const DEFAULT_QUERIES_TIMEOUT: f32 = 5.0;
pub const DEFAULT_DDS_LOCALHOST_ONLY: bool = false;

lazy_static::lazy_static!(
pub static ref DEFAULT_QUERIES_TIMEOUT: Duration = Duration::from_secs_f32(5.0);
);

#[derive(Deserialize, Debug, Serialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
Expand All @@ -43,20 +46,17 @@ pub struct Config {
pub allowance: Option<Allowance>,
#[serde(
default,
deserialize_with = "deserialize_max_frequencies",
serialize_with = "serialize_max_frequencies"
deserialize_with = "deserialize_vec_regex_f32",
serialize_with = "serialize_vec_regex_f32"
)]
pub pub_max_frequencies: Vec<(Regex, f32)>,
#[serde(default)]
#[cfg(feature = "dds_shm")]
pub shm_enabled: bool,
#[serde(default = "default_transient_local_cache_multiplier")]
pub transient_local_cache_multiplier: usize,
#[serde(
default = "default_queries_timeout",
deserialize_with = "deserialize_duration"
)]
pub queries_timeout: Duration,
#[serde(default)]
pub queries_timeout: Option<QueriesTimeouts>,
#[serde(default = "default_reliable_routes_blocking")]
pub reliable_routes_blocking: bool,
#[serde(default)]
Expand All @@ -65,6 +65,123 @@ pub struct Config {
__path__: Vec<String>,
}

impl Config {
pub fn get_pub_max_frequencies(&self, ros2_name: &str) -> Option<f32> {
for (re, freq) in &self.pub_max_frequencies {
if re.is_match(ros2_name) {
return Some(*freq);
}
}
None
}

pub fn get_queries_timeout_tl_sub(&self, ros2_name: &str) -> Duration {
if let Some(qt) = &self.queries_timeout {
for (re, secs) in &qt.transient_local_subscribers {
if re.is_match(ros2_name) {
return Duration::from_secs_f32(*secs);
}
}
}
*DEFAULT_QUERIES_TIMEOUT
}

pub fn get_queries_timeout_service(&self, ros2_name: &str) -> Duration {
if let Some(qt) = &self.queries_timeout {
for (re, secs) in &qt.services {
if re.is_match(ros2_name) {
return Duration::from_secs_f32(*secs);
}
}
}
*DEFAULT_QUERIES_TIMEOUT
}

pub fn get_queries_timeout_action_send_goal(&self, ros2_name: &str) -> Duration {
if let Some(QueriesTimeouts {
actions: Some(at), ..
}) = &self.queries_timeout
{
for (re, secs) in &at.send_goal {
if re.is_match(ros2_name) {
return Duration::from_secs_f32(*secs);
}
}
}
*DEFAULT_QUERIES_TIMEOUT
}

pub fn get_queries_timeout_action_cancel_goal(&self, ros2_name: &str) -> Duration {
if let Some(QueriesTimeouts {
actions: Some(at), ..
}) = &self.queries_timeout
{
for (re, secs) in &at.cancel_goal {
if re.is_match(ros2_name) {
return Duration::from_secs_f32(*secs);
}
}
}
*DEFAULT_QUERIES_TIMEOUT
}

pub fn get_queries_timeout_action_get_result(&self, ros2_name: &str) -> Duration {
if let Some(QueriesTimeouts {
actions: Some(at), ..
}) = &self.queries_timeout
{
for (re, secs) in &at.get_result {
if re.is_match(ros2_name) {
return Duration::from_secs_f32(*secs);
}
}
}
*DEFAULT_QUERIES_TIMEOUT
}
}

#[derive(Deserialize, Debug, Serialize)]
#[serde(deny_unknown_fields)]
pub struct QueriesTimeouts {
#[serde(
default,
deserialize_with = "deserialize_vec_regex_f32",
serialize_with = "serialize_vec_regex_f32"
)]
transient_local_subscribers: Vec<(Regex, f32)>,
#[serde(
default,
deserialize_with = "deserialize_vec_regex_f32",
serialize_with = "serialize_vec_regex_f32"
)]
services: Vec<(Regex, f32)>,
#[serde(default)]
actions: Option<ActionsTimeouts>,
}

#[derive(Deserialize, Debug, Serialize)]
#[serde(deny_unknown_fields)]
pub struct ActionsTimeouts {
#[serde(
default,
deserialize_with = "deserialize_vec_regex_f32",
serialize_with = "serialize_vec_regex_f32"
)]
send_goal: Vec<(Regex, f32)>,
#[serde(
default,
deserialize_with = "deserialize_vec_regex_f32",
serialize_with = "serialize_vec_regex_f32"
)]
cancel_goal: Vec<(Regex, f32)>,
#[serde(
default,
deserialize_with = "deserialize_vec_regex_f32",
serialize_with = "serialize_vec_regex_f32"
)]
get_result: Vec<(Regex, f32)>,
}

#[derive(Deserialize, Debug, Serialize)]
pub enum Allowance {
#[serde(rename = "allow")]
Expand Down Expand Up @@ -279,18 +396,6 @@ fn default_transient_local_cache_multiplier() -> usize {
DEFAULT_TRANSIENT_LOCAL_CACHE_MULTIPLIER
}

fn default_queries_timeout() -> Duration {
Duration::from_secs_f32(DEFAULT_QUERIES_TIMEOUT)
}

fn deserialize_duration<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
let seconds: f32 = Deserialize::deserialize(deserializer)?;
Ok(Duration::from_secs_f32(seconds))
}

fn serialize_regex<S>(r: &Option<Regex>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
Expand Down Expand Up @@ -347,30 +452,44 @@ impl<'de> Visitor<'de> for RegexVisitor {
}
}

fn deserialize_max_frequencies<'de, D>(deserializer: D) -> Result<Vec<(Regex, f32)>, D::Error>
fn deserialize_vec_regex_f32<'de, D>(deserializer: D) -> Result<Vec<(Regex, f32)>, D::Error>
where
D: Deserializer<'de>,
{
let strs: Vec<String> = Deserialize::deserialize(deserializer)?;
let mut result: Vec<(Regex, f32)> = Vec::with_capacity(strs.len());
for s in strs {
let i = s
.find('=')
.ok_or_else(|| de::Error::custom(format!("Invalid 'max_frequency': {s}")))?;
let regex = Regex::new(&s[0..i]).map_err(|e| {
de::Error::custom(format!("Invalid regex for 'max_frequency': '{s}': {e}"))
})?;
let frequency: f32 = s[i + 1..].parse().map_err(|e| {
de::Error::custom(format!(
"Invalid float value for 'max_frequency': '{s}': {e}"
))
})?;
result.push((regex, frequency));
#[derive(Deserialize)]
#[serde(untagged)]
enum AcceptedValues {
Float(f32),
List(Vec<String>),
}

let values: AcceptedValues = Deserialize::deserialize(deserializer).unwrap();
match values {
AcceptedValues::Float(f) => {
// same float for any string (i.e. matching ".*")
Ok(vec![(Regex::new(".*").unwrap(), f)])
}
AcceptedValues::List(strs) => {
let mut result: Vec<(Regex, f32)> = Vec::with_capacity(strs.len());
for s in strs {
let i = s.find('=').ok_or_else(|| {
de::Error::custom(format!(
r#"Invalid list of "<regex>=<float>" elements": {s}"#
))
})?;
let regex = Regex::new(&s[0..i])
.map_err(|e| de::Error::custom(format!("Invalid regex in '{s}': {e}")))?;
let frequency: f32 = s[i + 1..]
.parse()
.map_err(|e| de::Error::custom(format!("Invalid float value in '{s}': {e}")))?;
result.push((regex, frequency));
}
Ok(result)
}
}
Ok(result)
}

fn serialize_max_frequencies<S>(v: &Vec<(Regex, f32)>, serializer: S) -> Result<S::Ok, S::Error>
fn serialize_vec_regex_f32<S>(v: &Vec<(Regex, f32)>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
Expand All @@ -382,6 +501,13 @@ where
seq.end()
}

pub fn serialize_duration_as_f32<S>(d: &Duration, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_f32(d.as_secs_f32())
}

mod tests {

#[test]
Expand Down
4 changes: 2 additions & 2 deletions zenoh-plugin-ros2dds/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,8 @@ impl<'a> ROS2PluginRuntime<'a> {
// New remote bridge detected
(None, SampleKind::Put) => {
log::info!("New ROS 2 bridge detected: {}", plugin_id);
// make all routes for a TRANSIENT_LOCAL Subscriber to query historical publications from this new plugin
routes_mgr.query_historical_all_publications(plugin_id).await;
// make each routes for a TRANSIENT_LOCAL Subscriber to query historical publications from this new plugin
routes_mgr.query_all_historical_publications(plugin_id).await;
}
// New remote bridge left
(None, SampleKind::Delete) => log::info!("Remote ROS 2 bridge left: {}", plugin_id),
Expand Down
15 changes: 15 additions & 0 deletions zenoh-plugin-ros2dds/src/route_action_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,29 +75,44 @@ impl RouteActionCli<'_> {
zenoh_key_expr_prefix: OwnedKeyExpr,
context: Context,
) -> Result<RouteActionCli<'a>, String> {
// configured queries timeout for calls to send_goal service
let send_goal_queries_timeout = context
.config
.get_queries_timeout_action_send_goal(&ros2_name);
let route_send_goal = RouteServiceCli::create(
format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_SEND_GOAL),
format!("{ros2_type}_SendGoal"),
&zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_SEND_GOAL,
&None,
send_goal_queries_timeout,
context.clone(),
)
.await?;

// configured queries timeout for calls to cancel_goal service
let cancel_goal_queries_timeout = context
.config
.get_queries_timeout_action_cancel_goal(&ros2_name);
let route_cancel_goal = RouteServiceCli::create(
format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_CANCEL_GOAL),
ROS2_ACTION_CANCEL_GOAL_SRV_TYPE.to_string(),
&zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_CANCEL_GOAL,
&None,
cancel_goal_queries_timeout,
context.clone(),
)
.await?;

// configured queries timeout for calls to get_result service
let get_result_queries_timeout = context
.config
.get_queries_timeout_action_get_result(&ros2_name);
let route_get_result = RouteServiceCli::create(
format!("{ros2_name}/{}", *KE_SUFFIX_ACTION_GET_RESULT),
format!("{ros2_type}_GetResult"),
&zenoh_key_expr_prefix / *KE_SUFFIX_ACTION_GET_RESULT,
&None,
get_result_queries_timeout,
context.clone(),
)
.await?;
Expand Down
17 changes: 6 additions & 11 deletions zenoh-plugin-ros2dds/src/route_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ impl RoutePublisher<'_> {
&dds_reader,
&ros2_name,
&ros2_type,
&zenoh_key_expr,
&route_id,
&context,
keyless,
Expand Down Expand Up @@ -379,22 +378,18 @@ where
s.serialize_u64(zpub.cache_size as u64)
}

// Return the read period if keyexpr matches one of the "pub_max_frequencies" option
fn get_read_period(config: &Config, ke: &keyexpr) -> Option<Duration> {
for (re, freq) in &config.pub_max_frequencies {
if re.is_match(ke) {
return Some(Duration::from_secs_f32(1f32 / freq));
}
}
None
// Return the read period if name matches one of the "pub_max_frequencies" option
fn get_read_period(config: &Config, ros2_name: &str) -> Option<Duration> {
config
.get_pub_max_frequencies(ros2_name)
.map(|f| Duration::from_secs_f32(1f32 / f))
}

#[allow(clippy::too_many_arguments)]
fn activate_dds_reader(
dds_reader: &Arc<AtomicDDSEntity>,
ros2_name: &str,
ros2_type: &str,
zenoh_key_expr: &OwnedKeyExpr,
route_id: &str,
context: &Context,
keyless: bool,
Expand All @@ -404,7 +399,7 @@ fn activate_dds_reader(
) -> Result<(), String> {
let topic_name: String = format!("rt{}", ros2_name);
let type_name = ros2_message_type_to_dds_type(ros2_type);
let read_period = get_read_period(&context.config, zenoh_key_expr);
let read_period = get_read_period(&context.config, ros2_name);

// create matching DDS Reader that forwards data coming from DDS to Zenoh
let reader = create_dds_reader(
Expand Down
Loading
Loading