Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ROS 2 service in CI #357

Merged
merged 4 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ jobs:

- name: Run ROS tests
shell: bash
run: "source /opt/ros/humble/setup.bash && cd zenoh-test-ros2dds && cargo test --verbose"
# Note that we limit only one test item tested at the same time to avoid the confliction between bridges
run: "source /opt/ros/humble/setup.bash && cd zenoh-test-ros2dds && cargo test --verbose -- --test-threads=1"

system_tests_with_ros2_jazzy:
name: System tests with ROS 2 Jazzy
Expand All @@ -115,7 +116,8 @@ jobs:

- name: Run ROS tests
shell: bash
run: "source /opt/ros/jazzy/setup.bash && cd zenoh-test-ros2dds && cargo test --verbose"
# Note that we limit only one test item tested at the same time to avoid the confliction between bridges
run: "source /opt/ros/jazzy/setup.bash && cd zenoh-test-ros2dds && cargo test --verbose -- --test-threads=1"

# NOTE: In GitHub repository settings, the "Require status checks to pass
# before merging" branch protection rule ensures that commits are only merged
Expand Down
1 change: 1 addition & 0 deletions zenoh-test-ros2dds/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ cdr = "0.2.4"
futures = "0.3.26"
r2r = "0.9"
serde = "1.0.154"
serde_derive = "1.0.154"
serde_json = "1.0.114"
tokio = { version = "1.35.1", default-features = false } # Default features are disabled due to some crates' requirements
zenoh = { version = "1.0.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = [
Expand Down
42 changes: 42 additions & 0 deletions zenoh-test-ros2dds/tests/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

use zenoh::{
config::Config,
internal::{plugins::PluginsManager, runtime::RuntimeBuilder},
};
use zenoh_config::ModeDependentValue;

pub fn init_env() {
std::env::set_var("RMW_IMPLEMENTATION", "rmw_cyclonedds_cpp");
}

pub async fn create_bridge() {
let mut plugins_mgr = PluginsManager::static_plugins_only();
plugins_mgr.declare_static_plugin::<zenoh_plugin_ros2dds::ROS2Plugin, &str>("ros2dds", true);
let mut config = Config::default();
config.insert_json5("plugins/ros2dds", "{}").unwrap();
config
.timestamping
.set_enabled(Some(ModeDependentValue::Unique(true)))
.unwrap();
config.adminspace.set_enabled(true).unwrap();
config.plugins_loading.set_enabled(true).unwrap();
let mut runtime = RuntimeBuilder::new(config)
.plugins_manager(plugins_mgr)
.build()
.await
.unwrap();
runtime.start().await.unwrap();
}
185 changes: 185 additions & 0 deletions zenoh-test-ros2dds/tests/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

pub mod common;

use std::time::Duration;

use futures::StreamExt;
use r2r::{self, QosProfile};
use serde_derive::{Deserialize, Serialize};
use zenoh::Wait;

// The test service
const TEST_SERVICE_Z2R: &str = "test_service_z2r";
const TEST_SERVICE_R2Z: &str = "test_service_r2Z";

#[derive(Serialize, Deserialize, PartialEq, Clone)]
struct AddTwoIntsRequest {
a: i64,
b: i64,
}
#[derive(Serialize, Deserialize, PartialEq, Clone)]
struct AddTwoIntsReply {
sum: i64,
}

#[test]
fn test_ros_client_zenoh_service() {
// Run the bridge for MQTT and Zenoh
let rt = tokio::runtime::Runtime::new().unwrap();

let (sender, receiver) = std::sync::mpsc::channel();

rt.block_on(async {
common::init_env();
// Create zenoh-bridge-ros2dds
tokio::spawn(common::create_bridge());

let a = 1;
let b = 2;

// Zenoh service
let session = zenoh::open(zenoh::Config::default()).await.unwrap();
let _queryable = session
.declare_queryable(TEST_SERVICE_R2Z)
.callback(|query| {
let request: AddTwoIntsRequest =
cdr::deserialize(&query.payload().unwrap().to_bytes()).unwrap();
let response = AddTwoIntsReply {
sum: request.a + request.b,
};
let data = cdr::serialize::<_, _, cdr::CdrLe>(&response, cdr::Infinite).unwrap();
query.reply(TEST_SERVICE_R2Z, data).wait().unwrap();
})
.await
.unwrap();

// ROS client
let ctx = r2r::Context::create().unwrap();
let mut node = r2r::Node::create(ctx, "ros_client", "").unwrap();
let client = node
.create_client::<r2r::example_interfaces::srv::AddTwoInts::Service>(
&format!("/{}", TEST_SERVICE_R2Z),
QosProfile::default(),
)
.unwrap();

// Node spin
let _handler = tokio::task::spawn_blocking(move || loop {
node.spin_once(std::time::Duration::from_millis(100));
});

// Wait for the environment to be ready
tokio::time::sleep(Duration::from_secs(1)).await;

// Send the request and then process the response
let my_req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b };
let resp = client.request(&my_req).unwrap().await.unwrap();
assert_eq!(resp.sum, a + b);

// Tell the main test thread, we're completed
sender.send(()).unwrap();
});

let test_result = receiver.recv_timeout(Duration::from_secs(5));
// Stop the tokio runtime
// Note that we should shutdown the runtime before doing any check that might panic the test.
// Otherwise, the tasks inside the runtime will never be completed.
rt.shutdown_background();
match test_result {
Ok(_) => {
println!("Test passed");
}
Err(_) => {
panic!("Test failed due to timeout.....");
}
}
}

#[test]
fn test_zenoh_client_ros_service() {
// Run the bridge for MQTT and Zenoh
let rt = tokio::runtime::Runtime::new().unwrap();

let (sender, receiver) = std::sync::mpsc::channel();

rt.spawn(async move {
common::init_env();
// Create zenoh-bridge-ros2dds
tokio::spawn(common::create_bridge());

let a = 1;
let b = 2;

// ROS service
let ctx = r2r::Context::create().unwrap();
let mut node = r2r::Node::create(ctx, "ros_service", "").unwrap();
let mut service = node
.create_service::<r2r::example_interfaces::srv::AddTwoInts::Service>(
&format!("/{}", TEST_SERVICE_Z2R),
QosProfile::default(),
)
.unwrap();
// Processing the requests and send back responses
tokio::spawn(async move {
while let Some(req) = service.next().await {
let resp = r2r::example_interfaces::srv::AddTwoInts::Response {
sum: req.message.a + req.message.b,
};
req.respond(resp).unwrap();
}
});

// Node spin
let _handler = tokio::task::spawn_blocking(move || loop {
node.spin_once(std::time::Duration::from_millis(100));
});

// Zenoh client
let session = zenoh::open(zenoh::Config::default()).await.unwrap();
let client = session.declare_querier(TEST_SERVICE_Z2R).await.unwrap();

// Wait for the environment to be ready
tokio::time::sleep(Duration::from_secs(1)).await;

// Send request to ROS service
let req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b };
let buf = cdr::serialize::<_, _, cdr::CdrLe>(&req, cdr::size::Infinite).unwrap();
let recv_handler = client.get().payload(buf).await.unwrap();

// Process the response
let reply = recv_handler.recv().unwrap();
let reader = reply.result().unwrap().payload().reader();
let result: Result<i64, _> = cdr::deserialize_from(reader, cdr::size::Infinite);
assert_eq!(result.unwrap(), a + b);

// Tell the main test thread, we're completed
sender.send(()).unwrap();
});

let test_result = receiver.recv_timeout(Duration::from_secs(5));
// Stop the tokio runtime
// Note that we should shutdown the runtime before doing any check that might panic the test.
// Otherwise, the tasks inside the runtime will never be completed.
rt.shutdown_background();
match test_result {
Ok(_) => {
println!("Test passed");
}
Err(_) => {
panic!("Test failed due to timeout.....");
}
}
}
38 changes: 6 additions & 32 deletions zenoh-test-ros2dds/tests/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,51 +12,25 @@
// ZettaScale Zenoh Team, <[email protected]>
//

pub mod common;

use std::{sync::mpsc::channel, time::Duration};

use futures::StreamExt;
use r2r::{self, QosProfile};
use zenoh::{
config::Config,
internal::{plugins::PluginsManager, runtime::RuntimeBuilder},
};
use zenoh_config::ModeDependentValue;

// The test topic
const TEST_TOPIC: &str = "test_topic";
// The test TEST_PAYLOAD
const TEST_PAYLOAD: &str = "Hello World";

fn init_env() {
std::env::set_var("RMW_IMPLEMENTATION", "rmw_cyclonedds_cpp");
}

async fn create_bridge() {
let mut plugins_mgr = PluginsManager::static_plugins_only();
plugins_mgr.declare_static_plugin::<zenoh_plugin_ros2dds::ROS2Plugin, &str>("ros2dds", true);
let mut config = Config::default();
config.insert_json5("plugins/ros2dds", "{}").unwrap();
config
.timestamping
.set_enabled(Some(ModeDependentValue::Unique(true)))
.unwrap();
config.adminspace.set_enabled(true).unwrap();
config.plugins_loading.set_enabled(true).unwrap();
let mut runtime = RuntimeBuilder::new(config)
.plugins_manager(plugins_mgr)
.build()
.await
.unwrap();
runtime.start().await.unwrap();
}

#[tokio::test(flavor = "multi_thread")]
async fn test_zenoh_pub_ros_sub() {
init_env();
common::init_env();
let (tx, rx) = channel();

// Create zenoh-bridge-ros2dds
tokio::spawn(create_bridge());
tokio::spawn(common::create_bridge());

// ROS subscriber
let ctx = r2r::Context::create().unwrap();
Expand Down Expand Up @@ -94,9 +68,9 @@ async fn test_zenoh_pub_ros_sub() {

#[tokio::test(flavor = "multi_thread")]
async fn test_ros_pub_zenoh_sub() {
init_env();
common::init_env();
// Create zenoh-bridge-ros2dds
tokio::spawn(create_bridge());
tokio::spawn(common::create_bridge());

// Zenoh subscriber
let session = zenoh::open(zenoh::Config::default()).await.unwrap();
Expand Down
Loading