diff --git a/zenoh-test-ros2dds/Cargo.toml b/zenoh-test-ros2dds/Cargo.toml index 7e35d71..344c494 100644 --- a/zenoh-test-ros2dds/Cargo.toml +++ b/zenoh-test-ros2dds/Cargo.toml @@ -20,6 +20,7 @@ cdr = "0.2.4" futures = "0.3.26" r2r = "0.9" serde = "1.0.154" +serde_derive = "1.0.154" serde_json = "1.0.114" tokio = { version = "1.35.1", default-features = false } # Default features are disabled due to some crates' requirements zenoh = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [ diff --git a/zenoh-test-ros2dds/tests/common.rs b/zenoh-test-ros2dds/tests/common.rs new file mode 100644 index 0000000..9ffbc97 --- /dev/null +++ b/zenoh-test-ros2dds/tests/common.rs @@ -0,0 +1,42 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use zenoh::{ + config::Config, + internal::{plugins::PluginsManager, runtime::RuntimeBuilder}, +}; +use zenoh_config::ModeDependentValue; + +pub fn init_env() { + std::env::set_var("RMW_IMPLEMENTATION", "rmw_cyclonedds_cpp"); +} + +pub async fn create_bridge() { + let mut plugins_mgr = PluginsManager::static_plugins_only(); + plugins_mgr.declare_static_plugin::("ros2dds", true); + let mut config = Config::default(); + config.insert_json5("plugins/ros2dds", "{}").unwrap(); + config + .timestamping + .set_enabled(Some(ModeDependentValue::Unique(true))) + .unwrap(); + config.adminspace.set_enabled(true).unwrap(); + config.plugins_loading.set_enabled(true).unwrap(); + let mut runtime = RuntimeBuilder::new(config) + .plugins_manager(plugins_mgr) + .build() + .await + .unwrap(); + runtime.start().await.unwrap(); +} diff --git a/zenoh-test-ros2dds/tests/service.rs b/zenoh-test-ros2dds/tests/service.rs new file mode 100644 index 0000000..11118fa --- /dev/null +++ b/zenoh-test-ros2dds/tests/service.rs @@ -0,0 +1,150 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +pub mod common; + +use std::time::Duration; + +use futures::StreamExt; +use r2r::{self, QosProfile}; +use serde_derive::{Deserialize, Serialize}; +use tokio::sync::oneshot; +use zenoh::Wait; + +// The test service +const TEST_SERVICE_Z2R: &str = "test_service_z2r"; +const TEST_SERVICE_R2Z: &str = "test_service_r2Z"; + +#[derive(Serialize, Deserialize, PartialEq, Clone)] +struct AddTwoIntsRequest { + a: i64, + b: i64, +} +#[derive(Serialize, Deserialize, PartialEq, Clone)] +struct AddTwoIntsReply { + sum: i64, +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_ros_client_zenoh_service() { + common::init_env(); + // Create zenoh-bridge-ros2dds + tokio::spawn(common::create_bridge()); + + let a = 1; + let b = 2; + + // Zenoh service + let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + let _queryable = session + .declare_queryable(TEST_SERVICE_R2Z) + .callback(|query| { + let request: AddTwoIntsRequest = + cdr::deserialize(&query.payload().unwrap().to_bytes()).unwrap(); + let response = AddTwoIntsReply { + sum: request.a + request.b, + }; + let data = cdr::serialize::<_, _, cdr::CdrLe>(&response, cdr::Infinite).unwrap(); + query.reply(TEST_SERVICE_R2Z, data).wait().unwrap(); + }) + .await + .unwrap(); + + // ROS client + let ctx = r2r::Context::create().unwrap(); + let mut node = r2r::Node::create(ctx, "ros_client", "").unwrap(); + let client = node + .create_client::( + &format!("/{}", TEST_SERVICE_R2Z), + QosProfile::default(), + ) + .unwrap(); + + // Node spin + let (term_tx, mut term_rx) = oneshot::channel(); + let _handler = tokio::task::spawn_blocking(move || { + while term_rx.try_recv().is_err() { + node.spin_once(std::time::Duration::from_millis(100)); + } + }); + + // Wait for the environment to be ready + tokio::time::sleep(Duration::from_secs(1)).await; + + // Send the request and then process the response + let my_req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b }; + let resp = client.request(&my_req).unwrap().await.unwrap(); + + assert_eq!(resp.sum, a + b); + + term_tx.send(()).unwrap(); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_zenoh_client_ros_service() { + common::init_env(); + // Create zenoh-bridge-ros2dds + tokio::spawn(common::create_bridge()); + + let a = 1; + let b = 2; + + // ROS service + let ctx = r2r::Context::create().unwrap(); + let mut node = r2r::Node::create(ctx, "ros_service", "").unwrap(); + let mut service = node + .create_service::( + &format!("/{}", TEST_SERVICE_Z2R), + QosProfile::default(), + ) + .unwrap(); + // Processing the requests and send back responses + tokio::spawn(async move { + while let Some(req) = service.next().await { + let resp = r2r::example_interfaces::srv::AddTwoInts::Response { + sum: req.message.a + req.message.b, + }; + req.respond(resp).unwrap(); + } + }); + + // Node spin + let (term_tx, mut term_rx) = oneshot::channel(); + let _handler = tokio::task::spawn_blocking(move || { + while term_rx.try_recv().is_err() { + node.spin_once(std::time::Duration::from_millis(100)); + } + }); + + // Zenoh client + let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + let client = session.declare_querier(TEST_SERVICE_Z2R).await.unwrap(); + + // Wait for the environment to be ready + tokio::time::sleep(Duration::from_secs(1)).await; + + // Send request to ROS service + let req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b }; + let buf = cdr::serialize::<_, _, cdr::CdrLe>(&req, cdr::size::Infinite).unwrap(); + let recv_handler = client.get().payload(buf).await.unwrap(); + + // Process the response + let reply = recv_handler.recv().unwrap(); + let reader = reply.result().unwrap().payload().reader(); + let result: Result = cdr::deserialize_from(reader, cdr::size::Infinite); + + assert_eq!(result.unwrap(), a + b); + + term_tx.send(()).unwrap(); +} diff --git a/zenoh-test-ros2dds/tests/topic.rs b/zenoh-test-ros2dds/tests/topic.rs index 9ceb43d..90d3795 100644 --- a/zenoh-test-ros2dds/tests/topic.rs +++ b/zenoh-test-ros2dds/tests/topic.rs @@ -12,51 +12,25 @@ // ZettaScale Zenoh Team, // +pub mod common; + use std::{sync::mpsc::channel, time::Duration}; use futures::StreamExt; use r2r::{self, QosProfile}; -use zenoh::{ - config::Config, - internal::{plugins::PluginsManager, runtime::RuntimeBuilder}, -}; -use zenoh_config::ModeDependentValue; // The test topic const TEST_TOPIC: &str = "test_topic"; // The test TEST_PAYLOAD const TEST_PAYLOAD: &str = "Hello World"; -fn init_env() { - std::env::set_var("RMW_IMPLEMENTATION", "rmw_cyclonedds_cpp"); -} - -async fn create_bridge() { - let mut plugins_mgr = PluginsManager::static_plugins_only(); - plugins_mgr.declare_static_plugin::("ros2dds", true); - let mut config = Config::default(); - config.insert_json5("plugins/ros2dds", "{}").unwrap(); - config - .timestamping - .set_enabled(Some(ModeDependentValue::Unique(true))) - .unwrap(); - config.adminspace.set_enabled(true).unwrap(); - config.plugins_loading.set_enabled(true).unwrap(); - let mut runtime = RuntimeBuilder::new(config) - .plugins_manager(plugins_mgr) - .build() - .await - .unwrap(); - runtime.start().await.unwrap(); -} - #[tokio::test(flavor = "multi_thread")] async fn test_zenoh_pub_ros_sub() { - init_env(); + common::init_env(); let (tx, rx) = channel(); // Create zenoh-bridge-ros2dds - tokio::spawn(create_bridge()); + tokio::spawn(common::create_bridge()); // ROS subscriber let ctx = r2r::Context::create().unwrap(); @@ -94,9 +68,9 @@ async fn test_zenoh_pub_ros_sub() { #[tokio::test(flavor = "multi_thread")] async fn test_ros_pub_zenoh_sub() { - init_env(); + common::init_env(); // Create zenoh-bridge-ros2dds - tokio::spawn(create_bridge()); + tokio::spawn(common::create_bridge()); // Zenoh subscriber let session = zenoh::open(zenoh::Config::default()).await.unwrap();