Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test mil poi #1071

Merged
merged 6 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions mil_common/utils/mil_poi/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,8 @@ generate_messages(
)

catkin_package()

if(CATKIN_ENABLE_TESTING)
find_package(rostest REQUIRED)
add_rostest(test/poi.test)
endif()
6 changes: 6 additions & 0 deletions mil_common/utils/mil_poi/test/poi.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<launch>
<node pkg="mil_poi" type="poi_server" name="poi_server">
</node>
<test test-name="async_test" pkg="mil_poi" type="test_async.py" time-limit="19.0"/>
<test test-name="sync_test" pkg="mil_poi" type="test_sync.py" time-limit="19.0"/>
</launch>
98 changes: 98 additions & 0 deletions mil_common/utils/mil_poi/test/test_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#!/usr/bin/env python3

import asyncio
import unittest

import axros
import rostest
from geometry_msgs.msg import Point, PointStamped
from mil_poi.srv import (
AddPOI,
AddPOIRequest,
DeletePOI,
DeletePOIRequest,
MovePOI,
MovePOIRequest,
)
from std_msgs.msg import Header


class POITestAsync(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
self.nh = axros.NodeHandle.from_argv("basic", always_default_name=True)
await self.nh.setup()
self.add_service = self.nh.get_service_client("/poi_server/add", AddPOI)
self.move_service = self.nh.get_service_client("/poi_server/move", MovePOI)
self.delete_service = self.nh.get_service_client(
"/poi_server/delete",
DeletePOI,
)
await asyncio.gather(
self.add_service.wait_for_service(),
self.move_service.wait_for_service(),
self.delete_service.wait_for_service(),
)

async def test_points(self):
# Adding
test = await self.add_service(
AddPOIRequest(
name="test",
position=PointStamped(header=Header(), point=Point(0.0, 1.0, 2.0)),
),
)
self.assertTrue(test.success)
test = await self.add_service(
AddPOIRequest(
name="test2",
position=PointStamped(header=Header(), point=Point(0.0, 1.0, 2.0)),
),
)
self.assertTrue(test.success)

# Moving
test = await self.move_service(
MovePOIRequest(
name="test",
position=PointStamped(header=Header(), point=Point(0.0, -1.0, -2.0)),
),
)
self.assertTrue(test.success)
test = await self.move_service(
MovePOIRequest(
name="test2",
position=PointStamped(header=Header(), point=Point(0.0, 0.0, 0.0)),
),
)
self.assertTrue(test.success)

# Moving a non-existent POI should return False
test = await self.move_service(
MovePOIRequest(
name="test3",
position=PointStamped(header=Header(), point=Point(0.0, 0.0, 0.0)),
),
)
self.assertFalse(test.success)

# Deleting
test = await self.delete_service(DeletePOIRequest(name="test"))
self.assertTrue(test.success)
test = await self.delete_service(DeletePOIRequest(name="test2"))
self.assertTrue(test.success)

# Deleting a non-existent POI should return False
test = await self.delete_service(DeletePOIRequest(name="test"))
self.assertFalse(test.success)

# Deleting a non-existent POI should return False
test = await self.delete_service(DeletePOIRequest(name="test3"))
self.assertFalse(test.success)

async def asyncTearDown(self):
await self.nh.shutdown()


if __name__ == "__main__":
rostest.rosrun("mil_poi", "test_poi", POITestAsync)
unittest.main()
124 changes: 124 additions & 0 deletions mil_common/utils/mil_poi/test/test_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#!/usr/bin/env python3
import random
import string
import unittest

import genpy
import rospy
import rostest
from geometry_msgs.msg import Point, PointStamped
from mil_poi.srv import (
AddPOI,
DeletePOI,
MovePOI,
MovePOIRequest,
)
from std_msgs.msg import Header


class POITest(unittest.TestCase):
def setUp(self):
# make one poi here
rospy.init_node("poi_test_node")
self.poi_name = "test_poi"
self.poi_position = PointStamped(header=Header(), point=Point(0.0, 1.0, 2.0))
self.add_poi()

def add_poi(self):
rospy.wait_for_service("/poi_server/add")
service = rospy.ServiceProxy("/poi_server/add", AddPOI)
response = service(self.poi_name, self.poi_position)
self.assertTrue(response.success)

def test_add(self):
# Call the add_poi function to add a POI
self.add_poi()

def test_move(self):
# Call the add_poi function to add a POI
self.add_poi()

# Wait for the move_poi service to become available
rospy.wait_for_service("/poi_server/move")
move_service = rospy.ServiceProxy("/poi_server/move", MovePOI)

# Move the POI to a new position
new_position = [1.0, 2.0, 3.0] # New position coordinates
move_response = move_service(
MovePOIRequest(
name=self.poi_name,
position=PointStamped(point=Point(*new_position)),
),
)
# Check if the additions were unsuccessful
self.assertTrue(move_response.success, f"Failed to move POI '{self.poi_name}'")

def test_delete(self):
# Call the add_poi function to add a POI
self.add_poi()

# Wait for the remove_poi service to become available
rospy.wait_for_service("/poi_server/delete")
remove_service = rospy.ServiceProxy("/poi_server/delete", DeletePOI)

# Remove the added POI
remove_response = remove_service(self.poi_name)

# Check if the removal was successful
self.assertTrue(
remove_response.success,
f"Failed to delete POI '{self.poi_name}'",
)

def test_long_string(self):
rospy.wait_for_service("/poi_server/add")
service = rospy.ServiceProxy("/poi_server/add", AddPOI)

# Create a long string for the POI name
long_string = "".join(random.choices(string.ascii_letters, k=20000))

# Call the service to add a new POI with the long string name
response = service(
long_string,
PointStamped(header=Header(), point=Point(0.0, 1.0, 2.0)),
)
self.assertTrue(response.success, response.message)

def test_wrong_types(self):
# Wait for the add_poi service to become available
rospy.wait_for_service("/poi_server/add")
service = rospy.ServiceProxy("/poi_server/add", AddPOI)

# Try adding a POI with wrong types of arguments
with self.assertRaises(genpy.message.SerializationError):
service(12321, [0.0, 2.3, 21.3]) # Incorrect name type
service("tester", ["hi", "wrong", "bad"]) # Incorrect position type

def test_delete_twice(self):
# Call the add_poi function to add a POI
self.add_poi()

# Wait for the remove_poi service to become available
rospy.wait_for_service("/poi_server/delete")
remove_service = rospy.ServiceProxy("/poi_server/delete", DeletePOI)

# Remove the added POI
remove_response1 = remove_service(self.poi_name)

# Try removing the same POI again
remove_response2 = remove_service(self.poi_name)

# Check if the first removal was successful and the second removal was unsuccessful
self.assertTrue(
remove_response1.success,
f"Failed to remove POI '{self.poi_name}'",
)
self.assertFalse(
remove_response2.success,
f"Removed POI '{self.poi_name}' twice",
)


if __name__ == "__main__":
rostest.rosrun("mil_poi", "test_poi", POITest)
unittest.main()
Loading