-
Notifications
You must be signed in to change notification settings - Fork 29
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
191 additions
and
114 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
<launch> | ||
<node pkg="mil_poi" type="poi_server" name="poi_server"> | ||
<!-- <rosparam command="load" file="$(find navigator_launch)/config/poi.yaml"> --> | ||
</node> | ||
<test test-name="poi_test" pkg="mil_poi" type="test_async.py" time-limit="19.0"/> | ||
<test test-name="async_test" pkg="mil_poi" type="test_async.py" time-limit="19.0"/> | ||
<test test-name="sync_test" pkg="mil_poi" type="test_sync.py" time-limit="19.0"/> | ||
</launch> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,31 +1,98 @@ | ||
#!/usr/bin/env python3 | ||
|
||
import asyncio | ||
import unittest | ||
import rostest | ||
import rospy | ||
|
||
import axros | ||
import rostest | ||
from geometry_msgs.msg import Point, PointStamped | ||
from mil_poi.srv import ( | ||
AddPOI, | ||
AddPOIRequest, | ||
DeletePOI, | ||
DeletePOIRequest, | ||
MovePOI, | ||
MovePOIRequest, | ||
) | ||
from std_msgs.msg import Header | ||
from geometry_msgs.msg import PointStamped, Point | ||
from mil_poi.srv import AddPOIRequest, AddPOI | ||
|
||
|
||
class POITestAsync(unittest.IsolatedAsyncioTestCase): | ||
async def asyncSetUp(self): | ||
self.nh = axros.NodeHandle.from_argv("basic", always_default_name = True) | ||
self.nh = axros.NodeHandle.from_argv("basic", always_default_name=True) | ||
await self.nh.setup() | ||
self.service = self.nh.get_service_client("poi_server/add", AddPOI) | ||
self.add_service = self.nh.get_service_client("/poi_server/add", AddPOI) | ||
self.move_service = self.nh.get_service_client("/poi_server/move", MovePOI) | ||
self.delete_service = self.nh.get_service_client( | ||
"/poi_server/delete", | ||
DeletePOI, | ||
) | ||
await asyncio.gather( | ||
self.add_service.wait_for_service(), | ||
self.move_service.wait_for_service(), | ||
self.delete_service.wait_for_service(), | ||
) | ||
|
||
async def test_add(self): | ||
await self.service.wait_for_service() | ||
test = await self.service("test", PointStamped(header = Header(), point = Point(0.0, 1.0, 2.0))) | ||
async def test_points(self): | ||
# Adding | ||
test = await self.add_service( | ||
AddPOIRequest( | ||
name="test", | ||
position=PointStamped(header=Header(), point=Point(0.0, 1.0, 2.0)), | ||
), | ||
) | ||
self.assertTrue(test.success) | ||
test = await self.add_service( | ||
AddPOIRequest( | ||
name="test2", | ||
position=PointStamped(header=Header(), point=Point(0.0, 1.0, 2.0)), | ||
), | ||
) | ||
self.assertTrue(test.success) | ||
|
||
async def asyncTearDown(self): | ||
await self.nh.shutdown() | ||
# Moving | ||
test = await self.move_service( | ||
MovePOIRequest( | ||
name="test", | ||
position=PointStamped(header=Header(), point=Point(0.0, -1.0, -2.0)), | ||
), | ||
) | ||
self.assertTrue(test.success) | ||
test = await self.move_service( | ||
MovePOIRequest( | ||
name="test2", | ||
position=PointStamped(header=Header(), point=Point(0.0, 0.0, 0.0)), | ||
), | ||
) | ||
self.assertTrue(test.success) | ||
|
||
if __name__ == "__main__": | ||
rostest.rosrun("mil_poi", "test_poi", POITestAsync) | ||
unittest.main() | ||
# Moving a non-existent POI should return False | ||
test = await self.move_service( | ||
MovePOIRequest( | ||
name="test3", | ||
position=PointStamped(header=Header(), point=Point(0.0, 0.0, 0.0)), | ||
), | ||
) | ||
self.assertFalse(test.success) | ||
|
||
# Deleting | ||
test = await self.delete_service(DeletePOIRequest(name="test")) | ||
self.assertTrue(test.success) | ||
test = await self.delete_service(DeletePOIRequest(name="test2")) | ||
self.assertTrue(test.success) | ||
|
||
# Deleting a non-existent POI should return False | ||
test = await self.delete_service(DeletePOIRequest(name="test")) | ||
self.assertFalse(test.success) | ||
|
||
# Deleting a non-existent POI should return False | ||
test = await self.delete_service(DeletePOIRequest(name="test3")) | ||
self.assertFalse(test.success) | ||
|
||
async def asyncTearDown(self): | ||
await self.nh.shutdown() | ||
|
||
|
||
if __name__ == "__main__": | ||
rostest.rosrun("mil_poi", "test_poi", POITestAsync) | ||
unittest.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,114 +1,124 @@ | ||
#!/usr/bin/env python3 | ||
import random | ||
import string | ||
import unittest | ||
import rostest | ||
|
||
import genpy | ||
import rospy | ||
import rostest | ||
from geometry_msgs.msg import Point, PointStamped | ||
from mil_poi.srv import ( | ||
AddPOI, | ||
DeletePOI, | ||
MovePOI, | ||
MovePOIRequest, | ||
) | ||
from std_msgs.msg import Header | ||
from geometry_msgs.msg import PointStamped, Point | ||
from mil_poi.srv import AddPOIRequest, AddPOI | ||
|
||
|
||
class POITest(unittest.TestCase): | ||
def setUp(self): | ||
#make one poi here | ||
rospy.init_node("poi_test_node") | ||
self.poi_name = "test_poi" | ||
self.poi_position = PointStamped(header=Header(), point=Point(0.0, 1.0, 2.0)) | ||
self.add_poi() | ||
|
||
def add_poi(self): | ||
rospy.wait_for_service('/poi_server/add') | ||
service = rospy.ServiceProxy("/poi_server/add", AddPOI) | ||
response = service(self.poi_name, self.poi_position) | ||
self.assertTrue(response.success) | ||
|
||
def test_add(self): | ||
# Call the add_poi function to add a POI | ||
self.add_poi() | ||
|
||
# Check if the POI was added successfully | ||
rospy.wait_for_service('/poi_server/get') | ||
get_service = rospy.ServiceProxy("/poi_server/get", GetPOI) | ||
response = get_service(self.poi_name) | ||
self.assertTrue(response.success, f"Failed to add POI '{self.poi_name}'") | ||
|
||
def test_move(self): | ||
def setUp(self): | ||
# make one poi here | ||
rospy.init_node("poi_test_node") | ||
self.poi_name = "test_poi" | ||
self.poi_position = PointStamped(header=Header(), point=Point(0.0, 1.0, 2.0)) | ||
self.add_poi() | ||
|
||
def add_poi(self): | ||
rospy.wait_for_service("/poi_server/add") | ||
service = rospy.ServiceProxy("/poi_server/add", AddPOI) | ||
response = service(self.poi_name, self.poi_position) | ||
self.assertTrue(response.success) | ||
|
||
def test_add(self): | ||
# Call the add_poi function to add a POI | ||
self.add_poi() | ||
|
||
# Wait for the move_poi service to become available | ||
rospy.wait_for_service('/poi_server/move') | ||
move_service = rospy.ServiceProxy("/poi_server/move", MovePOI) | ||
|
||
# Move the POI to a new position | ||
new_position = [1.0, 2.0, 3.0] # New position coordinates | ||
move_response = move_service(self.poi_name, new_position) | ||
self.assertTrue(move_response.success, f"Failed to move POI '{self.poi_name}'") | ||
|
||
def test_delete(self): | ||
# Call the add_poi function to add a POI | ||
self.add_poi() | ||
|
||
# Wait for the remove_poi service to become available | ||
rospy.wait_for_service('/poi_server/remove') | ||
remove_service = rospy.ServiceProxy("/poi_server/remove", RemovePOI) | ||
|
||
# Remove the added POI | ||
remove_response = remove_service(self.poi_name) | ||
|
||
# Check if the removal was successful | ||
self.assertTrue(remove_response.success, f"Failed to remove POI '{self.poi_name}'") | ||
|
||
def test_long_string(self): | ||
rospy.wait_for_service('/poi_server/add') | ||
service = rospy.ServiceProxy("/poi_server/add", AddPOI) | ||
|
||
# Create a long string for the POI name | ||
long_string = ''.join(random.choices(string.ascii_letters, k=20000)) | ||
self.add_poi() | ||
|
||
# Call the service to add a new POI with the long string name | ||
response = service(long_string, PointStamped(header=Header(), point=Point(0.0, 1.0, 2.0))) | ||
self.assertFalse(response.success, "Added POI with long string name") | ||
|
||
def test_wrong_types(self): | ||
# Wait for the add_poi service to become available | ||
rospy.wait_for_service('/poi_server/add') | ||
service = rospy.ServiceProxy("/poi_server/add", AddPOI) | ||
|
||
# Try adding a POI with wrong types of arguments | ||
response1 = service(12321, [0.0, 2.3, 21.3]) # Incorrect name type | ||
response2 = service("tester", ["hi", "wrong", "bad"]) # Incorrect position type | ||
|
||
# Check if the additions were unsuccessful | ||
self.assertFalse(response1.success, "Added POI with wrong argument types") | ||
self.assertFalse(response2.success, "Added POI with wrong argument types") | ||
|
||
def test_delete_twice(self): | ||
# Call the add_poi function to add a POI | ||
self.add_poi() | ||
def test_move(self): | ||
# Call the add_poi function to add a POI | ||
self.add_poi() | ||
|
||
# Wait for the move_poi service to become available | ||
rospy.wait_for_service("/poi_server/move") | ||
move_service = rospy.ServiceProxy("/poi_server/move", MovePOI) | ||
|
||
# Move the POI to a new position | ||
new_position = [1.0, 2.0, 3.0] # New position coordinates | ||
move_response = move_service( | ||
MovePOIRequest( | ||
name=self.poi_name, | ||
position=PointStamped(point=Point(*new_position)), | ||
), | ||
) | ||
# Check if the additions were unsuccessful | ||
self.assertTrue(move_response.success, f"Failed to move POI '{self.poi_name}'") | ||
|
||
def test_delete(self): | ||
# Call the add_poi function to add a POI | ||
self.add_poi() | ||
|
||
# Wait for the remove_poi service to become available | ||
rospy.wait_for_service("/poi_server/delete") | ||
remove_service = rospy.ServiceProxy("/poi_server/delete", DeletePOI) | ||
|
||
# Remove the added POI | ||
remove_response = remove_service(self.poi_name) | ||
|
||
# Check if the removal was successful | ||
self.assertTrue( | ||
remove_response.success, | ||
f"Failed to delete POI '{self.poi_name}'", | ||
) | ||
|
||
def test_long_string(self): | ||
rospy.wait_for_service("/poi_server/add") | ||
service = rospy.ServiceProxy("/poi_server/add", AddPOI) | ||
|
||
# Create a long string for the POI name | ||
long_string = "".join(random.choices(string.ascii_letters, k=20000)) | ||
|
||
# Call the service to add a new POI with the long string name | ||
response = service( | ||
long_string, | ||
PointStamped(header=Header(), point=Point(0.0, 1.0, 2.0)), | ||
) | ||
self.assertTrue(response.success, response.message) | ||
|
||
def test_wrong_types(self): | ||
# Wait for the add_poi service to become available | ||
rospy.wait_for_service("/poi_server/add") | ||
service = rospy.ServiceProxy("/poi_server/add", AddPOI) | ||
|
||
# Try adding a POI with wrong types of arguments | ||
with self.assertRaises(genpy.message.SerializationError): | ||
service(12321, [0.0, 2.3, 21.3]) # Incorrect name type | ||
service("tester", ["hi", "wrong", "bad"]) # Incorrect position type | ||
|
||
def test_delete_twice(self): | ||
# Call the add_poi function to add a POI | ||
self.add_poi() | ||
|
||
# Wait for the remove_poi service to become available | ||
rospy.wait_for_service('/poi_server/remove') | ||
remove_service = rospy.ServiceProxy("/poi_server/remove", RemovePOI) | ||
# Wait for the remove_poi service to become available | ||
rospy.wait_for_service("/poi_server/delete") | ||
remove_service = rospy.ServiceProxy("/poi_server/delete", DeletePOI) | ||
|
||
# Remove the added POI | ||
remove_response1 = remove_service(self.poi_name) | ||
# Remove the added POI | ||
remove_response1 = remove_service(self.poi_name) | ||
|
||
# Try removing the same POI again | ||
remove_response2 = remove_service(self.poi_name) | ||
# Try removing the same POI again | ||
remove_response2 = remove_service(self.poi_name) | ||
|
||
# Check if the first removal was successful and the second removal was unsuccessful | ||
self.assertTrue(remove_response1.success, f"Failed to remove POI '{self.poi_name}'") | ||
self.assertFalse(remove_response2.success, f"Removed POI '{self.poi_name}' twice") | ||
# Check if the first removal was successful and the second removal was unsuccessful | ||
self.assertTrue( | ||
remove_response1.success, | ||
f"Failed to remove POI '{self.poi_name}'", | ||
) | ||
self.assertFalse( | ||
remove_response2.success, | ||
f"Removed POI '{self.poi_name}' twice", | ||
) | ||
|
||
def tearDown(self): | ||
# Clean up any resources or state modified during testing | ||
if hasattr(self, 'p') and callable(getattr(self.p, 'cleanup', None)): | ||
self.p.cleanup() | ||
|
||
if __name__ == "__main__": | ||
if __name__ == "__main__": | ||
rostest.rosrun("mil_poi", "test_poi", POITest) | ||
unittest.main() | ||
|
||
|
||
|
||
|
||
|