From 1f4cb66bc6f366a191bf2074395287efb1a47388 Mon Sep 17 00:00:00 2001 From: ChenYing Kuo Date: Thu, 12 Dec 2024 13:37:11 +0800 Subject: [PATCH 1/4] Support ROS <=> Zenoh CI. Signed-off-by: ChenYing Kuo --- zenoh-test-ros2dds/Cargo.toml | 1 + zenoh-test-ros2dds/tests/common.rs | 42 ++++++++ zenoh-test-ros2dds/tests/service.rs | 150 ++++++++++++++++++++++++++++ zenoh-test-ros2dds/tests/topic.rs | 38 ++----- 4 files changed, 199 insertions(+), 32 deletions(-) create mode 100644 zenoh-test-ros2dds/tests/common.rs create mode 100644 zenoh-test-ros2dds/tests/service.rs diff --git a/zenoh-test-ros2dds/Cargo.toml b/zenoh-test-ros2dds/Cargo.toml index 7e35d71..344c494 100644 --- a/zenoh-test-ros2dds/Cargo.toml +++ b/zenoh-test-ros2dds/Cargo.toml @@ -20,6 +20,7 @@ cdr = "0.2.4" futures = "0.3.26" r2r = "0.9" serde = "1.0.154" +serde_derive = "1.0.154" serde_json = "1.0.114" tokio = { version = "1.35.1", default-features = false } # Default features are disabled due to some crates' requirements zenoh = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [ diff --git a/zenoh-test-ros2dds/tests/common.rs b/zenoh-test-ros2dds/tests/common.rs new file mode 100644 index 0000000..9ffbc97 --- /dev/null +++ b/zenoh-test-ros2dds/tests/common.rs @@ -0,0 +1,42 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use zenoh::{ + config::Config, + internal::{plugins::PluginsManager, runtime::RuntimeBuilder}, +}; +use zenoh_config::ModeDependentValue; + +pub fn init_env() { + std::env::set_var("RMW_IMPLEMENTATION", "rmw_cyclonedds_cpp"); +} + +pub async fn create_bridge() { + let mut plugins_mgr = PluginsManager::static_plugins_only(); + plugins_mgr.declare_static_plugin::("ros2dds", true); + let mut config = Config::default(); + config.insert_json5("plugins/ros2dds", "{}").unwrap(); + config + .timestamping + .set_enabled(Some(ModeDependentValue::Unique(true))) + .unwrap(); + config.adminspace.set_enabled(true).unwrap(); + config.plugins_loading.set_enabled(true).unwrap(); + let mut runtime = RuntimeBuilder::new(config) + .plugins_manager(plugins_mgr) + .build() + .await + .unwrap(); + runtime.start().await.unwrap(); +} diff --git a/zenoh-test-ros2dds/tests/service.rs b/zenoh-test-ros2dds/tests/service.rs new file mode 100644 index 0000000..11118fa --- /dev/null +++ b/zenoh-test-ros2dds/tests/service.rs @@ -0,0 +1,150 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +pub mod common; + +use std::time::Duration; + +use futures::StreamExt; +use r2r::{self, QosProfile}; +use serde_derive::{Deserialize, Serialize}; +use tokio::sync::oneshot; +use zenoh::Wait; + +// The test service +const TEST_SERVICE_Z2R: &str = "test_service_z2r"; +const TEST_SERVICE_R2Z: &str = "test_service_r2Z"; + +#[derive(Serialize, Deserialize, PartialEq, Clone)] +struct AddTwoIntsRequest { + a: i64, + b: i64, +} +#[derive(Serialize, Deserialize, PartialEq, Clone)] +struct AddTwoIntsReply { + sum: i64, +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_ros_client_zenoh_service() { + common::init_env(); + // Create zenoh-bridge-ros2dds + tokio::spawn(common::create_bridge()); + + let a = 1; + let b = 2; + + // Zenoh service + let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + let _queryable = session + .declare_queryable(TEST_SERVICE_R2Z) + .callback(|query| { + let request: AddTwoIntsRequest = + cdr::deserialize(&query.payload().unwrap().to_bytes()).unwrap(); + let response = AddTwoIntsReply { + sum: request.a + request.b, + }; + let data = cdr::serialize::<_, _, cdr::CdrLe>(&response, cdr::Infinite).unwrap(); + query.reply(TEST_SERVICE_R2Z, data).wait().unwrap(); + }) + .await + .unwrap(); + + // ROS client + let ctx = r2r::Context::create().unwrap(); + let mut node = r2r::Node::create(ctx, "ros_client", "").unwrap(); + let client = node + .create_client::( + &format!("/{}", TEST_SERVICE_R2Z), + QosProfile::default(), + ) + .unwrap(); + + // Node spin + let (term_tx, mut term_rx) = oneshot::channel(); + let _handler = tokio::task::spawn_blocking(move || { + while term_rx.try_recv().is_err() { + node.spin_once(std::time::Duration::from_millis(100)); + } + }); + + // Wait for the environment to be ready + tokio::time::sleep(Duration::from_secs(1)).await; + + // Send the request and then process the response + let my_req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b }; + let resp = client.request(&my_req).unwrap().await.unwrap(); + + assert_eq!(resp.sum, a + b); + + term_tx.send(()).unwrap(); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_zenoh_client_ros_service() { + common::init_env(); + // Create zenoh-bridge-ros2dds + tokio::spawn(common::create_bridge()); + + let a = 1; + let b = 2; + + // ROS service + let ctx = r2r::Context::create().unwrap(); + let mut node = r2r::Node::create(ctx, "ros_service", "").unwrap(); + let mut service = node + .create_service::( + &format!("/{}", TEST_SERVICE_Z2R), + QosProfile::default(), + ) + .unwrap(); + // Processing the requests and send back responses + tokio::spawn(async move { + while let Some(req) = service.next().await { + let resp = r2r::example_interfaces::srv::AddTwoInts::Response { + sum: req.message.a + req.message.b, + }; + req.respond(resp).unwrap(); + } + }); + + // Node spin + let (term_tx, mut term_rx) = oneshot::channel(); + let _handler = tokio::task::spawn_blocking(move || { + while term_rx.try_recv().is_err() { + node.spin_once(std::time::Duration::from_millis(100)); + } + }); + + // Zenoh client + let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + let client = session.declare_querier(TEST_SERVICE_Z2R).await.unwrap(); + + // Wait for the environment to be ready + tokio::time::sleep(Duration::from_secs(1)).await; + + // Send request to ROS service + let req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b }; + let buf = cdr::serialize::<_, _, cdr::CdrLe>(&req, cdr::size::Infinite).unwrap(); + let recv_handler = client.get().payload(buf).await.unwrap(); + + // Process the response + let reply = recv_handler.recv().unwrap(); + let reader = reply.result().unwrap().payload().reader(); + let result: Result = cdr::deserialize_from(reader, cdr::size::Infinite); + + assert_eq!(result.unwrap(), a + b); + + term_tx.send(()).unwrap(); +} diff --git a/zenoh-test-ros2dds/tests/topic.rs b/zenoh-test-ros2dds/tests/topic.rs index 9ceb43d..90d3795 100644 --- a/zenoh-test-ros2dds/tests/topic.rs +++ b/zenoh-test-ros2dds/tests/topic.rs @@ -12,51 +12,25 @@ // ZettaScale Zenoh Team, // +pub mod common; + use std::{sync::mpsc::channel, time::Duration}; use futures::StreamExt; use r2r::{self, QosProfile}; -use zenoh::{ - config::Config, - internal::{plugins::PluginsManager, runtime::RuntimeBuilder}, -}; -use zenoh_config::ModeDependentValue; // The test topic const TEST_TOPIC: &str = "test_topic"; // The test TEST_PAYLOAD const TEST_PAYLOAD: &str = "Hello World"; -fn init_env() { - std::env::set_var("RMW_IMPLEMENTATION", "rmw_cyclonedds_cpp"); -} - -async fn create_bridge() { - let mut plugins_mgr = PluginsManager::static_plugins_only(); - plugins_mgr.declare_static_plugin::("ros2dds", true); - let mut config = Config::default(); - config.insert_json5("plugins/ros2dds", "{}").unwrap(); - config - .timestamping - .set_enabled(Some(ModeDependentValue::Unique(true))) - .unwrap(); - config.adminspace.set_enabled(true).unwrap(); - config.plugins_loading.set_enabled(true).unwrap(); - let mut runtime = RuntimeBuilder::new(config) - .plugins_manager(plugins_mgr) - .build() - .await - .unwrap(); - runtime.start().await.unwrap(); -} - #[tokio::test(flavor = "multi_thread")] async fn test_zenoh_pub_ros_sub() { - init_env(); + common::init_env(); let (tx, rx) = channel(); // Create zenoh-bridge-ros2dds - tokio::spawn(create_bridge()); + tokio::spawn(common::create_bridge()); // ROS subscriber let ctx = r2r::Context::create().unwrap(); @@ -94,9 +68,9 @@ async fn test_zenoh_pub_ros_sub() { #[tokio::test(flavor = "multi_thread")] async fn test_ros_pub_zenoh_sub() { - init_env(); + common::init_env(); // Create zenoh-bridge-ros2dds - tokio::spawn(create_bridge()); + tokio::spawn(common::create_bridge()); // Zenoh subscriber let session = zenoh::open(zenoh::Config::default()).await.unwrap(); From 47839392713948ae79d6eb6caeda3c5437d98b2d Mon Sep 17 00:00:00 2001 From: ChenYing Kuo Date: Fri, 13 Dec 2024 15:35:11 +0800 Subject: [PATCH 2/4] Use a more proper way to avoid endless test. Signed-off-by: ChenYing Kuo --- zenoh-test-ros2dds/tests/service.rs | 231 ++++++++++++++++------------ 1 file changed, 133 insertions(+), 98 deletions(-) diff --git a/zenoh-test-ros2dds/tests/service.rs b/zenoh-test-ros2dds/tests/service.rs index 11118fa..1f1f12d 100644 --- a/zenoh-test-ros2dds/tests/service.rs +++ b/zenoh-test-ros2dds/tests/service.rs @@ -19,7 +19,6 @@ use std::time::Duration; use futures::StreamExt; use r2r::{self, QosProfile}; use serde_derive::{Deserialize, Serialize}; -use tokio::sync::oneshot; use zenoh::Wait; // The test service @@ -36,115 +35,151 @@ struct AddTwoIntsReply { sum: i64, } -#[tokio::test(flavor = "multi_thread")] -async fn test_ros_client_zenoh_service() { - common::init_env(); - // Create zenoh-bridge-ros2dds - tokio::spawn(common::create_bridge()); - - let a = 1; - let b = 2; - - // Zenoh service - let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - let _queryable = session - .declare_queryable(TEST_SERVICE_R2Z) - .callback(|query| { - let request: AddTwoIntsRequest = - cdr::deserialize(&query.payload().unwrap().to_bytes()).unwrap(); - let response = AddTwoIntsReply { - sum: request.a + request.b, - }; - let data = cdr::serialize::<_, _, cdr::CdrLe>(&response, cdr::Infinite).unwrap(); - query.reply(TEST_SERVICE_R2Z, data).wait().unwrap(); - }) - .await - .unwrap(); - - // ROS client - let ctx = r2r::Context::create().unwrap(); - let mut node = r2r::Node::create(ctx, "ros_client", "").unwrap(); - let client = node - .create_client::( - &format!("/{}", TEST_SERVICE_R2Z), - QosProfile::default(), - ) - .unwrap(); - - // Node spin - let (term_tx, mut term_rx) = oneshot::channel(); - let _handler = tokio::task::spawn_blocking(move || { - while term_rx.try_recv().is_err() { +#[test] +fn test_ros_client_zenoh_service() { + // Run the bridge for MQTT and Zenoh + let rt = tokio::runtime::Runtime::new().unwrap(); + + let (sender, receiver) = std::sync::mpsc::channel(); + + rt.block_on(async { + common::init_env(); + // Create zenoh-bridge-ros2dds + tokio::spawn(common::create_bridge()); + + let a = 1; + let b = 2; + + // Zenoh service + let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + let _queryable = session + .declare_queryable(TEST_SERVICE_R2Z) + .callback(|query| { + let request: AddTwoIntsRequest = + cdr::deserialize(&query.payload().unwrap().to_bytes()).unwrap(); + let response = AddTwoIntsReply { + sum: request.a + request.b, + }; + let data = cdr::serialize::<_, _, cdr::CdrLe>(&response, cdr::Infinite).unwrap(); + query.reply(TEST_SERVICE_R2Z, data).wait().unwrap(); + }) + .await + .unwrap(); + + // ROS client + let ctx = r2r::Context::create().unwrap(); + let mut node = r2r::Node::create(ctx, "ros_client", "").unwrap(); + let client = node + .create_client::( + &format!("/{}", TEST_SERVICE_R2Z), + QosProfile::default(), + ) + .unwrap(); + + // Node spin + let _handler = tokio::task::spawn_blocking(move || loop { node.spin_once(std::time::Duration::from_millis(100)); - } - }); + }); - // Wait for the environment to be ready - tokio::time::sleep(Duration::from_secs(1)).await; + // Wait for the environment to be ready + tokio::time::sleep(Duration::from_secs(1)).await; - // Send the request and then process the response - let my_req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b }; - let resp = client.request(&my_req).unwrap().await.unwrap(); + // Send the request and then process the response + let my_req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b }; + let resp = client.request(&my_req).unwrap().await.unwrap(); + assert_eq!(resp.sum, a + b); - assert_eq!(resp.sum, a + b); - - term_tx.send(()).unwrap(); -} + // Tell the main test thread, we're completed + sender.send(()).unwrap(); + }); -#[tokio::test(flavor = "multi_thread")] -async fn test_zenoh_client_ros_service() { - common::init_env(); - // Create zenoh-bridge-ros2dds - tokio::spawn(common::create_bridge()); - - let a = 1; - let b = 2; - - // ROS service - let ctx = r2r::Context::create().unwrap(); - let mut node = r2r::Node::create(ctx, "ros_service", "").unwrap(); - let mut service = node - .create_service::( - &format!("/{}", TEST_SERVICE_Z2R), - QosProfile::default(), - ) - .unwrap(); - // Processing the requests and send back responses - tokio::spawn(async move { - while let Some(req) = service.next().await { - let resp = r2r::example_interfaces::srv::AddTwoInts::Response { - sum: req.message.a + req.message.b, - }; - req.respond(resp).unwrap(); + let test_result = receiver.recv_timeout(Duration::from_secs(5)); + // Stop the tokio runtime + // Note that we should shutdown the runtime before doing any check that might panic the test. + // Otherwise, the tasks inside the runtime will never be completed. + rt.shutdown_background(); + match test_result { + Ok(_) => { + println!("Test passed"); } - }); + Err(_) => { + panic!("Test failed due to timeout....."); + } + } +} - // Node spin - let (term_tx, mut term_rx) = oneshot::channel(); - let _handler = tokio::task::spawn_blocking(move || { - while term_rx.try_recv().is_err() { +#[test] +fn test_zenoh_client_ros_service() { + // Run the bridge for MQTT and Zenoh + let rt = tokio::runtime::Runtime::new().unwrap(); + + let (sender, receiver) = std::sync::mpsc::channel(); + + rt.spawn(async move { + common::init_env(); + // Create zenoh-bridge-ros2dds + tokio::spawn(common::create_bridge()); + + let a = 1; + let b = 2; + + // ROS service + let ctx = r2r::Context::create().unwrap(); + let mut node = r2r::Node::create(ctx, "ros_service", "").unwrap(); + let mut service = node + .create_service::( + &format!("/{}", TEST_SERVICE_Z2R), + QosProfile::default(), + ) + .unwrap(); + // Processing the requests and send back responses + tokio::spawn(async move { + while let Some(req) = service.next().await { + let resp = r2r::example_interfaces::srv::AddTwoInts::Response { + sum: req.message.a + req.message.b, + }; + req.respond(resp).unwrap(); + } + }); + + // Node spin + let _handler = tokio::task::spawn_blocking(move || loop { node.spin_once(std::time::Duration::from_millis(100)); - } - }); + }); - // Zenoh client - let session = zenoh::open(zenoh::Config::default()).await.unwrap(); - let client = session.declare_querier(TEST_SERVICE_Z2R).await.unwrap(); + // Zenoh client + let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + let client = session.declare_querier(TEST_SERVICE_Z2R).await.unwrap(); - // Wait for the environment to be ready - tokio::time::sleep(Duration::from_secs(1)).await; + // Wait for the environment to be ready + tokio::time::sleep(Duration::from_secs(1)).await; - // Send request to ROS service - let req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b }; - let buf = cdr::serialize::<_, _, cdr::CdrLe>(&req, cdr::size::Infinite).unwrap(); - let recv_handler = client.get().payload(buf).await.unwrap(); + // Send request to ROS service + let req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b }; + let buf = cdr::serialize::<_, _, cdr::CdrLe>(&req, cdr::size::Infinite).unwrap(); + let recv_handler = client.get().payload(buf).await.unwrap(); - // Process the response - let reply = recv_handler.recv().unwrap(); - let reader = reply.result().unwrap().payload().reader(); - let result: Result = cdr::deserialize_from(reader, cdr::size::Infinite); + // Process the response + let reply = recv_handler.recv().unwrap(); + let reader = reply.result().unwrap().payload().reader(); + let result: Result = cdr::deserialize_from(reader, cdr::size::Infinite); + assert_eq!(result.unwrap(), a + b); - assert_eq!(result.unwrap(), a + b); + // Tell the main test thread, we're completed + sender.send(()).unwrap(); + }); - term_tx.send(()).unwrap(); + let test_result = receiver.recv_timeout(Duration::from_secs(5)); + // Stop the tokio runtime + // Note that we should shutdown the runtime before doing any check that might panic the test. + // Otherwise, the tasks inside the runtime will never be completed. + rt.shutdown_background(); + match test_result { + Ok(_) => { + println!("Test passed"); + } + Err(_) => { + panic!("Test failed due to timeout....."); + } + } } From 3cfd59fb49a8358654ab2fe17c5c33d339f97170 Mon Sep 17 00:00:00 2001 From: ChenYing Kuo Date: Fri, 13 Dec 2024 15:37:06 +0800 Subject: [PATCH 3/4] Run the test in single-threaded to avoid conflicts. Signed-off-by: ChenYing Kuo --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 19006dd..1cf5ff8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -88,7 +88,7 @@ jobs: - name: Run ROS tests shell: bash - run: "source /opt/ros/humble/setup.bash && cd zenoh-test-ros2dds && cargo test --verbose" + run: "source /opt/ros/humble/setup.bash && cd zenoh-test-ros2dds && cargo test --verbose -- --test-threads=1" system_tests_with_ros2_jazzy: name: System tests with ROS 2 Jazzy @@ -115,7 +115,7 @@ jobs: - name: Run ROS tests shell: bash - run: "source /opt/ros/jazzy/setup.bash && cd zenoh-test-ros2dds && cargo test --verbose" + run: "source /opt/ros/jazzy/setup.bash && cd zenoh-test-ros2dds && cargo test --verbose -- --test-threads=1" # NOTE: In GitHub repository settings, the "Require status checks to pass # before merging" branch protection rule ensures that commits are only merged From d4041b8ccde982cda8d8b16372398896d7b0f08a Mon Sep 17 00:00:00 2001 From: ChenYing Kuo Date: Mon, 16 Dec 2024 10:38:12 +0800 Subject: [PATCH 4/4] Add some comments. Signed-off-by: ChenYing Kuo --- .github/workflows/ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1cf5ff8..e1ce4fb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -88,6 +88,7 @@ jobs: - name: Run ROS tests shell: bash + # Note that we limit only one test item tested at the same time to avoid the confliction between bridges run: "source /opt/ros/humble/setup.bash && cd zenoh-test-ros2dds && cargo test --verbose -- --test-threads=1" system_tests_with_ros2_jazzy: @@ -115,6 +116,7 @@ jobs: - name: Run ROS tests shell: bash + # Note that we limit only one test item tested at the same time to avoid the confliction between bridges run: "source /opt/ros/jazzy/setup.bash && cd zenoh-test-ros2dds && cargo test --verbose -- --test-threads=1" # NOTE: In GitHub repository settings, the "Require status checks to pass