Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
[Fix] Get data of Task (#243)
Browse files Browse the repository at this point in the history
* rm task validation

* rm useless test

* set subtasks data attrs as optional again

---------

Co-authored-by: Tomasz Warda <[email protected]>
  • Loading branch information
danielui and tomaszwarda authored May 10, 2023
1 parent d94562e commit e960e20
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 44 deletions.
23 changes: 2 additions & 21 deletions vmngclient/api/task_status_api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import logging
from time import sleep
from typing import TYPE_CHECKING, List, Optional, cast

from tenacity import retry, retry_if_result, stop_after_attempt, wait_fixed # type: ignore
Expand All @@ -11,7 +10,6 @@

from pydantic import BaseModel, Field

from vmngclient.exceptions import EmptyTaskResponseError, TaskNotRegisteredError
from vmngclient.utils.operation_status import OperationStatus, OperationStatusId

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -78,7 +76,7 @@ def get_all_tasks(self) -> TasksData:
json = self.session.get_json(url)
return TasksData.parse_obj(json)

def get_task_data(self, delay_seconds: int = 5) -> List[SubTaskData]:
def get_task_data(self) -> List[SubTaskData]:
"""
Get data about all sub-tasks in task
Expand All @@ -89,25 +87,9 @@ def get_task_data(self, delay_seconds: int = 5) -> List[SubTaskData]:
Returns:
List[SubTaskData]: List of all sub-tusks
"""
self.__check_if_data_is_available(delay_seconds)
task_data = self.session.get_data(self.url)
return [SubTaskData.parse_obj(subtask_data) for subtask_data in task_data]

def __check_if_data_is_available(self, delay_seconds):
task_data = self.session.get_data(self.url)
if not task_data:
all_tasks_ids = [task.process_id for task in self.get_all_tasks().running_tasks]
if self.task_id in all_tasks_ids:
sleep(delay_seconds)
task_data = self.session.get_data(self.url)
if not task_data:
raise EmptyTaskResponseError(
f"Task id {self.task_id} registered by vManage in all tasks list, "
f"but response about it's status didn't contain any information."
)
else:
raise TaskNotRegisteredError(f"Task id {self.task_id} is not registered by vManage.")


class Task:
"""
Expand All @@ -124,7 +106,6 @@ def wait_for_completed(
self,
timeout_seconds: int = 300,
interval_seconds: int = 5,
delay_seconds: int = 10,
success_statuses: List[OperationStatus] = [
OperationStatus.SUCCESS,
],
Expand Down Expand Up @@ -228,7 +209,7 @@ def wait_for_action_finish() -> List[SubTaskData]:
List[SubTaskData]
"""

self.task_data = TasksAPI(self.session, self.task_id).get_task_data(delay_seconds)
self.task_data = TasksAPI(self.session, self.task_id).get_task_data()
sub_task_statuses = [task.status for task in self.task_data]
sub_task_statuses_id = [task.status_id for task in self.task_data]
sub_task_activities = [task.activity for task in self.task_data]
Expand Down
23 changes: 0 additions & 23 deletions vmngclient/tests/test_task_status_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from unittest.mock import patch

from vmngclient.api.task_status_api import RunningTaskData, SubTaskData, Task, TaskResult, TasksAPI, TasksData
from vmngclient.exceptions import EmptyTaskResponseError, TaskNotRegisteredError


class TestTaskStatusApi(unittest.TestCase):
Expand Down Expand Up @@ -103,28 +102,6 @@ def test_wait_for_completed_status_out_of_range(self, mock_session):
answer = Task(mock_session, "mock_action_id").wait_for_completed(1, 1).result
self.assertEqual(answer, False)

@patch("vmngclient.api.task_status_api.sleep")
@patch.object(TasksAPI, "get_all_tasks")
@patch("vmngclient.session.vManageSession")
def test_raise_error_actionid_in_tasks_ids_data_dosnt_exists(self, mock_session, mock_get_tasks, mock_sleep):
# Arrange
mock_session.get_data.return_value = []
mock_get_tasks.return_value = TasksData.parse_obj(self.running_task_data_json)
# Act&Assert
self.assertRaises(EmptyTaskResponseError, Task(mock_session, "processId_1").wait_for_completed, 1, 1)

@patch("vmngclient.api.task_status_api.sleep")
@patch.object(TasksAPI, "get_all_tasks")
@patch("vmngclient.session.vManageSession")
def test_raise_TaskNotRegistered_error(self, mock_session, mock_get_tasks, mock_sleep):
# Arrange
mock_session.get_data.return_value = []
mock_get_tasks.return_value = TasksData.parse_obj(self.running_task_data_json)
# Act&Assert
self.assertRaises(
TaskNotRegisteredError, Task(mock_session, "missing_id").wait_for_completed, mock_session, 1, 1
)

@patch("vmngclient.session.vManageSession")
def test_get_all_tasks(self, mock_session):
# Arrange
Expand Down

0 comments on commit e960e20

Please sign in to comment.