Skip to content

Commit

Permalink
Merge pull request #172 from camunda-community-hub/feature/async-worker
Browse files Browse the repository at this point in the history
Feature/async
  • Loading branch information
JonatanMartens authored Jun 14, 2021
2 parents 4c32bc7 + 516e476 commit f746940
Show file tree
Hide file tree
Showing 69 changed files with 1,994 additions and 1,466 deletions.
16 changes: 15 additions & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: Lint pyzeebe
on: [push, pull_request]

jobs:
build:
type-checking:

runs-on: ubuntu-latest

Expand All @@ -17,3 +17,17 @@ jobs:
- name: Lint with mypy
run: |
pipenv run mypy pyzeebe
import-checking:
runs-on: ubuntu-latest

container: python:3.8
steps:
- uses: actions/checkout@v2
- name: Install dependencies
run: |
pip install pipenv
pipenv install --dev
- name: Check imports
run: |
pipenv run isort . --check --diff
2 changes: 1 addition & 1 deletion .github/workflows/publish-python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Publish pyzeebe

on:
release:
types: [created]
types: [created, prereleased]

jobs:
publish:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test-zeebe-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
zeebe-version: [ "1.0.0-rc3" ]
zeebe-version: [ "1.0.0" ]

container: python:3.6

Expand Down
5 changes: 4 additions & 1 deletion Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ pyzeebe = {editable = true, path = "."}
sphinx = "~=3.5.2"
sphinx-rtd-theme = "*"
pytest-mock = "*"
pytest-asyncio = "~=0.15.1"
asyncmock = "~=0.4.2"

[packages]
oauthlib = "~=3.1.0"
requests-oauthlib = "~=1.3.0"
zeebe-grpc = "~=1.0.0rc2"
zeebe-grpc = "~=1.0.0"
aiofiles = "~=0.7.0"

[pipenv]
allow_prereleases = true
508 changes: 264 additions & 244 deletions Pipfile.lock

Large diffs are not rendered by default.

28 changes: 17 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Zeebe version support:

| Pyzeebe version | Tested Zeebe versions |
| :-------------: | ---------------------- |
| 3.x.x | 1.0.0 |
| 2.x.x | 0.23, 0.24, 0.25, 0.26 |
| 1.x.x | 0.23, 0.24 |

Expand All @@ -34,33 +35,38 @@ For full documentation please visit: https://pyzeebe.readthedocs.io/en/stable/
The `ZeebeWorker` class uses threading to get and run jobs.

```python
import asyncio

from pyzeebe import ZeebeWorker, Job


def on_error(exception: Exception, job: Job):
async def on_error(exception: Exception, job: Job):
"""
on_error will be called when the task fails
"""
print(exception)
job.set_error_status(f"Failed to handle job {job}. Error: {str(exception)}")
await job.set_error_status(f"Failed to handle job {job}. Error: {str(exception)}")



worker = ZeebeWorker(hostname="<zeebe_host>", port=26500) # Create a zeebe worker

@worker.task(task_type="example", exception_handler=on_error)
def example_task(input: str):
def example_task(input: str) -> dict:
return {"output": f"Hello world, {input}!"}


worker.work() # Now every time that a task with type example is called example_task will be called
@worker.task(task_type="example2", exception_handler=on_error)
async def another_example_task(name: str) -> dict: # Tasks can also be async
return {"output": f"Hello world, {name} from async task!"}

asyncio.run(worker.work()) # Now every time that a task with type `example` or `example2` is called, the corresponding function will be called
```

Stop a worker:

```python
zeebe_worker.work() # Worker will begin working
zeebe_worker.stop() # Stops worker after all running jobs have been completed
await zeebe_worker.stop() # Stops worker after all running jobs have been completed
```

### Client
Expand All @@ -72,22 +78,22 @@ from pyzeebe import ZeebeClient
zeebe_client = ZeebeClient(hostname="localhost", port=26500)

# Run a Zeebe process instance
process_instance_key = zeebe_client.run_process(bpmn_process_id="My zeebe process", variables={})
process_instance_key = await zeebe_client.run_process(bpmn_process_id="My zeebe process", variables={})

# Run a process and receive the result
process_instance_key, process_result = zeebe_client.run_process_with_result(
process_instance_key, process_result = await zeebe_client.run_process_with_result(
bpmn_process_id="My zeebe process",
timeout=10000
)

# Deploy a BPMN process definition
zeebe_client.deploy_process("process.bpmn")
await zeebe_client.deploy_process("process.bpmn")

# Cancel a running process
zeebe_client.cancel_process_instance(process_instance_key=12345)
await zeebe_client.cancel_process_instance(process_instance_key=12345)

# Publish message
zeebe_client.publish_message(name="message_name", correlation_key="some_id")
await zeebe_client.publish_message(name="message_name", correlation_key="some_id")

```

Expand Down
10 changes: 5 additions & 5 deletions docs/client_quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ To change connection retries:

.. code-block:: python
worker = ZeebeClient(max_connection_retries=1) # Will only accept one failure and disconnect upon the second
client = ZeebeClient(max_connection_retries=1) # Will only accept one failure and disconnect upon the second
This means the client will disconnect upon two consecutive failures. Each time the client connects successfully the counter is reset.
Expand Down Expand Up @@ -68,7 +68,7 @@ Run a Zeebe process instance

.. code-block:: python
process_instance_key = client.run_process("bpmn_process_id")
process_instance_key = await client.run_process("bpmn_process_id")
Run a process with result
Expand All @@ -78,7 +78,7 @@ To run a process and receive the result directly:

.. code-block:: python
process_instance_key, result = client.run_process_with_result("bpmn_process_id")
process_instance_key, result = await client.run_process_with_result("bpmn_process_id")
# result will be a dict
Expand All @@ -88,12 +88,12 @@ Deploy a process

.. code-block:: python
client.deploy_process("process_file.bpmn")
await client.deploy_process("process_file.bpmn")
Publish a message
-----------------

.. code-block:: python
client.publish_message(name="message_name", correlation_key="correlation_key")
await client.publish_message(name="message_name", correlation_key="correlation_key")
4 changes: 4 additions & 0 deletions docs/client_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ Client Reference
.. autoclass:: pyzeebe.ZeebeClient
:members:
:undoc-members:

.. autoclass:: pyzeebe.SyncZeebeClient
:members:
:undoc-members:
4 changes: 2 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
author = 'Jonatan Martens'

# The full version, including alpha/beta/rc tags
release = '2.3.1'
release = '3.0.0rc1'

# -- General configuration ---------------------------------------------------

Expand Down Expand Up @@ -59,6 +59,6 @@
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']

version = "2.3.1"
version = "3.0.0rc1"

master_doc = 'index'
13 changes: 11 additions & 2 deletions docs/decorators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
Decorators
==========

A ``pyzeebe`` decorator is a function that receives a :py:class:`Job` instance and returns a :py:class:`Job`.
A ``pyzeebe`` decorator is an async/sync function that receives a :py:class:`Job` instance and returns a :py:class:`Job`.

.. code-block:: python
Callable[[Job], Job]
Union[
Callable[[Job], Job],
Callable[[Job], Awaitable[Job]]
]
An example decorator:

Expand All @@ -16,6 +19,12 @@ An example decorator:
logging.info(job)
return job
# Or:
async def logging_decorator(job: Job) -> Job:
await async_logger.info(job)
return job
If a decorator raises an :class:`Exception` ``pyzeebe`` will just ignore it and continue the task/other decorators.

Task Decorators
Expand Down
8 changes: 4 additions & 4 deletions docs/exceptions.rst → docs/errors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ All ``pyzeebe`` exceptions inherit from :py:class:`PyZeebeError`

.. autoexception:: pyzeebe.errors.MessageAlreadyExistsError

.. autoexception:: pyzeebe.exceptions.ProcessDefinitionNotFoundError
.. autoexception:: pyzeebe.errors.ProcessDefinitionNotFoundError

.. autoexception:: pyzeebe.exceptions.ProcessInstanceNotFoundError
.. autoexception:: pyzeebe.errors.ProcessInstanceNotFoundError

.. autoexception:: pyzeebe.exceptions.ProcessDefinitionHasNoStartEventError
.. autoexception:: pyzeebe.errors.ProcessDefinitionHasNoStartEventError

.. autoexception:: pyzeebe.exceptions.ProcessInvalidError
.. autoexception:: pyzeebe.errors.ProcessInvalidError

.. autoexception:: pyzeebe.errors.InvalidJSONError

Expand Down
10 changes: 5 additions & 5 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ Creating a worker
worker = ZeebeWorker()
@worker.task(task_type="my_task")
def my_task(x: int):
async def my_task(x: int):
return {"y": x + 1}
worker.work()
await worker.work()
Creating a client

Expand All @@ -37,10 +37,10 @@ Creating a client
client = ZeebeClient()
client.run_process("my_process")
await client.run_process("my_process")
# Run process with variables:
client.run_process("my_process", variables={"x": 0})
await client.run_process("my_process", variables={"x": 0})
Dependencies
Expand All @@ -62,4 +62,4 @@ Table Of Contents
Client <client>
Worker <worker>
Decorators <decorators>
Exceptions <exceptions>
Exceptions <errors>
13 changes: 10 additions & 3 deletions docs/worker_quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@ Create and start a worker

.. code-block:: python
import asyncio
from pyzeebe import ZeebeWorker
worker = ZeebeWorker()
@worker.task(task_type="my_task")
def my_task(x: int):
async def my_task(x: int):
return {"y": x + 1}
worker.work()
asyncio.run(worker.work())
Worker connection options
Expand Down Expand Up @@ -86,6 +88,11 @@ To add a task to the worker:
.. code-block:: python
@worker.task(task_type="my_task")
def my_task(x: int):
async def my_task(x: int):
return {"y": x + 1}
# Or using a non-async function:
@worker.task(task_type="my_task")
def second_task(x: int):
return {"y": x + 1}
2 changes: 1 addition & 1 deletion docs/worker_taskrouter.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Creating a task with a router is the exact same process as wiht a :py:class:`Zee
.. code-block:: python
@router.task(task_type="my_task")
def my_task(x: int):
async def my_task(x: int):
return {"y": x + 1}
Expand Down
37 changes: 32 additions & 5 deletions docs/worker_tasks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,43 @@ To create a task you must first create a :py:class:`ZeebeWorker` or :py:class:`Z
.. code-block:: python
@worker.task(task_type="my_task")
def my_task():
async def my_task():
return {}
This is a task that does nothing. It receives no parameters and also doesn't return any.


.. note::

While this task indeed returns a python dictionary, it doesn't return anything to Zeebe. Do do that we have to fill the dictionary.
While this task indeed returns a python dictionary, it doesn't return anything to Zeebe. To do that we have to fill the dictionary with values.


Async/Sync Tasks
----------------

Tasks can be regular or async functions. If given a regular function, pyzeebe will convert it into an async one by running `asyncio.run_in_executor`

.. note::

Make sure not to call any blocking function in an async task. This would slow the entire worker down.

Do:

.. code-block:: python
@worker.task(task_type="my_task")
def my_task():
time.sleep(10) # Blocking call
return {}
Don't:

.. code-block:: python
@worker.task(task_type="my_task")
async def my_task():
time.sleep(10) # Blocking call
return {}
Task Exception Handler
----------------------
Expand All @@ -30,7 +57,7 @@ An exception handler's signature:

.. code-block:: python
Callable[[Exception, Job], None]
Callable[[Exception, Job], Awaitable[None]]
In other words: an exception handler is a function that receives an :class:`Exception` and :py:class:`Job` instance (a pyzeebe class).

Expand All @@ -43,9 +70,9 @@ To add an exception handler to a task:
from pyzeebe import Job
def my_exception_handler(exception: Exception, job: Job) -> None:
async def my_exception_handler(exception: Exception, job: Job) -> None:
print(exception)
job.set_failure_status(message=str(exception))
await job.set_failure_status(message=str(exception))
@worker.task(task_type="my_task", exception_handler=my_exception_handler)
Expand Down
Loading

0 comments on commit f746940

Please sign in to comment.