Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor/task builder #131

Merged
merged 48 commits into from
Mar 27, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
ff459bd
Add task builder
JonatanMartens Feb 20, 2021
bd11a09
Add single_value function to dict conversion to task_builder
JonatanMartens Feb 20, 2021
7095d9e
Move task_builder from pyzeebe/worker to pyzeebe/task
JonatanMartens Feb 20, 2021
0f659fd
Add type annotations to build_task
JonatanMartens Feb 20, 2021
dc57eff
Add type annotations to build_task
JonatanMartens Feb 20, 2021
0ab0445
Add log when failing to execute job
JonatanMartens Feb 20, 2021
89b1667
Refactor Task to be a dataclass containing the job_handler and the co…
JonatanMartens Feb 20, 2021
67b0a0a
Add pyzeebe/task/types
JonatanMartens Feb 20, 2021
2c3f931
Move TaskDecorator from pyzeebe/task/task_decorator to pyzeebe/task/t…
JonatanMartens Feb 20, 2021
d3f2aba
Refactor task_handler to accept TaskConfig
JonatanMartens Feb 21, 2021
b8c3a94
Rename TaskHandler to ZeebeTaskRouter
JonatanMartens Feb 21, 2021
59131d0
Refactor ZeebeTaskRouter tests
JonatanMartens Feb 21, 2021
3bb171d
Refactor ZeebeWorker to use task_builder
JonatanMartens Feb 21, 2021
f52ac70
Fix worker tests
JonatanMartens Feb 21, 2021
cb03ed4
Rename functions named _ to dummy_function
JonatanMartens Feb 21, 2021
1057465
Add type annotations to ZeebeWorker tests
JonatanMartens Feb 21, 2021
d920ed9
Export TaskConfig, default_exception_handler
JonatanMartens Feb 21, 2021
d2c4d04
Remove redundant import from zeebe_worker.py
JonatanMartens Feb 21, 2021
48cb320
Remove ZeebeDecoratorBase
JonatanMartens Feb 21, 2021
d02ae2b
Refactor integration tests to use pytest fixtures and use new ZeebeWo…
JonatanMartens Feb 21, 2021
579055d
Use inspect to get function parameters
JonatanMartens Feb 21, 2021
67710f1
Import Callable from typing instead of collections
JonatanMartens Feb 23, 2021
409d114
Remove usage of dataclasses
JonatanMartens Mar 6, 2021
37f1431
Rename NoVariableNameGiven to NoVariableNameGivenError
JonatanMartens Mar 6, 2021
6a35d71
Get index with 1 instead of -1
JonatanMartens Mar 6, 2021
1797841
Change test_remove_task_from_many to have at least one task
JonatanMartens Mar 6, 2021
f7e7987
Rename TaskNotFound to TaskNotFoundError
JonatanMartens Mar 6, 2021
a154a85
Rename all pyzeebe exceptions to end with Error
JonatanMartens Mar 6, 2021
5010b12
Rename all exceptions files to errors
JonatanMartens Mar 6, 2021
d861280
Refactor build_job_handler tests to fit arrange, act, assert structure
JonatanMartens Mar 6, 2021
1da1d0e
Use pytest parameterize to simplify TestGetFunctionParameters
JonatanMartens Mar 6, 2021
2a2eaf7
Improve default_exception_handler test name
JonatanMartens Mar 6, 2021
f746baa
Add happy path to task_config tests
JonatanMartens Mar 6, 2021
8d7547b
Add docstrings to before and after methods
JonatanMartens Mar 14, 2021
f63f1d0
Rename timeout field in TaskConfig to timeout_ms
JonatanMartens Mar 14, 2021
62c6002
Clean get_parameters_from_function function inner names
JonatanMartens Mar 14, 2021
2b8ddb2
Add repr to Task and TaskConfig objects
JonatanMartens Mar 14, 2021
bd568e2
Rename wrapper to task_wrapper
JonatanMartens Mar 14, 2021
3483cf1
Rename fixture from task_handler_mock to job_handler_spy
JonatanMartens Mar 14, 2021
83c4ce6
Call route._is_task_duplicate in test_no_duplicate_task_type_error_is…
JonatanMartens Mar 15, 2021
3c57237
Receive regular params instead of task config
JonatanMartens Mar 20, 2021
876d819
Add test case for no variables added to result
JonatanMartens Mar 20, 2021
6d9e85e
Run tests on every push
JonatanMartens Mar 20, 2021
c29addb
Merge branch 'pre-release/3.0.0' into refactor/task-builder
JonatanMartens Mar 20, 2021
34e7f26
Fix integration tests to work without task config
JonatanMartens Mar 20, 2021
4a8ba92
Update ZeebeWorker documentation
JonatanMartens Mar 20, 2021
22042f4
Remove docstring whitespace in ZeebeTaskRouter
JonatanMartens Mar 27, 2021
1621369
Move variable name check back to TaskConfig
JonatanMartens Mar 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pyzeebe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from pyzeebe.job.job import Job
from pyzeebe.job.job_status import JobStatus
from pyzeebe.task.exception_handler import ExceptionHandler
from pyzeebe.task.task_config import TaskConfig, default_exception_handler
from pyzeebe.task.task_config import TaskConfig
from pyzeebe.task.types import TaskDecorator
from pyzeebe.worker.task_router import ZeebeTaskRouter
from pyzeebe.worker.task_router import ZeebeTaskRouter, default_exception_handler
from pyzeebe.worker.worker import ZeebeWorker
29 changes: 8 additions & 21 deletions pyzeebe/task/task_config.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,24 @@
import logging
from typing import List, Optional
from typing import List

from pyzeebe.errors import NoVariableNameGivenError
from pyzeebe.job.job import Job
from pyzeebe.task.exception_handler import ExceptionHandler
from pyzeebe.task.types import TaskDecorator

logger = logging.getLogger(__name__)


def default_exception_handler(e: Exception, job: Job) -> None:
logger.warning(f"Task type: {job.type} - failed job {job}. Error: {e}.")
job.set_failure_status(f"Failed job. Error: {e}")


class TaskConfig:
def __init__(self, type: str, exception_handler: ExceptionHandler = default_exception_handler,
timeout_ms: int = 10000, max_jobs_to_activate: int = 32,
variables_to_fetch: Optional[List[str]] = None,
single_value: bool = False, variable_name: Optional[str] = None, before: List[TaskDecorator] = None,
after: List[TaskDecorator] = None):
if single_value and not variable_name:
raise NoVariableNameGivenError(type)

def __init__(self, type: str, exception_handler: ExceptionHandler,
timeout_ms: int, max_jobs_to_activate: int,
variables_to_fetch: List[str],
single_value: bool, variable_name: str, before: List[TaskDecorator],
after: List[TaskDecorator]):
self.type = type
self.exception_handler = exception_handler
self.timeout_ms = timeout_ms
self.max_jobs_to_activate = max_jobs_to_activate
self.variables_to_fetch = variables_to_fetch
kbakk marked this conversation as resolved.
Show resolved Hide resolved
self.single_value = single_value
self.variable_name = variable_name
self.before = before or []
self.after = after or []
self.before = before
self.after = after

def __repr__(self):
return f"TaskConfig(type={self.type}, exception_handler={self.exception_handler}, " \
Expand Down
69 changes: 56 additions & 13 deletions pyzeebe/worker/task_router.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
import logging
from typing import Tuple, List, Callable, Union
from typing import Callable, List, Tuple, Optional

from pyzeebe import TaskDecorator
from pyzeebe.errors import TaskNotFoundError, DuplicateTaskTypeError
from pyzeebe.errors import (DuplicateTaskTypeError, NoVariableNameGivenError,
TaskNotFoundError)
from pyzeebe.job.job import Job
from pyzeebe.task import task_builder
from pyzeebe.task.exception_handler import ExceptionHandler
from pyzeebe.task.task import Task
from pyzeebe.task.task_config import TaskConfig

logger = logging.getLogger(__name__)


def default_exception_handler(e: Exception, job: Job) -> None:
logger.warning(f"Task type: {job.type} - failed job {job}. Error: {e}.")
job.set_failure_status(f"Failed job. Error: {e}")


class ZeebeTaskRouter:
def __init__(self, before: List[TaskDecorator] = None, after: List[TaskDecorator] = None):
"""
Expand All @@ -21,25 +29,60 @@ def __init__(self, before: List[TaskDecorator] = None, after: List[TaskDecorator
self._after: List[TaskDecorator] = after or []
self.tasks: List[Task] = []

def task(self, task_config: Union[TaskConfig, str]):
def task(self, task_type: str, exception_handler: ExceptionHandler = default_exception_handler,
variables_to_fetch: Optional[List[str]] = None, timeout_ms: int = 10000, max_jobs_to_activate: int = 32,
before: List[TaskDecorator] = None, after: List[TaskDecorator] = None, single_value: bool = False,
variable_name: str = None):
"""
Decorator to create a task

Args:
task_config (Union[str, TaskConfig]): Either the task type or a task configuration object
task_type (str): The task type

JonatanMartens marked this conversation as resolved.
Show resolved Hide resolved
exception_handler (ExceptionHandler): Handler that will be called when a job fails.

variables_to_fetch (Optional[List[str]]): The variables to request from Zeebe when activating jobs.

timeout_ms (int): Maximum duration of the task in milliseconds. If the timeout is surpassed Zeebe will give up
on the worker and retry it. Default: 10000 (10 seconds).

max_jobs_to_activate (int): Maximum jobs the worker will execute in parallel (of this task). Default: 32

before (List[TaskDecorator]): All decorators which should be performed before the task.

after (List[TaskDecorator]): All decorators which should be performed after the task.

single_value (bool): If the function returns a single value (int, string, list) and not a dictionary set
this to True. Default: False

variable_name (str): If single_value then this will be the variable name given to zeebe:
{ <variable_name>: <function_return_value> }
Raises:
DuplicateTaskTypeError: If a task from the router already exists in the worker

NoVariableNameGivenError: When single_value is set, but no variable_name is given
"""
if isinstance(task_config, str):
task_config = TaskConfig(task_config)
config_with_decorators = self._add_decorators_to_config(task_config)

def task_wrapper(fn: Callable):
task = task_builder.build_task(fn, config_with_decorators)
if single_value and not variable_name:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move back this check to TaskConfig.__init__, and build the TaskConfig outside of the wrapper? It seems to belong there.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The task config is built here because we need the task_function to get variables_to_fetch

raise NoVariableNameGivenError(task_type)

def task_wrapper(task_function: Callable):
config = TaskConfig(
task_type,
exception_handler,
timeout_ms,
max_jobs_to_activate,
variables_to_fetch or task_builder.get_parameters_from_function(
task_function),
single_value,
variable_name or "",
before or [],
after or []
)
config_with_decorators = self._add_decorators_to_config(config)

task = task_builder.build_task(
task_function, config_with_decorators
)
self._add_task(task)
return fn
return task_function

return task_wrapper

Expand Down
12 changes: 11 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,17 @@ def first_active_job(task, job_from_task, grpc_servicer) -> str:

@pytest.fixture
def task_config(task_type):
return TaskConfig(task_type, MagicMock())
return TaskConfig(
type=task_type,
exception_handler=MagicMock(),
timeout_ms=10000,
max_jobs_to_activate=32,
variables_to_fetch=[],
single_value=False,
variable_name="",
before=[],
after=[]
)


@pytest.fixture
Expand Down
39 changes: 0 additions & 39 deletions tests/unit/task/task_config_test.py

This file was deleted.

2 changes: 1 addition & 1 deletion tests/unit/utils/random_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
RANDOM_RANGE = 1000000000


def random_job(task: Task = task_builder.build_task(lambda x: {"x": x}, TaskConfig("test")),
def random_job(task: Task = task_builder.build_task(lambda x: {"x": x}, TaskConfig("test", lambda: None, 10000, 32, [], False, "", [], [])),
zeebe_adapter: ZeebeAdapter = None) -> Job:
return Job(_type=task.type, key=randint(0, RANDOM_RANGE), worker=str(uuid4()),
retries=randint(0, 10), workflow_instance_key=randint(0, RANDOM_RANGE),
Expand Down
16 changes: 13 additions & 3 deletions tests/unit/worker/task_router_test.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from unittest.mock import patch
from uuid import uuid4

import pytest

from pyzeebe import TaskDecorator
from pyzeebe.errors import TaskNotFoundError, DuplicateTaskTypeError
from pyzeebe.errors import DuplicateTaskTypeError, TaskNotFoundError
from pyzeebe.job.job import Job
from pyzeebe.task.task import Task
from pyzeebe.worker.task_router import ZeebeTaskRouter
from pyzeebe.worker.task_router import (ZeebeTaskRouter,
default_exception_handler)
from tests.unit.utils.random_utils import randint


Expand Down Expand Up @@ -97,3 +99,11 @@ def test_add_after_decorator_through_constructor(decorator: TaskDecorator):
router = ZeebeTaskRouter(after=[decorator])

assert len(router._after) == 1


def test_default_exception_handler_logs_a_warning(mocked_job_with_adapter: Job):
with patch("pyzeebe.worker.task_router.logger.warning") as logging_mock:
default_exception_handler(Exception(), mocked_job_with_adapter)

mocked_job_with_adapter.set_failure_status.assert_called()
logging_mock.assert_called()
18 changes: 2 additions & 16 deletions tests/unit/worker/worker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ def test_variables_to_fetch_match_function_parameters(self, zeebe_worker: ZeebeW
def dummy_function(x):
pass

assert zeebe_worker.get_task(task_type).config.variables_to_fetch == expected_variables_to_fetch
assert zeebe_worker.get_task(
task_type).config.variables_to_fetch == expected_variables_to_fetch


class TestDecorator:
Expand All @@ -67,21 +68,6 @@ def test_add_constructor_after_decorator(self, decorator: TaskDecorator):
assert len(zeebe_worker._after) == 1
assert decorator in zeebe_worker._after

def test_decorator_failed(self, zeebe_worker: ZeebeWorker, task: Task, decorator: TaskDecorator,
job_from_task: Job):
decorator.side_effect = Exception()
zeebe_worker.before(decorator)
zeebe_worker.after(decorator)

@zeebe_worker.task(task.config)
def dummy_function():
pass

task = zeebe_worker.get_task(task.type)
task.job_handler(job_from_task)

assert decorator.call_count == 2


class TestHandleJobs:
@pytest.fixture(autouse=True)
Expand Down