diff --git a/pyproject.toml b/pyproject.toml index d77a08862..9e085190c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "vmngclient" -version = "0.5.0" +version = "0.5.1" description = "Universal vManage API" authors = ["kagorski "] readme = "README.md" diff --git a/vmngclient/api/task_status_api.py b/vmngclient/api/task_status_api.py index 49299c17f..7b5daec57 100644 --- a/vmngclient/api/task_status_api.py +++ b/vmngclient/api/task_status_api.py @@ -19,6 +19,21 @@ class TaskStatus: activity: List[str] +def get_all_tasks(session: vManageSession) -> List[str]: + """ + Get list of active tasks id's in vmanage + + Args: + session (vManageSession): session + + Returns: + List[str]: active tasks id's + """ + url = "dataservice/device/action/status/tasks" + tasks = session.get_json(url) + return [process["processId"] for process in tasks["runningTasks"]] + + def wait_for_completed( session: vManageSession, action_id: str, @@ -91,11 +106,6 @@ def check_status(task: TaskStatus) -> bool: return False return True - def get_all_tasks(): - url = "dataservice/device/action/status/tasks" - tasks = session.get_json(url) - return [process["processId"] for process in tasks["runningTasks"]] - def log_exception(self) -> None: logger.error("Operation status not achieved in given time") @@ -117,7 +127,7 @@ def wait_for_action_finish() -> TaskStatus: try: action_data = session.get_data(url)[0] except IndexError: - tasks_ids = get_all_tasks() + tasks_ids = get_all_tasks(session) if action_id in tasks_ids: sleep(delay_seconds) try: @@ -127,7 +137,8 @@ def wait_for_action_finish() -> TaskStatus: f"Task id {action_id} registered by vManage in all tasks list, " f"but response about it's status didn't contain any information." ) - raise ValueError(f"Task id {action_id} is not registered by vManage.") + else: + raise ValueError(f"Task id {action_id} is not registered by vManage.") task = create_dataclass(TaskStatus, action_data) logger.debug( diff --git a/vmngclient/tests/test_task_status_api.py b/vmngclient/tests/test_task_status_api.py index bb9e26bcf..6b938f469 100644 --- a/vmngclient/tests/test_task_status_api.py +++ b/vmngclient/tests/test_task_status_api.py @@ -1,18 +1,18 @@ import unittest from unittest.mock import patch -from vmngclient.api.task_status_api import TaskStatus, wait_for_completed +from vmngclient.api.task_status_api import TaskStatus, get_all_tasks, wait_for_completed class TestTaskStatusApi(unittest.TestCase): def setUp(self): self.task = TaskStatus("Success", "success", []) + self.action_data = [{"status": "Success", "statusId": "success", "activity": []}] @patch("vmngclient.session.vManageSession") def test_wait_for_completed_success(self, mock_session): - # Prepare mock data - mock_session.get_data.return_value = [{"status": "Success", "statusId": "success", "activity": []}] + mock_session.get_data.return_value = self.action_data # Assert answer = wait_for_completed(mock_session, "mock_action_id", 3000, 5) @@ -20,10 +20,52 @@ def test_wait_for_completed_success(self, mock_session): @patch("vmngclient.session.vManageSession") def test_wait_for_completed_status_out_of_range(self, mock_session): - # Prepare mock data mock_session.get_data.return_value = [{"status": "Other_status", "statusId": "other_status", "activity": []}] # Assert answer = wait_for_completed(mock_session, "mock_action_id", 1, 1) self.assertEqual(answer, None, "job status incorrect") + + @patch("vmngclient.api.task_status_api.sleep") + @patch("vmngclient.api.task_status_api.get_all_tasks") + @patch("vmngclient.session.vManageSession") + def test_raise_index_error_actionid_in_tasks_ids_data_exists(self, mock_session, mock_get_tasks, mock_sleep): + # Arrange + mock_session.get_data.side_effect = [IndexError(), self.action_data] + mock_get_tasks.return_value = ["action_id"] + # Act + answer = wait_for_completed(mock_session, "action_id", 1, 1) + # Assert + self.assertEqual(answer, self.task) + + @patch("vmngclient.api.task_status_api.sleep") + @patch("vmngclient.api.task_status_api.get_all_tasks") + @patch("vmngclient.session.vManageSession") + def test_raise_index_error_actionid_in_tasks_ids_data_dosnt_exists(self, mock_session, mock_get_tasks, mock_sleep): + # Arrange + mock_session.get_data.side_effect = [IndexError(), []] + mock_get_tasks.return_value = ["action_id"] + # Act&Assert + self.assertRaises(IndexError, wait_for_completed, mock_session, "action_id", 1, 1) + + @patch("vmngclient.api.task_status_api.sleep") + @patch("vmngclient.api.task_status_api.get_all_tasks") + @patch("vmngclient.session.vManageSession") + def test_raise_index_error_actionid_not_in_tasks(self, mock_session, mock_get_tasks, mock_sleep): + # Arrange + mock_session.get_data.side_effect = IndexError() + mock_get_tasks.return_value = ["no_id"] + # Act&Assert + self.assertRaises(ValueError, wait_for_completed, mock_session, "action_id", 1, 1) + + @patch("vmngclient.session.vManageSession") + def test_get_all_tasks(self, mock_session): + # Arrange + mock_session.get_json.return_value = { + "runningTasks": [{"processId": "processId_1"}, {"processId": "processId_2"}] + } + # Act + answer = get_all_tasks(mock_session) + # Assert + self.assertEqual(answer, ["processId_1", "processId_2"])