Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use new Serialization and zenoh-ext API #279

Merged
merged 4 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 129 additions & 121 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,18 @@ serde_json = "1.0.114"
tokio = { version = "1.35.1", default-features = false } # Default features are disabled due to some crates' requirements
tracing = "0.1"
zenoh = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [
"internal",
"internal_config",
"unstable",
"plugins",
"unstable",
] }
zenoh-config = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false }
zenoh-ext = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [
"unstable",
] }
zenoh-plugin-rest = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false, features=["static_plugin"]}
zenoh-plugin-ros2dds = { version = "1.0.0-dev", path = "zenoh-plugin-ros2dds/", default-features = false }
zenoh-plugin-rest = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false, features=["static_plugin"]}
zenoh-plugin-trait = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", default-features = false }


[profile.release]
codegen-units = 1
debug = false
Expand Down
6 changes: 3 additions & 3 deletions zenoh-plugin-ros2dds/src/discovered_entities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,10 +453,10 @@ impl DiscoveredEntities {
match self.get_entity_json_value(entity_ref) {
Ok(Some(v)) => {
let admin_keyexpr = admin_keyexpr_prefix / key_expr;
match ZBytes::try_from(v) {
Ok(payload) => {
match serde_json::to_vec(&v) {
Ok(bytes) => {
if let Err(e) = query
.reply(admin_keyexpr, payload)
.reply(admin_keyexpr, ZBytes::from(bytes))
.encoding(Encoding::APPLICATION_JSON)
.await
{
Expand Down
8 changes: 4 additions & 4 deletions zenoh-plugin-ros2dds/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,8 @@ impl ROS2PluginRuntime {
async fn send_admin_reply(&self, query: &Query, key_expr: &keyexpr, admin_ref: &AdminRef) {
let z_bytes: ZBytes = match admin_ref {
AdminRef::Version => match serde_json::to_value(ROS2Plugin::PLUGIN_LONG_VERSION) {
Ok(v) => match ZBytes::try_from(v) {
Ok(value) => value,
Ok(v) => match serde_json::to_vec(&v) {
Ok(bytes) => ZBytes::from(bytes),
Err(e) => {
tracing::warn!("Error transforming JSON to ZBytes: {}", e);
return;
Expand All @@ -614,8 +614,8 @@ impl ROS2PluginRuntime {
}
},
AdminRef::Config => match serde_json::to_value(&*self.config) {
Ok(v) => match ZBytes::try_from(v) {
Ok(value) => value,
Ok(v) => match serde_json::to_vec(&v) {
Ok(bytes) => ZBytes::from(bytes),
Err(e) => {
tracing::warn!("Error transforming JSON to ZBytes: {}", e);
return;
Expand Down
12 changes: 9 additions & 3 deletions zenoh-plugin-ros2dds/src/ros2_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,16 +228,22 @@ impl CddsRequestHeader {
buf[16] = self.is_little_endian as u8;

hashmap.insert(ATTACHMENT_KEY_REQUEST_HEADER, buf);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why a hashmap was used... probably for ease of serialization via ZBytes::from_iter(hashmap.iter()).
But now that this ease is gone, I don't see the point of this hashmap. We could just do:

writer.append(ZBytes::from(ATTACHMENT_KEY_REQUEST_HEADER));
writer.append(ZBytes::from(buf));
writer.finish();

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed Hashmap, packing bytes directly.

ZBytes::from_iter(hashmap.iter())

let mut writer = ZBytes::writer();
for (k, v) in hashmap.iter() {
writer.append(ZBytes::from(k));
writer.append(ZBytes::from(v));
}
writer.finish()
}
}

impl TryFrom<&ZBytes> for CddsRequestHeader {
type Error = ZError;

fn try_from(value: &ZBytes) -> Result<Self, Self::Error> {
let hashmap: HashMap<[u8; 3], [u8; 17]> =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On receiving side, we expect the the attachment value to contain only:

  • the ATTACHMENT_KEY_REQUEST_HEADER
  • the header (16 bytes)

Rather than using a hashmap, we could just do:

  • try to read len(ATTACHMENT_KEY_REQUEST_HEADER) bytes (return Err if fail)
  • check if those bytes are equal to ATTACHMENT_KEY_REQUEST_HEADER (return Err if fail)
  • try to read 16 bytes (return Err if fail)

Copy link
Member Author

@Charles-Schleich Charles-Schleich Oct 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading Bytes from slice, failing if not enough bytes are found or if bytes do not match ATTACHMENT_KEY_REQUEST_HEADER values.

HashMap::from_iter(value.iter::<([u8; 3], [u8; 17])>().map(Result::unwrap));
let hashmap: HashMap<[u8; 3], [u8; 17]> = zenoh_ext::z_deserialize(value).unwrap();

match hashmap.get(&ATTACHMENT_KEY_REQUEST_HEADER) {
Some(buf) => {
if buf.len() == 17 {
Expand Down
5 changes: 3 additions & 2 deletions zenoh-plugin-ros2dds/src/route_service_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ fn route_dds_request_to_zenoh(
// https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/serdata.hpp#L73

let z_bytes: ZBytes = sample.into();
let slice: ZSlice = z_bytes.into();
let slice: ZSlice = ZBuf::from(z_bytes).to_zslice();

// Decompose the slice into 3 sub-slices (4 bytes header, 16 bytes request_id and payload)
let (payload, request_id, header) = match (
Expand All @@ -376,6 +376,7 @@ fn route_dds_request_to_zenoh(

// route request buffer stripped from request_id
let mut zenoh_req_buf = ZBuf::empty();

zenoh_req_buf.push_zslice(header);
zenoh_req_buf.push_zslice(payload);

Expand Down Expand Up @@ -431,7 +432,7 @@ fn route_zenoh_reply_to_dds(
) {
match reply.result() {
Ok(sample) => {
let zenoh_rep_buf = sample.payload().into::<Vec<u8>>();
let zenoh_rep_buf = sample.payload().to_bytes();
if zenoh_rep_buf.len() < 4 || zenoh_rep_buf[1] > 1 {
tracing::warn!(
"{route_id}: received invalid reply from Zenoh for {request_id}: {zenoh_rep_buf:0x?}"
Expand Down
6 changes: 3 additions & 3 deletions zenoh-plugin-ros2dds/src/route_service_srv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ fn route_zenoh_request_to_dds(
// if any and if long enoough it shall be the Request type encoded as CDR (including 4 bytes header)
let is_little_endian = match query.payload() {
Some(value) if value.len() > 4 => {
is_cdr_little_endian(value.into::<ZSlice>().as_ref()).unwrap_or(true)
is_cdr_little_endian(value.to_bytes().as_ref()).unwrap_or(true)
}
_ => true,
};
Expand All @@ -383,7 +383,7 @@ fn route_zenoh_request_to_dds(
// https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/serdata.hpp#L73
let dds_req_buf = if let Some(value) = query.payload() {
// The query comes with some payload. It's expected to be the Request type encoded as CDR (including 4 bytes header)
let zenoh_req_buf = value.into::<Vec<u8>>();
let zenoh_req_buf = value.to_bytes();
if zenoh_req_buf.len() < 4 || zenoh_req_buf[1] > 1 {
tracing::warn!("{route_id}: received invalid request: {zenoh_req_buf:0x?}");
return;
Expand Down Expand Up @@ -442,7 +442,7 @@ fn route_dds_reply_to_zenoh(
// https://github.com/ros2/rmw_cyclonedds/blob/2263814fab142ac19dd3395971fb1f358d22a653/rmw_cyclonedds_cpp/src/serdata.hpp#L73

let z_bytes: ZBytes = sample.into();
let slice: ZSlice = z_bytes.into();
let slice: ZSlice = ZBuf::from(z_bytes).to_zslice();

// Decompose the slice into 3 sub-slices (4 bytes header, 16 bytes request_id and payload)
let (payload, request_id, header) = match (
Expand Down
2 changes: 1 addition & 1 deletion zenoh-plugin-ros2dds/src/route_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ fn route_zenoh_message_to_dds(s: Sample, ros2_name: &str, data_writer: dds_entit
}

unsafe {
let bs = s.payload().into();
let bs = s.payload().to_bytes().to_vec();
// As per the Vec documentation (see https://doc.rust-lang.org/std/vec/struct.Vec.html#method.into_raw_parts)
// the only way to correctly releasing it is to create a vec using from_raw_parts
// and then have its destructor do the cleanup.
Expand Down
6 changes: 3 additions & 3 deletions zenoh-plugin-ros2dds/src/routes_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,10 +792,10 @@ impl<'a> RoutesMgr {
match self.get_entity_json_value(route_ref) {
Ok(Some(v)) => {
let admin_keyexpr = &self.admin_prefix / key_expr;
match ZBytes::try_from(v) {
Ok(payload) => {
match serde_json::to_vec(&v) {
Ok(bytes) => {
if let Err(e) = query
.reply(admin_keyexpr, payload)
.reply(admin_keyexpr, ZBytes::from(bytes))
.encoding(Encoding::APPLICATION_JSON)
.await
{
Expand Down