Skip to content

Commit

Permalink
35: Had to show an example of using mocks in a unittest. Thought to m…
Browse files Browse the repository at this point in the history
…ake a relevant example and committing it here. Adds unit tests for the new_job_submitted_handler in main.py:Orchestrator
  • Loading branch information
lfse-slafleur committed May 6, 2024
1 parent d25348b commit c5fb765
Showing 1 changed file with 229 additions and 3 deletions.
232 changes: 229 additions & 3 deletions unit_test/test_main.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
import threading
import time
import unittest
import uuid
from datetime import timedelta
from multiprocessing.pool import ThreadPool, MapResult
from unittest.mock import patch
from unittest.mock import patch, Mock
from uuid import UUID

from omotes_sdk.job import Job
from omotes_sdk.workflow_type import WorkflowType
from omotes_sdk_protocol.job_pb2 import JobSubmission
from google.protobuf import json_format

from omotes_orchestrator.config import OrchestratorConfig
from omotes_orchestrator.main import LifeCycleBarrierManager, BarrierTimeoutException, \
MissingBarrierException
from omotes_orchestrator.db_models.job import JobStatus
from omotes_orchestrator.main import (
LifeCycleBarrierManager,
BarrierTimeoutException,
MissingBarrierException,
Orchestrator,
)


class LifeCycleBarrierManagerTest(unittest.TestCase):
Expand Down Expand Up @@ -148,6 +160,220 @@ def test__cleanup_barrier__cleaning_barrier(self) -> None:
self.assertNotIn(job_id, barrier_manager._barriers)


class OrchestratorTest(unittest.TestCase):
def test__new_job_submitted_handler__fully_new_job(self) -> None:
# Arrange
omotes_orchestrator_if = Mock()
jobs_broker_if = Mock()
celery_if = Mock()
celery_if.start_workflow.return_value = "celery_id"
postgresql_if = Mock()
postgresql_if.job_exists.return_value = False
workflow_manager = Mock()

with patch(
"omotes_orchestrator.main.LifeCycleBarrierManager"
) as life_cycle_barrier_manager_class_mock:
orchestrator = Orchestrator(
omotes_orchestrator_if=omotes_orchestrator_if,
jobs_broker_if=jobs_broker_if,
celery_if=celery_if,
postgresql_if=postgresql_if,
workflow_manager=workflow_manager,
)

life_cycle_barrier_manager_obj_mock = life_cycle_barrier_manager_class_mock.return_value

job_id = uuid.uuid4()
timeout = 3000
workflow_type = "some-workflow"
esdl = "Some-esdl"
params_dict = {}

Check failure on line 191 in unit_test/test_main.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: Need type annotation for "params_dict" (hint: "params_dict: Dict[<type>, <type>] = ...") [var-annotated]
job_submission = JobSubmission(
uuid=str(job_id),
timeout_ms=timeout,
workflow_type=workflow_type,
esdl=esdl,
params_dict=params_dict,

Check failure on line 197 in unit_test/test_main.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: Argument "params_dict" to "JobSubmission" has incompatible type "dict[Any, Any]"; expected "Struct | None" [arg-type]
)
job = Job(id=job_id, workflow_type=WorkflowType(workflow_type, "some-descr"))

# Act
orchestrator.new_job_submitted_handler(job_submission, job)

# Assert
expected_params_dict = json_format.MessageToDict(job_submission.params_dict)
expected_celery_id = "celery_id"
expected_timeout = timedelta(milliseconds=timeout)

life_cycle_barrier_manager_obj_mock.ensure_barrier.assert_called_once_with(job_id)
life_cycle_barrier_manager_obj_mock.set_barrier.assert_called_once_with(job_id)
celery_if.start_workflow.assert_called_once_with(
job.workflow_type, job.id, job_submission.esdl, expected_params_dict
)
postgresql_if.job_exists.assert_called_once_with(job_id)
postgresql_if.get_job_status.assert_not_called()
postgresql_if.set_job_submitted.called_called_once_with(job_id, expected_celery_id)
postgresql_if.put_new_job.assert_called_once_with(
job_id=job.id,
workflow_type=job_submission.workflow_type,
timeout_after=expected_timeout,
)

def test__new_job_submitted_handler__already_registered_but_not_submitted_new_job(self) -> None:
# Arrange
omotes_orchestrator_if = Mock()
jobs_broker_if = Mock()
celery_if = Mock()
celery_if.start_workflow.return_value = "celery_id"
postgresql_if = Mock()
postgresql_if.get_job_status.return_value = JobStatus.REGISTERED
postgresql_if.job_exists.return_value = True
workflow_manager = Mock()

with patch(
"omotes_orchestrator.main.LifeCycleBarrierManager"
) as life_cycle_barrier_manager_class_mock:
orchestrator = Orchestrator(
omotes_orchestrator_if=omotes_orchestrator_if,
jobs_broker_if=jobs_broker_if,
celery_if=celery_if,
postgresql_if=postgresql_if,
workflow_manager=workflow_manager,
)

life_cycle_barrier_manager_obj_mock = life_cycle_barrier_manager_class_mock.return_value

job_id = uuid.uuid4()
timeout = 3000
workflow_type = "some-workflow"
esdl = "Some-esdl"
params_dict = {}

Check failure on line 251 in unit_test/test_main.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: Need type annotation for "params_dict" (hint: "params_dict: Dict[<type>, <type>] = ...") [var-annotated]
job_submission = JobSubmission(
uuid=str(job_id),
timeout_ms=timeout,
workflow_type=workflow_type,
esdl=esdl,
params_dict=params_dict,

Check failure on line 257 in unit_test/test_main.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: Argument "params_dict" to "JobSubmission" has incompatible type "dict[Any, Any]"; expected "Struct | None" [arg-type]
)
job = Job(id=job_id, workflow_type=WorkflowType(workflow_type, "some-descr"))

# Act
orchestrator.new_job_submitted_handler(job_submission, job)

# Assert
expected_params_dict = json_format.MessageToDict(job_submission.params_dict)
expected_celery_id = "celery_id"

life_cycle_barrier_manager_obj_mock.ensure_barrier.assert_called_once_with(job_id)
life_cycle_barrier_manager_obj_mock.set_barrier.assert_called_once_with(job_id)
celery_if.start_workflow.assert_called_once_with(
job.workflow_type, job.id, job_submission.esdl, expected_params_dict
)
postgresql_if.job_exists.assert_called_once_with(job_id)
postgresql_if.get_job_status.assert_called_once_with(job_id)
postgresql_if.set_job_submitted.called_called_once_with(job_id, expected_celery_id)
postgresql_if.put_new_job.assert_not_called()

def test__new_job_submitted_handler__already_registered_and_submitted_new_job(self) -> None:
# Arrange
omotes_orchestrator_if = Mock()
jobs_broker_if = Mock()
celery_if = Mock()
postgresql_if = Mock()
postgresql_if.get_job_status.return_value = JobStatus.SUBMITTED
postgresql_if.job_exists.return_value = True
workflow_manager = Mock()

with patch(
"omotes_orchestrator.main.LifeCycleBarrierManager"
) as life_cycle_barrier_manager_class_mock:
orchestrator = Orchestrator(
omotes_orchestrator_if=omotes_orchestrator_if,
jobs_broker_if=jobs_broker_if,
celery_if=celery_if,
postgresql_if=postgresql_if,
workflow_manager=workflow_manager,
)

life_cycle_barrier_manager_obj_mock = life_cycle_barrier_manager_class_mock.return_value

job_id = uuid.uuid4()
timeout = 3000
workflow_type = "some-workflow"
esdl = "Some-esdl"
params_dict = {}

Check failure on line 305 in unit_test/test_main.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: Need type annotation for "params_dict" (hint: "params_dict: Dict[<type>, <type>] = ...") [var-annotated]
job_submission = JobSubmission(
uuid=str(job_id),
timeout_ms=timeout,
workflow_type=workflow_type,
esdl=esdl,
params_dict=params_dict,

Check failure on line 311 in unit_test/test_main.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: Argument "params_dict" to "JobSubmission" has incompatible type "dict[Any, Any]"; expected "Struct | None" [arg-type]
)
job = Job(id=job_id, workflow_type=WorkflowType(workflow_type, "some-descr"))

# Act
orchestrator.new_job_submitted_handler(job_submission, job)

# Assert
life_cycle_barrier_manager_obj_mock.ensure_barrier.assert_not_called()
life_cycle_barrier_manager_obj_mock.set_barrier.assert_not_called()
celery_if.start_workflow.assert_not_called()
postgresql_if.job_exists.assert_called_once_with(job_id)
postgresql_if.get_job_status.assert_called_once_with(job_id)
postgresql_if.set_job_submitted.assert_not_called()
postgresql_if.put_new_job.assert_not_called()

def test__new_job_submitted_handler__already_running_new_job(self) -> None:
# Arrange
omotes_orchestrator_if = Mock()
jobs_broker_if = Mock()
celery_if = Mock()
postgresql_if = Mock()
postgresql_if.get_job_status.return_value = JobStatus.RUNNING
postgresql_if.job_exists.return_value = True
workflow_manager = Mock()

with patch(
"omotes_orchestrator.main.LifeCycleBarrierManager"
) as life_cycle_barrier_manager_class_mock:
orchestrator = Orchestrator(
omotes_orchestrator_if=omotes_orchestrator_if,
jobs_broker_if=jobs_broker_if,
celery_if=celery_if,
postgresql_if=postgresql_if,
workflow_manager=workflow_manager,
)

life_cycle_barrier_manager_obj_mock = life_cycle_barrier_manager_class_mock.return_value

job_id = uuid.uuid4()
timeout = 3000
workflow_type = "some-workflow"
esdl = "Some-esdl"
params_dict = {}

Check failure on line 354 in unit_test/test_main.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: Need type annotation for "params_dict" (hint: "params_dict: Dict[<type>, <type>] = ...") [var-annotated]
job_submission = JobSubmission(
uuid=str(job_id),
timeout_ms=timeout,
workflow_type=workflow_type,
esdl=esdl,
params_dict=params_dict,

Check failure on line 360 in unit_test/test_main.py

View workflow job for this annotation

GitHub Actions / Typecheck (3.11)

error: Argument "params_dict" to "JobSubmission" has incompatible type "dict[Any, Any]"; expected "Struct | None" [arg-type]
)
job = Job(id=job_id, workflow_type=WorkflowType(workflow_type, "some-descr"))

# Act
orchestrator.new_job_submitted_handler(job_submission, job)

# Assert
life_cycle_barrier_manager_obj_mock.ensure_barrier.assert_not_called()
life_cycle_barrier_manager_obj_mock.set_barrier.assert_not_called()
celery_if.start_workflow.assert_not_called()
postgresql_if.job_exists.assert_called_once_with(job_id)
postgresql_if.get_job_status.assert_called_once_with(job_id)
postgresql_if.set_job_submitted.assert_not_called()
postgresql_if.put_new_job.assert_not_called()


class MyTest(unittest.TestCase):
def test__construct_orchestrator_config__no_exception(self) -> None:
# Arrange
Expand Down

0 comments on commit c5fb765

Please sign in to comment.