Skip to content

Commit

Permalink
Add pub_priorities configuration (#82)
Browse files Browse the repository at this point in the history
  • Loading branch information
JEnoch authored Feb 9, 2024
1 parent d5d6a95 commit cc84c30
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 0 deletions.
10 changes: 10 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@
//// if publication rate is higher, downsampling will occur when routing.
// pub_max_frequencies: [".*/laser_scan=5", "/tf=10"],

////
//// pub_priorities: Specify a list of priorities of publications routing over zenoh for a set of Publishers.
//// In case of high traffic, the publications with higher priorities will overtake
//// the publications with lower priorities in Zenoh publication queues.
//// The strings must have the format "<regex>=<integer>":
//// - "regex" is a regular expression matching a Publisher interface name
//// - "integer" is a priority value in the range [1-7]. Highest priority is 1, lowest is 7 and default is 5.
//// (see Zenoh Priority definition here: https://docs.rs/zenoh/latest/zenoh/publication/enum.Priority.html)
////
// pub_priorities: ["/pose=2", "/rosout=7"],

////
//// reliable_routes_blocking: When true, the publications from a RELIABLE DDS Writer will be
Expand Down
49 changes: 49 additions & 0 deletions zenoh-plugin-ros2dds/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ pub struct Config {
pub queries_timeout: Option<QueriesTimeouts>,
#[serde(default = "default_reliable_routes_blocking")]
pub reliable_routes_blocking: bool,
#[serde(
default,
deserialize_with = "deserialize_vec_regex_prio",
serialize_with = "serialize_vec_regex_prio"
)]
pub pub_priorities: Vec<(Regex, Priority)>,
__required__: Option<bool>,
#[serde(default, deserialize_with = "deserialize_path")]
__path__: Option<Vec<String>>,
Expand All @@ -71,6 +77,15 @@ impl Config {
None
}

pub fn get_pub_priorities(&self, ros2_name: &str) -> Option<Priority> {
for (re, p) in &self.pub_priorities {
if re.is_match(ros2_name) {
return Some(*p);
}
}
None
}

pub fn get_queries_timeout_tl_sub(&self, ros2_name: &str) -> Duration {
if let Some(qt) = &self.queries_timeout {
for (re, secs) in &qt.transient_local_subscribers {
Expand Down Expand Up @@ -543,6 +558,40 @@ where
seq.end()
}

fn deserialize_vec_regex_prio<'de, D>(deserializer: D) -> Result<Vec<(Regex, Priority)>, D::Error>
where
D: Deserializer<'de>,
{
let strs: Vec<String> = Deserialize::deserialize(deserializer).unwrap();
let mut result: Vec<(Regex, Priority)> = Vec::with_capacity(strs.len());
for s in strs {
let i = s.find('=').ok_or_else(|| {
de::Error::custom(format!(r#"Invalid list of "<regex>=<int>" elements": {s}"#))
})?;
let regex = Regex::new(&s[0..i])
.map_err(|e| de::Error::custom(format!("Invalid regex in '{s}': {e}")))?;
let i: u8 = s[i + 1..].parse().map_err(|e| {
de::Error::custom(format!("Invalid priority (not an integer) in '{s}': {e}"))
})?;
let priority = Priority::try_from(i)
.map_err(|e| de::Error::custom(format!("Invalid priority in '{s}': {e}")))?;
result.push((regex, priority));
}
Ok(result)
}

fn serialize_vec_regex_prio<S>(v: &Vec<(Regex, Priority)>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut seq = serializer.serialize_seq(Some(v.len()))?;
for (r, p) in v {
let s = format!("{}={}", r.as_str(), *p as u8);
seq.serialize_element(&s)?;
}
seq.end()
}

pub fn serialize_duration_as_f32<S>(d: &Duration, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
Expand Down
18 changes: 18 additions & 0 deletions zenoh-plugin-ros2dds/src/route_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ pub struct RoutePublisher<'a> {
// the local DDS Reader created to serve the route (i.e. re-publish to zenoh message coming from DDS)
#[serde(serialize_with = "serialize_atomic_entity_guid")]
dds_reader: Arc<AtomicDDSEntity>,
// the Zenoh Priority for publications
#[serde(serialize_with = "serialize_priority")]
priority: Priority,
// TypeInfo for Reader creation (if available)
#[serde(skip)]
_type_info: Option<Arc<TypeInfo>>,
Expand Down Expand Up @@ -189,11 +192,18 @@ impl RoutePublisher<'_> {
_ => CongestionControl::Drop,
};

// Priority if configured for this topic
let priority = context
.config
.get_pub_priorities(&ros2_name)
.unwrap_or_default();

let publisher: Arc<Publisher<'static>> = context
.zsession
.declare_publisher(zenoh_key_expr.clone())
.allowed_destination(Locality::Remote)
.congestion_control(congestion_ctrl)
.priority(priority)
.res_async()
.await
.map_err(|e| format!("Failed create Publisher for key {zenoh_key_expr}: {e}",))?
Expand Down Expand Up @@ -260,6 +270,7 @@ impl RoutePublisher<'_> {
cache_size,
},
dds_reader,
priority,
_type_info: type_info.clone(),
_reader_qos: reader_qos,
keyless,
Expand Down Expand Up @@ -379,6 +390,13 @@ where
s.serialize_u64(zpub.cache_size as u64)
}

fn serialize_priority<S>(p: &Priority, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
s.serialize_u8(*p as u8)
}

// Return the read period if name matches one of the "pub_max_frequencies" option
fn get_read_period(config: &Config, ros2_name: &str) -> Option<Duration> {
config
Expand Down

0 comments on commit cc84c30

Please sign in to comment.