Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
fix condition for raising ValueError (#89)
Browse files Browse the repository at this point in the history
* fix condition for raising ValueError

* add ut for task_status

* put check status outside wait method

* add docstring to get_all_tasks

* bump patch version

Co-authored-by: danielui <[email protected]>
  • Loading branch information
cicharka and danielui authored Jan 17, 2023
1 parent 18f5b78 commit 1886999
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "vmngclient"
version = "0.5.0"
version = "0.5.1"
description = "Universal vManage API"
authors = ["kagorski <[email protected]>"]
readme = "README.md"
Expand Down
25 changes: 18 additions & 7 deletions vmngclient/api/task_status_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@ class TaskStatus:
activity: List[str]


def get_all_tasks(session: vManageSession) -> List[str]:
"""
Get list of active tasks id's in vmanage
Args:
session (vManageSession): session
Returns:
List[str]: active tasks id's
"""
url = "dataservice/device/action/status/tasks"
tasks = session.get_json(url)
return [process["processId"] for process in tasks["runningTasks"]]


def wait_for_completed(
session: vManageSession,
action_id: str,
Expand Down Expand Up @@ -91,11 +106,6 @@ def check_status(task: TaskStatus) -> bool:
return False
return True

def get_all_tasks():
url = "dataservice/device/action/status/tasks"
tasks = session.get_json(url)
return [process["processId"] for process in tasks["runningTasks"]]

def log_exception(self) -> None:
logger.error("Operation status not achieved in given time")

Expand All @@ -117,7 +127,7 @@ def wait_for_action_finish() -> TaskStatus:
try:
action_data = session.get_data(url)[0]
except IndexError:
tasks_ids = get_all_tasks()
tasks_ids = get_all_tasks(session)
if action_id in tasks_ids:
sleep(delay_seconds)
try:
Expand All @@ -127,7 +137,8 @@ def wait_for_action_finish() -> TaskStatus:
f"Task id {action_id} registered by vManage in all tasks list, "
f"but response about it's status didn't contain any information."
)
raise ValueError(f"Task id {action_id} is not registered by vManage.")
else:
raise ValueError(f"Task id {action_id} is not registered by vManage.")

task = create_dataclass(TaskStatus, action_data)
logger.debug(
Expand Down
50 changes: 46 additions & 4 deletions vmngclient/tests/test_task_status_api.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,71 @@
import unittest
from unittest.mock import patch

from vmngclient.api.task_status_api import TaskStatus, wait_for_completed
from vmngclient.api.task_status_api import TaskStatus, get_all_tasks, wait_for_completed


class TestTaskStatusApi(unittest.TestCase):
def setUp(self):
self.task = TaskStatus("Success", "success", [])
self.action_data = [{"status": "Success", "statusId": "success", "activity": []}]

@patch("vmngclient.session.vManageSession")
def test_wait_for_completed_success(self, mock_session):

# Prepare mock data
mock_session.get_data.return_value = [{"status": "Success", "statusId": "success", "activity": []}]
mock_session.get_data.return_value = self.action_data

# Assert
answer = wait_for_completed(mock_session, "mock_action_id", 3000, 5)
self.assertEqual(answer, self.task, "job status incorrect")

@patch("vmngclient.session.vManageSession")
def test_wait_for_completed_status_out_of_range(self, mock_session):

# Prepare mock data
mock_session.get_data.return_value = [{"status": "Other_status", "statusId": "other_status", "activity": []}]

# Assert
answer = wait_for_completed(mock_session, "mock_action_id", 1, 1)
self.assertEqual(answer, None, "job status incorrect")

@patch("vmngclient.api.task_status_api.sleep")
@patch("vmngclient.api.task_status_api.get_all_tasks")
@patch("vmngclient.session.vManageSession")
def test_raise_index_error_actionid_in_tasks_ids_data_exists(self, mock_session, mock_get_tasks, mock_sleep):
# Arrange
mock_session.get_data.side_effect = [IndexError(), self.action_data]
mock_get_tasks.return_value = ["action_id"]
# Act
answer = wait_for_completed(mock_session, "action_id", 1, 1)
# Assert
self.assertEqual(answer, self.task)

@patch("vmngclient.api.task_status_api.sleep")
@patch("vmngclient.api.task_status_api.get_all_tasks")
@patch("vmngclient.session.vManageSession")
def test_raise_index_error_actionid_in_tasks_ids_data_dosnt_exists(self, mock_session, mock_get_tasks, mock_sleep):
# Arrange
mock_session.get_data.side_effect = [IndexError(), []]
mock_get_tasks.return_value = ["action_id"]
# Act&Assert
self.assertRaises(IndexError, wait_for_completed, mock_session, "action_id", 1, 1)

@patch("vmngclient.api.task_status_api.sleep")
@patch("vmngclient.api.task_status_api.get_all_tasks")
@patch("vmngclient.session.vManageSession")
def test_raise_index_error_actionid_not_in_tasks(self, mock_session, mock_get_tasks, mock_sleep):
# Arrange
mock_session.get_data.side_effect = IndexError()
mock_get_tasks.return_value = ["no_id"]
# Act&Assert
self.assertRaises(ValueError, wait_for_completed, mock_session, "action_id", 1, 1)

@patch("vmngclient.session.vManageSession")
def test_get_all_tasks(self, mock_session):
# Arrange
mock_session.get_json.return_value = {
"runningTasks": [{"processId": "processId_1"}, {"processId": "processId_2"}]
}
# Act
answer = get_all_tasks(mock_session)
# Assert
self.assertEqual(answer, ["processId_1", "processId_2"])

0 comments on commit 1886999

Please sign in to comment.