Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor/task builder #131

Merged
merged 48 commits into from
Mar 27, 2021
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
ff459bd
Add task builder
JonatanMartens Feb 20, 2021
bd11a09
Add single_value function to dict conversion to task_builder
JonatanMartens Feb 20, 2021
7095d9e
Move task_builder from pyzeebe/worker to pyzeebe/task
JonatanMartens Feb 20, 2021
0f659fd
Add type annotations to build_task
JonatanMartens Feb 20, 2021
dc57eff
Add type annotations to build_task
JonatanMartens Feb 20, 2021
0ab0445
Add log when failing to execute job
JonatanMartens Feb 20, 2021
89b1667
Refactor Task to be a dataclass containing the job_handler and the co…
JonatanMartens Feb 20, 2021
67b0a0a
Add pyzeebe/task/types
JonatanMartens Feb 20, 2021
2c3f931
Move TaskDecorator from pyzeebe/task/task_decorator to pyzeebe/task/t…
JonatanMartens Feb 20, 2021
d3f2aba
Refactor task_handler to accept TaskConfig
JonatanMartens Feb 21, 2021
b8c3a94
Rename TaskHandler to ZeebeTaskRouter
JonatanMartens Feb 21, 2021
59131d0
Refactor ZeebeTaskRouter tests
JonatanMartens Feb 21, 2021
3bb171d
Refactor ZeebeWorker to use task_builder
JonatanMartens Feb 21, 2021
f52ac70
Fix worker tests
JonatanMartens Feb 21, 2021
cb03ed4
Rename functions named _ to dummy_function
JonatanMartens Feb 21, 2021
1057465
Add type annotations to ZeebeWorker tests
JonatanMartens Feb 21, 2021
d920ed9
Export TaskConfig, default_exception_handler
JonatanMartens Feb 21, 2021
d2c4d04
Remove redundant import from zeebe_worker.py
JonatanMartens Feb 21, 2021
48cb320
Remove ZeebeDecoratorBase
JonatanMartens Feb 21, 2021
d02ae2b
Refactor integration tests to use pytest fixtures and use new ZeebeWo…
JonatanMartens Feb 21, 2021
579055d
Use inspect to get function parameters
JonatanMartens Feb 21, 2021
67710f1
Import Callable from typing instead of collections
JonatanMartens Feb 23, 2021
409d114
Remove usage of dataclasses
JonatanMartens Mar 6, 2021
37f1431
Rename NoVariableNameGiven to NoVariableNameGivenError
JonatanMartens Mar 6, 2021
6a35d71
Get index with 1 instead of -1
JonatanMartens Mar 6, 2021
1797841
Change test_remove_task_from_many to have at least one task
JonatanMartens Mar 6, 2021
f7e7987
Rename TaskNotFound to TaskNotFoundError
JonatanMartens Mar 6, 2021
a154a85
Rename all pyzeebe exceptions to end with Error
JonatanMartens Mar 6, 2021
5010b12
Rename all exceptions files to errors
JonatanMartens Mar 6, 2021
d861280
Refactor build_job_handler tests to fit arrange, act, assert structure
JonatanMartens Mar 6, 2021
1da1d0e
Use pytest parameterize to simplify TestGetFunctionParameters
JonatanMartens Mar 6, 2021
2a2eaf7
Improve default_exception_handler test name
JonatanMartens Mar 6, 2021
f746baa
Add happy path to task_config tests
JonatanMartens Mar 6, 2021
8d7547b
Add docstrings to before and after methods
JonatanMartens Mar 14, 2021
f63f1d0
Rename timeout field in TaskConfig to timeout_ms
JonatanMartens Mar 14, 2021
62c6002
Clean get_parameters_from_function function inner names
JonatanMartens Mar 14, 2021
2b8ddb2
Add repr to Task and TaskConfig objects
JonatanMartens Mar 14, 2021
bd568e2
Rename wrapper to task_wrapper
JonatanMartens Mar 14, 2021
3483cf1
Rename fixture from task_handler_mock to job_handler_spy
JonatanMartens Mar 14, 2021
83c4ce6
Call route._is_task_duplicate in test_no_duplicate_task_type_error_is…
JonatanMartens Mar 15, 2021
3c57237
Receive regular params instead of task config
JonatanMartens Mar 20, 2021
876d819
Add test case for no variables added to result
JonatanMartens Mar 20, 2021
6d9e85e
Run tests on every push
JonatanMartens Mar 20, 2021
c29addb
Merge branch 'pre-release/3.0.0' into refactor/task-builder
JonatanMartens Mar 20, 2021
34e7f26
Fix integration tests to work without task config
JonatanMartens Mar 20, 2021
4a8ba92
Update ZeebeWorker documentation
JonatanMartens Mar 20, 2021
22042f4
Remove docstring whitespace in ZeebeTaskRouter
JonatanMartens Mar 27, 2021
1621369
Move variable name check back to TaskConfig
JonatanMartens Mar 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyzeebe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pyzeebe.job.job import Job
from pyzeebe.job.job_status import JobStatus
from pyzeebe.task.exception_handler import ExceptionHandler
from pyzeebe.task.task_decorator import TaskDecorator
from pyzeebe.task.task_config import TaskConfig, default_exception_handler
from pyzeebe.task.types import TaskDecorator
from pyzeebe.worker.task_router import ZeebeTaskRouter
from pyzeebe.worker.worker import ZeebeWorker
Empty file removed pyzeebe/decorators/__init__.py
Empty file.
15 changes: 0 additions & 15 deletions pyzeebe/decorators/zeebe_decorator_base.py

This file was deleted.

32 changes: 12 additions & 20 deletions pyzeebe/task/task.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
from typing import Callable, List, Dict
from dataclasses import dataclass
from typing import Callable

from pyzeebe.decorators.zeebe_decorator_base import ZeebeDecoratorBase
from pyzeebe.job.job import Job
from pyzeebe.task.exception_handler import ExceptionHandler
from pyzeebe.task.task_config import TaskConfig
from pyzeebe.task.types import JobHandler


class Task(ZeebeDecoratorBase):
def __init__(self, task_type: str, task_handler: Callable[..., Dict], exception_handler: ExceptionHandler,
timeout: int = 10000, max_jobs_to_activate: int = 32, variables_to_fetch: List[str] = None,
before: List = None, after: List = None):
super().__init__(before=before, after=after)
@dataclass
JonatanMartens marked this conversation as resolved.
Show resolved Hide resolved
class Task:
JonatanMartens marked this conversation as resolved.
Show resolved Hide resolved
original_function: Callable
job_handler: JobHandler
config: TaskConfig

self.type = task_type
self.inner_function = task_handler
self.exception_handler = exception_handler
self.timeout = timeout
self.max_jobs_to_activate = max_jobs_to_activate
self.variables_to_fetch = variables_to_fetch or []
self.handler: Callable[[Job], Job] = None

def __repr__(self) -> str:
return str({"type": self.type, "timeout": self.timeout, "max_jobs_to_activate": self.max_jobs_to_activate,
"variables_to_fetch": self.variables_to_fetch})
@property
def type(self):
return self.config.type
76 changes: 76 additions & 0 deletions pyzeebe/task/task_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import inspect
import logging
from typing import List, Callable, Dict, Tuple

from pyzeebe import Job, TaskDecorator
from pyzeebe.task.task import Task
from pyzeebe.task.task_config import TaskConfig
from pyzeebe.task.types import DecoratorRunner, JobHandler

logger = logging.getLogger(__name__)


def build_task(task_function: Callable, task_config: TaskConfig) -> Task:
JonatanMartens marked this conversation as resolved.
Show resolved Hide resolved
if not task_config.variables_to_fetch:
task_config.variables_to_fetch = get_parameters_from_function(task_function)

if task_config.single_value:
task_function = convert_to_dict_function(task_function, task_config.variable_name)
kbakk marked this conversation as resolved.
Show resolved Hide resolved

return Task(task_function, build_job_handler(task_function, task_config), task_config)


def build_job_handler(task_function: Callable, task_config: TaskConfig) -> JobHandler:
before_decorator_runner = create_decorator_runner(task_config.before)
after_decorator_runner = create_decorator_runner(task_config.after)

def job_handler(job: Job) -> Job:
job = before_decorator_runner(job)
job.variables, succeeded = run_original_task_function(task_function, task_config, job)
job = after_decorator_runner(job)
if succeeded:
job.set_success_status()
return job

return job_handler


def run_original_task_function(task_function: Callable, task_config: TaskConfig, job: Job) -> Tuple[Dict, bool]:
try:
return task_function(**job.variables), True
except Exception as e:
logger.debug(f"Failed job: {job}. Error: {e}.")
task_config.exception_handler(e, job)
return job.variables, False


def create_decorator_runner(decorators: List[TaskDecorator]) -> DecoratorRunner:
def decorator_runner(job: Job):
for decorator in decorators:
job = run_decorator(decorator, job)
return job

return decorator_runner


def run_decorator(decorator: TaskDecorator, job: Job) -> Job:
try:
return decorator(job)
except Exception as e:
logger.warning(f"Failed to run decorator {decorator}. Exception: {e}")
return job
kbakk marked this conversation as resolved.
Show resolved Hide resolved


def convert_to_dict_function(single_value_function: Callable, variable_name: str) -> Callable[..., Dict]:
def inner_fn(*args, **kwargs):
return {variable_name: single_value_function(*args, **kwargs)}

return inner_fn


def get_parameters_from_function(fn: Callable) -> List[str]:
JonatanMartens marked this conversation as resolved.
Show resolved Hide resolved
function_signature = inspect.signature(fn)
for parameter_name, parameter in function_signature.parameters.items():
JonatanMartens marked this conversation as resolved.
Show resolved Hide resolved
if parameter.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
return []
return list(function_signature.parameters)
32 changes: 32 additions & 0 deletions pyzeebe/task/task_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import logging
from dataclasses import dataclass, field
from typing import List, Optional

from pyzeebe.exceptions import NoVariableNameGiven
from pyzeebe.job.job import Job
from pyzeebe.task.exception_handler import ExceptionHandler
from pyzeebe.task.types import TaskDecorator

logger = logging.getLogger(__name__)


def default_exception_handler(e: Exception, job: Job) -> None:
logger.warning(f"Task type: {job.type} - failed job {job}. Error: {e}.")
job.set_failure_status(f"Failed job. Error: {e}")


@dataclass
class TaskConfig:
type: str
exception_handler: ExceptionHandler = default_exception_handler
timeout: int = 10000
max_jobs_to_activate: int = 32
variables_to_fetch: Optional[List[str]] = None
single_value: bool = False
variable_name: Optional[str] = None
before: List[TaskDecorator] = field(default_factory=list)
after: List[TaskDecorator] = field(default_factory=list)

def __post_init__(self):
if self.single_value and not self.variable_name:
raise NoVariableNameGiven(self.type)
JonatanMartens marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 0 additions & 5 deletions pyzeebe/task/task_decorator.py

This file was deleted.

7 changes: 7 additions & 0 deletions pyzeebe/task/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from typing import Callable

from pyzeebe import Job

DecoratorRunner = Callable[[Job], Job]
JobHandler = Callable[[Job], Job]
TaskDecorator = Callable[[Job], Job]
145 changes: 0 additions & 145 deletions pyzeebe/worker/task_handler.py

This file was deleted.

Loading