Skip to content

Commit

Permalink
Add Actions routing (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
JEnoch authored Oct 3, 2023
1 parent d830809 commit cbc234b
Show file tree
Hide file tree
Showing 15 changed files with 1,295 additions and 265 deletions.
2 changes: 1 addition & 1 deletion zenoh-plugin-ros2dds/src/discovered_entities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ impl DiscoveredEntities {
) {
match self.get_entity_json_value(entity_ref) {
Ok(Some(v)) => {
let admin_keyexpr = admin_keyexpr_prefix / &key_expr;
let admin_keyexpr = admin_keyexpr_prefix / key_expr;
if let Err(e) = query
.reply(Ok(Sample::new(admin_keyexpr, v)))
.res_async()
Expand Down
4 changes: 2 additions & 2 deletions zenoh-plugin-ros2dds/src/gid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl From<[u8; 16]> for Gid {

impl From<&[u8; 16]> for Gid {
fn from(key: &[u8; 16]) -> Self {
Self(key.clone())
Self(*key)
}
}

Expand All @@ -54,7 +54,7 @@ impl Serialize for Gid {
{
if serializer.is_human_readable() {
// serialize as an hexadecimal String
Serialize::serialize(&hex::encode(&self.0), serializer)
Serialize::serialize(&hex::encode(self.0), serializer)
} else {
// serialize as a little-endian [u8; 16]
Serialize::serialize(&self.0, serializer)
Expand Down
39 changes: 34 additions & 5 deletions zenoh-plugin-ros2dds/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ mod node_info;
mod qos_helpers;
mod ros2_utils;
mod ros_discovery;
mod route_action_cli;
mod route_action_srv;
mod route_publisher;
mod route_service_cli;
mod route_service_srv;
Expand All @@ -57,10 +59,7 @@ use config::Config;
use crate::dds_utils::get_guid;
use crate::discovery_mgr::DiscoveryMgr;
use crate::events::ROS2DiscoveryEvent;
use crate::liveliness_mgt::{
ke_liveliness_all, ke_liveliness_plugin, parse_ke_liveliness_pub,
parse_ke_liveliness_service_cli, parse_ke_liveliness_service_srv, parse_ke_liveliness_sub,
};
use crate::liveliness_mgt::*;
use crate::ros_discovery::RosDiscoveryInfoMgr;
use crate::routes_mgr::RoutesMgr;

Expand Down Expand Up @@ -387,7 +386,7 @@ impl<'a> ROS2PluginRuntime<'a> {
log::warn!("Error updating route: {e}");
}
} else {
log::info!("{evt} - Denied per config");
log::debug!("{evt} - Denied per config");
}
}
Err(e) => log::error!("Internal Error: received from DiscoveryMgr: {e}")
Expand Down Expand Up @@ -524,6 +523,36 @@ impl<'a> ROS2PluginRuntime<'a> {
plugin_id,
zenoh_key_expr,
}),
("AS/", SampleKind::Put) => parse_ke_liveliness_action_srv(liveliness_ke)
.map_err(|e| format!("Received invalid liveliness token: {e}"))
.map(
|(plugin_id, zenoh_key_expr, ros2_type)| AnnouncedActionSrv {
plugin_id,
zenoh_key_expr,
ros2_type,
},
),
("AS/", SampleKind::Delete) => parse_ke_liveliness_action_srv(liveliness_ke)
.map_err(|e| format!("Received invalid liveliness token: {e}"))
.map(|(plugin_id, zenoh_key_expr, ..)| RetiredActionSrv {
plugin_id,
zenoh_key_expr,
}),
("AC/", SampleKind::Put) => parse_ke_liveliness_action_cli(liveliness_ke)
.map_err(|e| format!("Received invalid liveliness token: {e}"))
.map(
|(plugin_id, zenoh_key_expr, ros2_type)| AnnouncedActionCli {
plugin_id,
zenoh_key_expr,
ros2_type,
},
),
("AC/", SampleKind::Delete) => parse_ke_liveliness_action_cli(liveliness_ke)
.map_err(|e| format!("Received invalid liveliness token: {e}"))
.map(|(plugin_id, zenoh_key_expr, ..)| RetiredActionCli {
plugin_id,
zenoh_key_expr,
}),
_ => Err(format!("invalid ROS2 interface kind: {iface_kind}")),
}
}
Expand Down
85 changes: 75 additions & 10 deletions zenoh-plugin-ros2dds/src/liveliness_mgt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ zenoh::kedefine!(
pub(crate) ke_liveliness_sub: "@ros2_lv/${plugin_id:*}/MS/${ke:*}/${typ:*}/${qos_ke:*}",
pub(crate) ke_liveliness_service_srv: "@ros2_lv/${plugin_id:*}/SS/${ke:*}/${typ:*}",
pub(crate) ke_liveliness_service_cli: "@ros2_lv/${plugin_id:*}/SC/${ke:*}/${typ:*}",
pub(crate) ke_liveliness_action_srv: "@ros2_lv/${plugin_id:*}/AS/${ke:*}/${typ:*}",
pub(crate) ke_liveliness_action_cli: "@ros2_lv/${plugin_id:*}/AC/${ke:*}/${typ:*}",
);

pub(crate) fn new_ke_liveliness_pub(
Expand Down Expand Up @@ -55,16 +57,16 @@ pub(crate) fn parse_ke_liveliness_pub(
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no plugin_id"))?;
let zenoh_key_expr = parsed
.ke()
.map(|ke| unescape_slashes(ke))
.map(unescape_slashes)
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no ke"))?;
let ros2_type = parsed
.typ()
.map(|ke| unescape_slashes(ke))
.map(unescape_slashes)
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no typ"))?;
let (keyless, qos) = parsed
.qos_ke()
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no typ"))
.and_then(|ke| key_expr_to_qos(ke))
.and_then(key_expr_to_qos)
.map_err(|e| format!("failed to parse liveliness keyexpr {ke}: {e}"))?;
Ok((
plugin_id,
Expand Down Expand Up @@ -100,16 +102,16 @@ pub(crate) fn parse_ke_liveliness_sub(
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no plugin_id"))?;
let zenoh_key_expr = parsed
.ke()
.map(|ke| unescape_slashes(ke))
.map(unescape_slashes)
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no ke"))?;
let ros2_type = parsed
.typ()
.map(|ke| unescape_slashes(ke))
.map(unescape_slashes)
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no typ"))?;
let (keyless, qos) = parsed
.qos_ke()
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no typ"))
.and_then(|ke| key_expr_to_qos(ke))
.and_then(key_expr_to_qos)
.map_err(|e| format!("failed to parse liveliness keyexpr {ke}: {e}"))?;
Ok((
plugin_id,
Expand Down Expand Up @@ -142,11 +144,11 @@ pub(crate) fn parse_ke_liveliness_service_srv(
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no plugin_id"))?;
let zenoh_key_expr = parsed
.ke()
.map(|ke| unescape_slashes(ke))
.map(unescape_slashes)
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no ke"))?;
let ros2_type = parsed
.typ()
.map(|ke| unescape_slashes(ke))
.map(unescape_slashes)
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no typ"))?;
Ok((plugin_id, zenoh_key_expr, ros2_type.to_string()))
}
Expand All @@ -173,11 +175,74 @@ pub(crate) fn parse_ke_liveliness_service_cli(
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no plugin_id"))?;
let zenoh_key_expr = parsed
.ke()
.map(|ke| unescape_slashes(ke))
.map(unescape_slashes)
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no ke"))?;
let ros2_type = parsed
.typ()
.map(|ke| unescape_slashes(ke))
.map(unescape_slashes)
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no typ"))?;
Ok((plugin_id, zenoh_key_expr, ros2_type.to_string()))
}

/////////
pub(crate) fn new_ke_liveliness_action_srv(
plugin_id: &keyexpr,
zenoh_key_expr: &keyexpr,
ros2_type: &str,
) -> Result<OwnedKeyExpr, String> {
let ke = escape_slashes(zenoh_key_expr);
let typ = escape_slashes(ros2_type);
zenoh::keformat!(ke_liveliness_action_srv::formatter(), plugin_id, ke, typ)
.map_err(|e| e.to_string())
}

pub(crate) fn parse_ke_liveliness_action_srv(
ke: &keyexpr,
) -> Result<(OwnedKeyExpr, OwnedKeyExpr, String), String> {
let parsed = ke_liveliness_action_srv::parse(ke)
.map_err(|e| format!("failed to parse liveliness keyexpr {ke}: {e}"))?;
let plugin_id = parsed
.plugin_id()
.map(ToOwned::to_owned)
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no plugin_id"))?;
let zenoh_key_expr = parsed
.ke()
.map(unescape_slashes)
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no ke"))?;
let ros2_type = parsed
.typ()
.map(unescape_slashes)
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no typ"))?;
Ok((plugin_id, zenoh_key_expr, ros2_type.to_string()))
}

pub(crate) fn new_ke_liveliness_action_cli(
plugin_id: &keyexpr,
zenoh_key_expr: &keyexpr,
ros2_type: &str,
) -> Result<OwnedKeyExpr, String> {
let ke = escape_slashes(zenoh_key_expr);
let typ = escape_slashes(ros2_type);
zenoh::keformat!(ke_liveliness_action_cli::formatter(), plugin_id, ke, typ)
.map_err(|e| e.to_string())
}

pub(crate) fn parse_ke_liveliness_action_cli(
ke: &keyexpr,
) -> Result<(OwnedKeyExpr, OwnedKeyExpr, String), String> {
let parsed = ke_liveliness_action_cli::parse(ke)
.map_err(|e| format!("failed to parse liveliness keyexpr {ke}: {e}"))?;
let plugin_id = parsed
.plugin_id()
.map(ToOwned::to_owned)
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no plugin_id"))?;
let zenoh_key_expr = parsed
.ke()
.map(unescape_slashes)
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no ke"))?;
let ros2_type = parsed
.typ()
.map(unescape_slashes)
.ok_or_else(|| format!("failed to parse liveliness keyexpr {ke}: no typ"))?;
Ok((plugin_id, zenoh_key_expr, ros2_type.to_string()))
}
Expand Down
11 changes: 0 additions & 11 deletions zenoh-plugin-ros2dds/src/node_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,11 +447,6 @@ impl NodeInfo {
})
}

#[inline]
pub fn id(&self) -> &str {
&self.id
}

#[inline]
pub fn fullname(&self) -> &str {
&self.id[self.fullname.clone()]
Expand All @@ -472,12 +467,6 @@ impl NodeInfo {
ke_for_sure!(&self.id)
}

#[inline]
pub fn fullname_as_keyexpr(&self) -> &keyexpr {
// fullname always start with '/' - remove it
ke_for_sure!(&self.fullname()[1..])
}

pub fn update_with_reader(&mut self, entity: &DdsEntity) -> Option<ROS2DiscoveryEvent> {
let topic_prefix = &entity.topic_name[..3];
let topic_suffix = &entity.topic_name[2..];
Expand Down
18 changes: 0 additions & 18 deletions zenoh-plugin-ros2dds/src/qos_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,6 @@ pub fn get_durability_service_or_default(qos: &Qos) -> DurabilityService {
}
}

pub fn partition_is_empty(partition: &Option<Vec<String>>) -> bool {
partition
.as_ref()
.map_or(true, |partition| partition.is_empty())
}

pub fn partition_contains(partition: &Option<Vec<String>>, name: &String) -> bool {
partition
.as_ref()
.map_or(false, |partition| partition.contains(name))
}

pub fn is_writer_reliable(reliability: &Option<Reliability>) -> bool {
reliability.as_ref().map_or(true, |reliability| {
reliability.kind == ReliabilityKind::RELIABLE
})
}

pub fn is_reader_reliable(reliability: &Option<Reliability>) -> bool {
reliability.as_ref().map_or(false, |reliability| {
reliability.kind == ReliabilityKind::RELIABLE
Expand Down
Loading

0 comments on commit cbc234b

Please sign in to comment.