From f7e15c8363b4376ca125df63c4e8f4894b58af5b Mon Sep 17 00:00:00 2001 From: Jessica Halvorsen Date: Fri, 2 Dec 2022 18:23:33 -0500 Subject: [PATCH 1/4] added starting test files for mil_poi --- mil_common/utils/mil_poi/CMakeLists.txt | 6 ++ mil_common/utils/mil_poi/test/poi.test | 6 ++ mil_common/utils/mil_poi/test/test_async.py | 31 ++++++++++ mil_common/utils/mil_poi/test/test_sync.py | 63 +++++++++++++++++++++ 4 files changed, 106 insertions(+) create mode 100644 mil_common/utils/mil_poi/test/poi.test create mode 100755 mil_common/utils/mil_poi/test/test_async.py create mode 100755 mil_common/utils/mil_poi/test/test_sync.py diff --git a/mil_common/utils/mil_poi/CMakeLists.txt b/mil_common/utils/mil_poi/CMakeLists.txt index 671bfde1b..1a11e096d 100644 --- a/mil_common/utils/mil_poi/CMakeLists.txt +++ b/mil_common/utils/mil_poi/CMakeLists.txt @@ -29,3 +29,9 @@ generate_messages( ) catkin_package() + +if(CATKIN_ENABLE_TESTING) + find_package(rostest REQUIRED) + add_rostest(test/poi.test) +endif() + diff --git a/mil_common/utils/mil_poi/test/poi.test b/mil_common/utils/mil_poi/test/poi.test new file mode 100644 index 000000000..3188d090a --- /dev/null +++ b/mil_common/utils/mil_poi/test/poi.test @@ -0,0 +1,6 @@ + + + + + + diff --git a/mil_common/utils/mil_poi/test/test_async.py b/mil_common/utils/mil_poi/test/test_async.py new file mode 100755 index 000000000..06ef4c639 --- /dev/null +++ b/mil_common/utils/mil_poi/test/test_async.py @@ -0,0 +1,31 @@ +#!/usr/bin/env python3 +import unittest +import rostest +import rospy +import axros +from std_msgs.msg import Header +from geometry_msgs.msg import PointStamped, Point +from mil_poi.srv import AddPOIRequest, AddPOI + +class POITestAsync(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self.nh = axros.NodeHandle.from_argv("basic", always_default_name = True) + await self.nh.setup() + self.service = self.nh.get_service_client("poi_server/add", AddPOI) + + async def test_add(self): + await self.service.wait_for_service() + test = await self.service("test", PointStamped(header = Header(), point = Point(0.0, 1.0, 2.0))) + self.assertTrue(test.success) + + async def asyncTearDown(self): + await self.nh.shutdown() + +if __name__ == "__main__": + rostest.rosrun("mil_poi", "test_poi", POITestAsync) + unittest.main() + + + + + diff --git a/mil_common/utils/mil_poi/test/test_sync.py b/mil_common/utils/mil_poi/test/test_sync.py new file mode 100755 index 000000000..6914228eb --- /dev/null +++ b/mil_common/utils/mil_poi/test/test_sync.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +import unittest +import rostest +import rospy +from std_msgs.msg import Header +from geometry_msgs.msg import PointStamped, Point +from mil_poi.srv import AddPOIRequest, AddPOI + +class POITest(unittest.TestCase): + def setUp(self): + #make one poi here + pass + + def test_add(self): + rospy.wait_for_service('/poi_server/add') + service = rospy.ServiceProxy("/poi_server/add", AddPOI) + test = service("test", PointStamped(header = Header(), point = Point(0.0, 1.0, 2.0))) + self.assertTrue(test.success) + +# def test_move(self): +# p = POIServer() +# #how do i know which POIs exist already +# +# def test_delete(self): +# p = POIServer() +# +# def test_long_string(self): +# p = POIServer() +# long_string = ''.join(random.choices(string.ascii_letters, k=20000)) +# poi = p.add_poi(long_string, [1.0, 2.0, 3.0]) +# self.assertTrue(poi) +# +# def test_wrong_types(self): +# p = POIServer() +# poi1 = p.add_poi(12321, [0.0, 2.3, 21.3]) +# poi2 = p.add_poi("tester", ["hi", "wrong" "bad"]) +# poi3 = p.remove_poi(10000) +# poi4 = p.remove_poi([0, 3]) +# poi5 = p.update_position() #not sure what second parameter even is +# +# self.assertFalse(poi1) +# self.assertFalse(poi2) +# self.assertFalse(poi3) +# self.assertFalse(poi4) +# self.assertFalse(poi5) +# +# def test_delete_twice(self): +# p = POIServer() +# poi1 = p.remove_poi("test") +# poi2 = p.remove_poi("test") +# self.asserFalse(poi2) +# + # def tearDown(self): + # self.p.cleanup() #not sure if this is right + +if __name__ == "__main__": + rostest.rosrun("mil_poi", "test_poi", POITest) + unittest.main() + + + + + From cd750b71622e732ad3fbb854f67a4d1c76ee9754 Mon Sep 17 00:00:00 2001 From: vboxuser Date: Sun, 13 Aug 2023 13:07:55 -0400 Subject: [PATCH 2/4] Add tests for POI functionality --- mil_common/utils/mil_poi/test/test_sync.py | 139 ++++++++++++++------- 1 file changed, 95 insertions(+), 44 deletions(-) diff --git a/mil_common/utils/mil_poi/test/test_sync.py b/mil_common/utils/mil_poi/test/test_sync.py index 6914228eb..bf7948e9b 100755 --- a/mil_common/utils/mil_poi/test/test_sync.py +++ b/mil_common/utils/mil_poi/test/test_sync.py @@ -7,51 +7,102 @@ from mil_poi.srv import AddPOIRequest, AddPOI class POITest(unittest.TestCase): - def setUp(self): + def setUp(self): #make one poi here - pass - - def test_add(self): - rospy.wait_for_service('/poi_server/add') - service = rospy.ServiceProxy("/poi_server/add", AddPOI) - test = service("test", PointStamped(header = Header(), point = Point(0.0, 1.0, 2.0))) - self.assertTrue(test.success) - -# def test_move(self): -# p = POIServer() -# #how do i know which POIs exist already -# -# def test_delete(self): -# p = POIServer() -# -# def test_long_string(self): -# p = POIServer() -# long_string = ''.join(random.choices(string.ascii_letters, k=20000)) -# poi = p.add_poi(long_string, [1.0, 2.0, 3.0]) -# self.assertTrue(poi) -# -# def test_wrong_types(self): -# p = POIServer() -# poi1 = p.add_poi(12321, [0.0, 2.3, 21.3]) -# poi2 = p.add_poi("tester", ["hi", "wrong" "bad"]) -# poi3 = p.remove_poi(10000) -# poi4 = p.remove_poi([0, 3]) -# poi5 = p.update_position() #not sure what second parameter even is -# -# self.assertFalse(poi1) -# self.assertFalse(poi2) -# self.assertFalse(poi3) -# self.assertFalse(poi4) -# self.assertFalse(poi5) -# -# def test_delete_twice(self): -# p = POIServer() -# poi1 = p.remove_poi("test") -# poi2 = p.remove_poi("test") -# self.asserFalse(poi2) -# - # def tearDown(self): - # self.p.cleanup() #not sure if this is right + rospy.init_node("poi_test_node") + self.poi_name = "test_poi" + self.poi_position = PointStamped(header=Header(), point=Point(0.0, 1.0, 2.0)) + self.add_poi() + + def add_poi(self): + rospy.wait_for_service('/poi_server/add') + service = rospy.ServiceProxy("/poi_server/add", AddPOI) + response = service(self.poi_name, self.poi_position) + self.assertTrue(response.success) + + def test_add(self): + # Call the add_poi function to add a POI + self.add_poi() + + # Check if the POI was added successfully + rospy.wait_for_service('/poi_server/get') + get_service = rospy.ServiceProxy("/poi_server/get", GetPOI) + response = get_service(self.poi_name) + self.assertTrue(response.success, f"Failed to add POI '{self.poi_name}'") + + def test_move(self): + # Call the add_poi function to add a POI + self.add_poi() + + # Wait for the move_poi service to become available + rospy.wait_for_service('/poi_server/move') + move_service = rospy.ServiceProxy("/poi_server/move", MovePOI) + + # Move the POI to a new position + new_position = [1.0, 2.0, 3.0] # New position coordinates + move_response = move_service(self.poi_name, new_position) + self.assertTrue(move_response.success, f"Failed to move POI '{self.poi_name}'") + + def test_delete(self): + # Call the add_poi function to add a POI + self.add_poi() + + # Wait for the remove_poi service to become available + rospy.wait_for_service('/poi_server/remove') + remove_service = rospy.ServiceProxy("/poi_server/remove", RemovePOI) + + # Remove the added POI + remove_response = remove_service(self.poi_name) + + # Check if the removal was successful + self.assertTrue(remove_response.success, f"Failed to remove POI '{self.poi_name}'") + + def test_long_string(self): + rospy.wait_for_service('/poi_server/add') + service = rospy.ServiceProxy("/poi_server/add", AddPOI) + + # Create a long string for the POI name + long_string = ''.join(random.choices(string.ascii_letters, k=20000)) + + # Call the service to add a new POI with the long string name + response = service(long_string, PointStamped(header=Header(), point=Point(0.0, 1.0, 2.0))) + self.assertFalse(response.success, "Added POI with long string name") + + def test_wrong_types(self): + # Wait for the add_poi service to become available + rospy.wait_for_service('/poi_server/add') + service = rospy.ServiceProxy("/poi_server/add", AddPOI) + + # Try adding a POI with wrong types of arguments + response1 = service(12321, [0.0, 2.3, 21.3]) # Incorrect name type + response2 = service("tester", ["hi", "wrong", "bad"]) # Incorrect position type + + # Check if the additions were unsuccessful + self.assertFalse(response1.success, "Added POI with wrong argument types") + self.assertFalse(response2.success, "Added POI with wrong argument types") + + def test_delete_twice(self): + # Call the add_poi function to add a POI + self.add_poi() + + # Wait for the remove_poi service to become available + rospy.wait_for_service('/poi_server/remove') + remove_service = rospy.ServiceProxy("/poi_server/remove", RemovePOI) + + # Remove the added POI + remove_response1 = remove_service(self.poi_name) + + # Try removing the same POI again + remove_response2 = remove_service(self.poi_name) + + # Check if the first removal was successful and the second removal was unsuccessful + self.assertTrue(remove_response1.success, f"Failed to remove POI '{self.poi_name}'") + self.assertFalse(remove_response2.success, f"Removed POI '{self.poi_name}' twice") + + def tearDown(self): + # Clean up any resources or state modified during testing + if hasattr(self, 'p') and callable(getattr(self.p, 'cleanup', None)): + self.p.cleanup() if __name__ == "__main__": rostest.rosrun("mil_poi", "test_poi", POITest) From d70c59d38de0799535828c0dd17c77ec01c3a139 Mon Sep 17 00:00:00 2001 From: Cameron Brown Date: Wed, 6 Mar 2024 01:26:09 -0500 Subject: [PATCH 3/4] Finish out tests --- mil_common/utils/mil_poi/test/poi.test | 4 +- mil_common/utils/mil_poi/test/test_async.py | 95 +++++++-- mil_common/utils/mil_poi/test/test_sync.py | 206 ++++++++++---------- 3 files changed, 191 insertions(+), 114 deletions(-) diff --git a/mil_common/utils/mil_poi/test/poi.test b/mil_common/utils/mil_poi/test/poi.test index 3188d090a..fa6be65ba 100644 --- a/mil_common/utils/mil_poi/test/poi.test +++ b/mil_common/utils/mil_poi/test/poi.test @@ -1,6 +1,6 @@ - - + + diff --git a/mil_common/utils/mil_poi/test/test_async.py b/mil_common/utils/mil_poi/test/test_async.py index 06ef4c639..4a4798d3e 100755 --- a/mil_common/utils/mil_poi/test/test_async.py +++ b/mil_common/utils/mil_poi/test/test_async.py @@ -1,31 +1,98 @@ #!/usr/bin/env python3 + +import asyncio import unittest -import rostest -import rospy + import axros +import rostest +from geometry_msgs.msg import Point, PointStamped +from mil_poi.srv import ( + AddPOI, + AddPOIRequest, + DeletePOI, + DeletePOIRequest, + MovePOI, + MovePOIRequest, +) from std_msgs.msg import Header -from geometry_msgs.msg import PointStamped, Point -from mil_poi.srv import AddPOIRequest, AddPOI + class POITestAsync(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): - self.nh = axros.NodeHandle.from_argv("basic", always_default_name = True) + self.nh = axros.NodeHandle.from_argv("basic", always_default_name=True) await self.nh.setup() - self.service = self.nh.get_service_client("poi_server/add", AddPOI) + self.add_service = self.nh.get_service_client("/poi_server/add", AddPOI) + self.move_service = self.nh.get_service_client("/poi_server/move", MovePOI) + self.delete_service = self.nh.get_service_client( + "/poi_server/delete", + DeletePOI, + ) + await asyncio.gather( + self.add_service.wait_for_service(), + self.move_service.wait_for_service(), + self.delete_service.wait_for_service(), + ) - async def test_add(self): - await self.service.wait_for_service() - test = await self.service("test", PointStamped(header = Header(), point = Point(0.0, 1.0, 2.0))) + async def test_points(self): + # Adding + test = await self.add_service( + AddPOIRequest( + name="test", + position=PointStamped(header=Header(), point=Point(0.0, 1.0, 2.0)), + ), + ) + self.assertTrue(test.success) + test = await self.add_service( + AddPOIRequest( + name="test2", + position=PointStamped(header=Header(), point=Point(0.0, 1.0, 2.0)), + ), + ) self.assertTrue(test.success) - async def asyncTearDown(self): - await self.nh.shutdown() + # Moving + test = await self.move_service( + MovePOIRequest( + name="test", + position=PointStamped(header=Header(), point=Point(0.0, -1.0, -2.0)), + ), + ) + self.assertTrue(test.success) + test = await self.move_service( + MovePOIRequest( + name="test2", + position=PointStamped(header=Header(), point=Point(0.0, 0.0, 0.0)), + ), + ) + self.assertTrue(test.success) -if __name__ == "__main__": - rostest.rosrun("mil_poi", "test_poi", POITestAsync) - unittest.main() + # Moving a non-existent POI should return False + test = await self.move_service( + MovePOIRequest( + name="test3", + position=PointStamped(header=Header(), point=Point(0.0, 0.0, 0.0)), + ), + ) + self.assertFalse(test.success) + # Deleting + test = await self.delete_service(DeletePOIRequest(name="test")) + self.assertTrue(test.success) + test = await self.delete_service(DeletePOIRequest(name="test2")) + self.assertTrue(test.success) + # Deleting a non-existent POI should return False + test = await self.delete_service(DeletePOIRequest(name="test")) + self.assertFalse(test.success) + # Deleting a non-existent POI should return False + test = await self.delete_service(DeletePOIRequest(name="test3")) + self.assertFalse(test.success) + + async def asyncTearDown(self): + await self.nh.shutdown() +if __name__ == "__main__": + rostest.rosrun("mil_poi", "test_poi", POITestAsync) + unittest.main() diff --git a/mil_common/utils/mil_poi/test/test_sync.py b/mil_common/utils/mil_poi/test/test_sync.py index bf7948e9b..661153e4d 100755 --- a/mil_common/utils/mil_poi/test/test_sync.py +++ b/mil_common/utils/mil_poi/test/test_sync.py @@ -1,114 +1,124 @@ #!/usr/bin/env python3 +import random +import string import unittest -import rostest + +import genpy import rospy +import rostest +from geometry_msgs.msg import Point, PointStamped +from mil_poi.srv import ( + AddPOI, + DeletePOI, + MovePOI, + MovePOIRequest, +) from std_msgs.msg import Header -from geometry_msgs.msg import PointStamped, Point -from mil_poi.srv import AddPOIRequest, AddPOI + class POITest(unittest.TestCase): - def setUp(self): - #make one poi here - rospy.init_node("poi_test_node") - self.poi_name = "test_poi" - self.poi_position = PointStamped(header=Header(), point=Point(0.0, 1.0, 2.0)) - self.add_poi() - - def add_poi(self): - rospy.wait_for_service('/poi_server/add') - service = rospy.ServiceProxy("/poi_server/add", AddPOI) - response = service(self.poi_name, self.poi_position) - self.assertTrue(response.success) - - def test_add(self): - # Call the add_poi function to add a POI - self.add_poi() - - # Check if the POI was added successfully - rospy.wait_for_service('/poi_server/get') - get_service = rospy.ServiceProxy("/poi_server/get", GetPOI) - response = get_service(self.poi_name) - self.assertTrue(response.success, f"Failed to add POI '{self.poi_name}'") - - def test_move(self): + def setUp(self): + # make one poi here + rospy.init_node("poi_test_node") + self.poi_name = "test_poi" + self.poi_position = PointStamped(header=Header(), point=Point(0.0, 1.0, 2.0)) + self.add_poi() + + def add_poi(self): + rospy.wait_for_service("/poi_server/add") + service = rospy.ServiceProxy("/poi_server/add", AddPOI) + response = service(self.poi_name, self.poi_position) + self.assertTrue(response.success) + + def test_add(self): # Call the add_poi function to add a POI - self.add_poi() - - # Wait for the move_poi service to become available - rospy.wait_for_service('/poi_server/move') - move_service = rospy.ServiceProxy("/poi_server/move", MovePOI) - - # Move the POI to a new position - new_position = [1.0, 2.0, 3.0] # New position coordinates - move_response = move_service(self.poi_name, new_position) - self.assertTrue(move_response.success, f"Failed to move POI '{self.poi_name}'") - - def test_delete(self): - # Call the add_poi function to add a POI - self.add_poi() - - # Wait for the remove_poi service to become available - rospy.wait_for_service('/poi_server/remove') - remove_service = rospy.ServiceProxy("/poi_server/remove", RemovePOI) - - # Remove the added POI - remove_response = remove_service(self.poi_name) - - # Check if the removal was successful - self.assertTrue(remove_response.success, f"Failed to remove POI '{self.poi_name}'") - - def test_long_string(self): - rospy.wait_for_service('/poi_server/add') - service = rospy.ServiceProxy("/poi_server/add", AddPOI) - - # Create a long string for the POI name - long_string = ''.join(random.choices(string.ascii_letters, k=20000)) + self.add_poi() - # Call the service to add a new POI with the long string name - response = service(long_string, PointStamped(header=Header(), point=Point(0.0, 1.0, 2.0))) - self.assertFalse(response.success, "Added POI with long string name") - - def test_wrong_types(self): - # Wait for the add_poi service to become available - rospy.wait_for_service('/poi_server/add') - service = rospy.ServiceProxy("/poi_server/add", AddPOI) - - # Try adding a POI with wrong types of arguments - response1 = service(12321, [0.0, 2.3, 21.3]) # Incorrect name type - response2 = service("tester", ["hi", "wrong", "bad"]) # Incorrect position type - - # Check if the additions were unsuccessful - self.assertFalse(response1.success, "Added POI with wrong argument types") - self.assertFalse(response2.success, "Added POI with wrong argument types") - - def test_delete_twice(self): - # Call the add_poi function to add a POI - self.add_poi() + def test_move(self): + # Call the add_poi function to add a POI + self.add_poi() + + # Wait for the move_poi service to become available + rospy.wait_for_service("/poi_server/move") + move_service = rospy.ServiceProxy("/poi_server/move", MovePOI) + + # Move the POI to a new position + new_position = [1.0, 2.0, 3.0] # New position coordinates + move_response = move_service( + MovePOIRequest( + name=self.poi_name, + position=PointStamped(point=Point(*new_position)), + ), + ) + # Check if the additions were unsuccessful + self.assertTrue(move_response.success, f"Failed to move POI '{self.poi_name}'") + + def test_delete(self): + # Call the add_poi function to add a POI + self.add_poi() + + # Wait for the remove_poi service to become available + rospy.wait_for_service("/poi_server/delete") + remove_service = rospy.ServiceProxy("/poi_server/delete", DeletePOI) + + # Remove the added POI + remove_response = remove_service(self.poi_name) + + # Check if the removal was successful + self.assertTrue( + remove_response.success, + f"Failed to delete POI '{self.poi_name}'", + ) + + def test_long_string(self): + rospy.wait_for_service("/poi_server/add") + service = rospy.ServiceProxy("/poi_server/add", AddPOI) + + # Create a long string for the POI name + long_string = "".join(random.choices(string.ascii_letters, k=20000)) + + # Call the service to add a new POI with the long string name + response = service( + long_string, + PointStamped(header=Header(), point=Point(0.0, 1.0, 2.0)), + ) + self.assertTrue(response.success, response.message) + + def test_wrong_types(self): + # Wait for the add_poi service to become available + rospy.wait_for_service("/poi_server/add") + service = rospy.ServiceProxy("/poi_server/add", AddPOI) + + # Try adding a POI with wrong types of arguments + with self.assertRaises(genpy.message.SerializationError): + service(12321, [0.0, 2.3, 21.3]) # Incorrect name type + service("tester", ["hi", "wrong", "bad"]) # Incorrect position type + + def test_delete_twice(self): + # Call the add_poi function to add a POI + self.add_poi() - # Wait for the remove_poi service to become available - rospy.wait_for_service('/poi_server/remove') - remove_service = rospy.ServiceProxy("/poi_server/remove", RemovePOI) + # Wait for the remove_poi service to become available + rospy.wait_for_service("/poi_server/delete") + remove_service = rospy.ServiceProxy("/poi_server/delete", DeletePOI) - # Remove the added POI - remove_response1 = remove_service(self.poi_name) + # Remove the added POI + remove_response1 = remove_service(self.poi_name) - # Try removing the same POI again - remove_response2 = remove_service(self.poi_name) + # Try removing the same POI again + remove_response2 = remove_service(self.poi_name) - # Check if the first removal was successful and the second removal was unsuccessful - self.assertTrue(remove_response1.success, f"Failed to remove POI '{self.poi_name}'") - self.assertFalse(remove_response2.success, f"Removed POI '{self.poi_name}' twice") + # Check if the first removal was successful and the second removal was unsuccessful + self.assertTrue( + remove_response1.success, + f"Failed to remove POI '{self.poi_name}'", + ) + self.assertFalse( + remove_response2.success, + f"Removed POI '{self.poi_name}' twice", + ) - def tearDown(self): - # Clean up any resources or state modified during testing - if hasattr(self, 'p') and callable(getattr(self.p, 'cleanup', None)): - self.p.cleanup() -if __name__ == "__main__": +if __name__ == "__main__": rostest.rosrun("mil_poi", "test_poi", POITest) unittest.main() - - - - - From 786cc0e3b21a4f34548efe19eb4b8262298d0bc5 Mon Sep 17 00:00:00 2001 From: Cameron Brown Date: Wed, 6 Mar 2024 01:27:45 -0500 Subject: [PATCH 4/4] Fix line endings --- mil_common/utils/mil_poi/CMakeLists.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/mil_common/utils/mil_poi/CMakeLists.txt b/mil_common/utils/mil_poi/CMakeLists.txt index 8eb85a953..66989bd8d 100644 --- a/mil_common/utils/mil_poi/CMakeLists.txt +++ b/mil_common/utils/mil_poi/CMakeLists.txt @@ -34,4 +34,3 @@ if(CATKIN_ENABLE_TESTING) find_package(rostest REQUIRED) add_rostest(test/poi.test) endif() -