From 04c78849ce349d9fbfd39b6851ff12bc87791c8e Mon Sep 17 00:00:00 2001 From: "ChenYing Kuo (CY)" Date: Mon, 16 Dec 2024 19:17:30 +0800 Subject: [PATCH] Support ROS 2 service in CI (#357) --- .github/workflows/ci.yml | 6 +- zenoh-test-ros2dds/Cargo.toml | 1 + zenoh-test-ros2dds/tests/common.rs | 42 +++++++ zenoh-test-ros2dds/tests/service.rs | 185 ++++++++++++++++++++++++++++ zenoh-test-ros2dds/tests/topic.rs | 38 +----- 5 files changed, 238 insertions(+), 34 deletions(-) create mode 100644 zenoh-test-ros2dds/tests/common.rs create mode 100644 zenoh-test-ros2dds/tests/service.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 19006dd..e1ce4fb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -88,7 +88,8 @@ jobs: - name: Run ROS tests shell: bash - run: "source /opt/ros/humble/setup.bash && cd zenoh-test-ros2dds && cargo test --verbose" + # Note that we limit only one test item tested at the same time to avoid the confliction between bridges + run: "source /opt/ros/humble/setup.bash && cd zenoh-test-ros2dds && cargo test --verbose -- --test-threads=1" system_tests_with_ros2_jazzy: name: System tests with ROS 2 Jazzy @@ -115,7 +116,8 @@ jobs: - name: Run ROS tests shell: bash - run: "source /opt/ros/jazzy/setup.bash && cd zenoh-test-ros2dds && cargo test --verbose" + # Note that we limit only one test item tested at the same time to avoid the confliction between bridges + run: "source /opt/ros/jazzy/setup.bash && cd zenoh-test-ros2dds && cargo test --verbose -- --test-threads=1" # NOTE: In GitHub repository settings, the "Require status checks to pass # before merging" branch protection rule ensures that commits are only merged diff --git a/zenoh-test-ros2dds/Cargo.toml b/zenoh-test-ros2dds/Cargo.toml index 7e35d71..344c494 100644 --- a/zenoh-test-ros2dds/Cargo.toml +++ b/zenoh-test-ros2dds/Cargo.toml @@ -20,6 +20,7 @@ cdr = "0.2.4" futures = "0.3.26" r2r = "0.9" serde = "1.0.154" +serde_derive = "1.0.154" serde_json = "1.0.114" tokio = { version = "1.35.1", default-features = false } # Default features are disabled due to some crates' requirements zenoh = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [ diff --git a/zenoh-test-ros2dds/tests/common.rs b/zenoh-test-ros2dds/tests/common.rs new file mode 100644 index 0000000..9ffbc97 --- /dev/null +++ b/zenoh-test-ros2dds/tests/common.rs @@ -0,0 +1,42 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use zenoh::{ + config::Config, + internal::{plugins::PluginsManager, runtime::RuntimeBuilder}, +}; +use zenoh_config::ModeDependentValue; + +pub fn init_env() { + std::env::set_var("RMW_IMPLEMENTATION", "rmw_cyclonedds_cpp"); +} + +pub async fn create_bridge() { + let mut plugins_mgr = PluginsManager::static_plugins_only(); + plugins_mgr.declare_static_plugin::("ros2dds", true); + let mut config = Config::default(); + config.insert_json5("plugins/ros2dds", "{}").unwrap(); + config + .timestamping + .set_enabled(Some(ModeDependentValue::Unique(true))) + .unwrap(); + config.adminspace.set_enabled(true).unwrap(); + config.plugins_loading.set_enabled(true).unwrap(); + let mut runtime = RuntimeBuilder::new(config) + .plugins_manager(plugins_mgr) + .build() + .await + .unwrap(); + runtime.start().await.unwrap(); +} diff --git a/zenoh-test-ros2dds/tests/service.rs b/zenoh-test-ros2dds/tests/service.rs new file mode 100644 index 0000000..1f1f12d --- /dev/null +++ b/zenoh-test-ros2dds/tests/service.rs @@ -0,0 +1,185 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +pub mod common; + +use std::time::Duration; + +use futures::StreamExt; +use r2r::{self, QosProfile}; +use serde_derive::{Deserialize, Serialize}; +use zenoh::Wait; + +// The test service +const TEST_SERVICE_Z2R: &str = "test_service_z2r"; +const TEST_SERVICE_R2Z: &str = "test_service_r2Z"; + +#[derive(Serialize, Deserialize, PartialEq, Clone)] +struct AddTwoIntsRequest { + a: i64, + b: i64, +} +#[derive(Serialize, Deserialize, PartialEq, Clone)] +struct AddTwoIntsReply { + sum: i64, +} + +#[test] +fn test_ros_client_zenoh_service() { + // Run the bridge for MQTT and Zenoh + let rt = tokio::runtime::Runtime::new().unwrap(); + + let (sender, receiver) = std::sync::mpsc::channel(); + + rt.block_on(async { + common::init_env(); + // Create zenoh-bridge-ros2dds + tokio::spawn(common::create_bridge()); + + let a = 1; + let b = 2; + + // Zenoh service + let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + let _queryable = session + .declare_queryable(TEST_SERVICE_R2Z) + .callback(|query| { + let request: AddTwoIntsRequest = + cdr::deserialize(&query.payload().unwrap().to_bytes()).unwrap(); + let response = AddTwoIntsReply { + sum: request.a + request.b, + }; + let data = cdr::serialize::<_, _, cdr::CdrLe>(&response, cdr::Infinite).unwrap(); + query.reply(TEST_SERVICE_R2Z, data).wait().unwrap(); + }) + .await + .unwrap(); + + // ROS client + let ctx = r2r::Context::create().unwrap(); + let mut node = r2r::Node::create(ctx, "ros_client", "").unwrap(); + let client = node + .create_client::( + &format!("/{}", TEST_SERVICE_R2Z), + QosProfile::default(), + ) + .unwrap(); + + // Node spin + let _handler = tokio::task::spawn_blocking(move || loop { + node.spin_once(std::time::Duration::from_millis(100)); + }); + + // Wait for the environment to be ready + tokio::time::sleep(Duration::from_secs(1)).await; + + // Send the request and then process the response + let my_req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b }; + let resp = client.request(&my_req).unwrap().await.unwrap(); + assert_eq!(resp.sum, a + b); + + // Tell the main test thread, we're completed + sender.send(()).unwrap(); + }); + + let test_result = receiver.recv_timeout(Duration::from_secs(5)); + // Stop the tokio runtime + // Note that we should shutdown the runtime before doing any check that might panic the test. + // Otherwise, the tasks inside the runtime will never be completed. + rt.shutdown_background(); + match test_result { + Ok(_) => { + println!("Test passed"); + } + Err(_) => { + panic!("Test failed due to timeout....."); + } + } +} + +#[test] +fn test_zenoh_client_ros_service() { + // Run the bridge for MQTT and Zenoh + let rt = tokio::runtime::Runtime::new().unwrap(); + + let (sender, receiver) = std::sync::mpsc::channel(); + + rt.spawn(async move { + common::init_env(); + // Create zenoh-bridge-ros2dds + tokio::spawn(common::create_bridge()); + + let a = 1; + let b = 2; + + // ROS service + let ctx = r2r::Context::create().unwrap(); + let mut node = r2r::Node::create(ctx, "ros_service", "").unwrap(); + let mut service = node + .create_service::( + &format!("/{}", TEST_SERVICE_Z2R), + QosProfile::default(), + ) + .unwrap(); + // Processing the requests and send back responses + tokio::spawn(async move { + while let Some(req) = service.next().await { + let resp = r2r::example_interfaces::srv::AddTwoInts::Response { + sum: req.message.a + req.message.b, + }; + req.respond(resp).unwrap(); + } + }); + + // Node spin + let _handler = tokio::task::spawn_blocking(move || loop { + node.spin_once(std::time::Duration::from_millis(100)); + }); + + // Zenoh client + let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + let client = session.declare_querier(TEST_SERVICE_Z2R).await.unwrap(); + + // Wait for the environment to be ready + tokio::time::sleep(Duration::from_secs(1)).await; + + // Send request to ROS service + let req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b }; + let buf = cdr::serialize::<_, _, cdr::CdrLe>(&req, cdr::size::Infinite).unwrap(); + let recv_handler = client.get().payload(buf).await.unwrap(); + + // Process the response + let reply = recv_handler.recv().unwrap(); + let reader = reply.result().unwrap().payload().reader(); + let result: Result = cdr::deserialize_from(reader, cdr::size::Infinite); + assert_eq!(result.unwrap(), a + b); + + // Tell the main test thread, we're completed + sender.send(()).unwrap(); + }); + + let test_result = receiver.recv_timeout(Duration::from_secs(5)); + // Stop the tokio runtime + // Note that we should shutdown the runtime before doing any check that might panic the test. + // Otherwise, the tasks inside the runtime will never be completed. + rt.shutdown_background(); + match test_result { + Ok(_) => { + println!("Test passed"); + } + Err(_) => { + panic!("Test failed due to timeout....."); + } + } +} diff --git a/zenoh-test-ros2dds/tests/topic.rs b/zenoh-test-ros2dds/tests/topic.rs index 9ceb43d..90d3795 100644 --- a/zenoh-test-ros2dds/tests/topic.rs +++ b/zenoh-test-ros2dds/tests/topic.rs @@ -12,51 +12,25 @@ // ZettaScale Zenoh Team, // +pub mod common; + use std::{sync::mpsc::channel, time::Duration}; use futures::StreamExt; use r2r::{self, QosProfile}; -use zenoh::{ - config::Config, - internal::{plugins::PluginsManager, runtime::RuntimeBuilder}, -}; -use zenoh_config::ModeDependentValue; // The test topic const TEST_TOPIC: &str = "test_topic"; // The test TEST_PAYLOAD const TEST_PAYLOAD: &str = "Hello World"; -fn init_env() { - std::env::set_var("RMW_IMPLEMENTATION", "rmw_cyclonedds_cpp"); -} - -async fn create_bridge() { - let mut plugins_mgr = PluginsManager::static_plugins_only(); - plugins_mgr.declare_static_plugin::("ros2dds", true); - let mut config = Config::default(); - config.insert_json5("plugins/ros2dds", "{}").unwrap(); - config - .timestamping - .set_enabled(Some(ModeDependentValue::Unique(true))) - .unwrap(); - config.adminspace.set_enabled(true).unwrap(); - config.plugins_loading.set_enabled(true).unwrap(); - let mut runtime = RuntimeBuilder::new(config) - .plugins_manager(plugins_mgr) - .build() - .await - .unwrap(); - runtime.start().await.unwrap(); -} - #[tokio::test(flavor = "multi_thread")] async fn test_zenoh_pub_ros_sub() { - init_env(); + common::init_env(); let (tx, rx) = channel(); // Create zenoh-bridge-ros2dds - tokio::spawn(create_bridge()); + tokio::spawn(common::create_bridge()); // ROS subscriber let ctx = r2r::Context::create().unwrap(); @@ -94,9 +68,9 @@ async fn test_zenoh_pub_ros_sub() { #[tokio::test(flavor = "multi_thread")] async fn test_ros_pub_zenoh_sub() { - init_env(); + common::init_env(); // Create zenoh-bridge-ros2dds - tokio::spawn(create_bridge()); + tokio::spawn(common::create_bridge()); // Zenoh subscriber let session = zenoh::open(zenoh::Config::default()).await.unwrap();