Skip to content

Commit

Permalink
Merge pull request #21 from taskbadger/sk/v1.0
Browse files Browse the repository at this point in the history
v1.0
  • Loading branch information
snopoke authored Oct 25, 2023
2 parents 564935e + f8987bd commit f1e7e8d
Show file tree
Hide file tree
Showing 15 changed files with 795 additions and 1 deletion.
29 changes: 29 additions & 0 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: Integration Tests
on:
push:
branches:
- main

jobs:
integration-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.11"
- name: Install Poetry
uses: Gr1N/setup-poetry@v8
- uses: actions/cache@v3
with:
path: ~/.cache/pypoetry/virtualenvs
key: ${{ runner.os }}-poetry-${{ hashFiles('poetry.lock') }}
- name: Checks
run: |
poetry --version
poetry check --no-interaction
- name: Install project
run: poetry install --no-interaction
- name: Run integration tests
run: poetry run pytest integration_tests -v
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dist/
.env
.venv
.envrc
.env.integration

# mypy
.mypy_cache/
Expand Down
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,24 @@ taskbadger.init(
)
```

#### Usage with Celery

```python
import taskbadger
from celery import Celery

app = Celery("tasks")

@app.task(bind=True, base=taskbadger.Task)
def my_task(self):
task = self.taskbadger_task
for i in range(1000):
do_something(i)
if i % 100 == 0:
task.update(value=i, value_max=1000)
task.success(value=1000)
```

#### API Example

```python
Expand Down
14 changes: 14 additions & 0 deletions examples/function_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from datetime import datetime

from taskbadger import track


@track(max_runtime=1)
def my_task(arg1, arg2, kwarg1=None, kwarg2="demo"):
print("Hello from my_task")
print(f"arg1={arg1}, arg2={arg2}, kwarg1={kwarg1}, kwarg2={kwarg2}")
return ["Hello from my_task", datetime.utcnow()]


if __name__ == "__main__":
my_task(1, 2, kwarg1="foo", kwarg2="bar")
33 changes: 33 additions & 0 deletions integration_tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import os
from pathlib import Path

import pytest

import taskbadger as badger


def _load_config():
path = Path(__file__).parent.parent / ".env.integration"
if not path.exists():
return
with path.open() as f:
for line in f:
if line.startswith("#") or not line.strip():
continue
key, value = line.strip().split("=", 1)
os.environ[key] = value


_load_config()
ORG = os.environ.get("TASKBADGER_ORG", "")
PROJECT = os.environ.get("TASKBADGER_PROJECT", "")
API_KEY = os.environ.get("TASKBADGER_API_KEY", "")

if not ORG or not PROJECT or not API_KEY:
pytest.fail("Integration test config missing", pytrace=False)
else:
badger.init(
os.environ.get("TASKBADGER_ORG", ""),
os.environ.get("TASKBADGER_PROJECT", ""),
os.environ.get("TASKBADGER_API_KEY", ""),
)
10 changes: 10 additions & 0 deletions integration_tests/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from celery import shared_task

import taskbadger.celery


@shared_task(bind=True, base=taskbadger.celery.Task)
def add(self, x, y):
assert self.taskbadger_task is not None, "missing task on self"
self.taskbadger_task.update(value=100, data={"result": x + y})
return x + y
16 changes: 16 additions & 0 deletions integration_tests/test_basics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from datetime import datetime

import taskbadger as badger
from taskbadger import StatusEnum


def test_basics():
data = {"now": datetime.utcnow().isoformat()}
task = badger.create_task("test basics", data=data)
task.success(100)
assert task.status == StatusEnum.SUCCESS

fresh = badger.get_task(task.id)
assert fresh.status == StatusEnum.SUCCESS
assert fresh.value == 100
assert fresh.data == data
26 changes: 26 additions & 0 deletions integration_tests/test_celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import random

import pytest

from taskbadger import StatusEnum

from .tasks import add


@pytest.fixture(scope="session", autouse=True)
def celery_includes():
return [
"integration_tests.tasks",
]


def test_celery(celery_worker):
a, b = random.randint(1, 1000), random.randint(1, 1000)
result = add.delay(a, b)
assert result.get(timeout=10, propagate=True) == a + b

tb_task = result.get_taskbadger_task()
assert tb_task is not None
assert tb_task.status == StatusEnum.SUCCESS
assert tb_task.value == 100
assert tb_task.data == {"result": a + b}
6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "taskbadger"
version = "0.8.0"
version = "1.0.0a"
description = "The official Python SDK for Task Badger"
license = "Apache-2.0"

Expand Down Expand Up @@ -77,3 +77,7 @@ exclude = '''
[tool.isort]
line_length = 120
profile = "black"

[tool.pytest.ini_options]
# don't run integration tests unless specifically requested
norecursedirs = ".* integration_tests"
1 change: 1 addition & 0 deletions taskbadger/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .decorators import track
from .integrations import Action, EmailIntegration, WebhookIntegration
from .internal.models import StatusEnum
from .mug import Session
Expand Down
183 changes: 183 additions & 0 deletions taskbadger/celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import functools
import logging

import celery
from celery.signals import before_task_publish, task_failure, task_prerun, task_retry, task_success

from .internal.models import StatusEnum
from .mug import Badger
from .safe_sdk import create_task_safe, update_task_safe
from .sdk import DefaultMergeStrategy, get_task

KWARG_PREFIX = "taskbadger_"
TB_KWARGS_ARG = f"{KWARG_PREFIX}kwargs"
IGNORE_ARGS = {TB_KWARGS_ARG, f"{KWARG_PREFIX}task", f"{KWARG_PREFIX}task_id"}

TERMINAL_STATES = {StatusEnum.SUCCESS, StatusEnum.ERROR, StatusEnum.CANCELLED, StatusEnum.STALE}

log = logging.getLogger("taskbadger")


class Task(celery.Task):
"""A Celery Task that tracks itself with TaskBadger.
The TaskBadger task will go through the following states:
- PENDING: The task has been created by calling `.delay()` or `.apply_async()`.
- PROCESSING: Set when the task starts executing.
- SUCCESS: The task completed successfully.
- ERROR: The task failed.
No tracking is done for tasks that ar executed synchronously either via `.appy()` or
if Celery is configured to run tasks eagerly.
Access to the task is provided via the `taskbadger_task` property of the Celery task.
The task ID may also be accessed via the `taskbadger_task_id` property. These may
be `None` if the task is not being tracked (e.g. Task Badger is not configured or
there was an error creating the task).
Examples:
.. code-block:: python
@app.task(base=taskbadger.Task)
def refresh_feed(url):
store_feed(feedparser.parse(url))
with access to the task in the function body:
.. code-block:: python
@app.task(bind=True, base=taskbadger.Task)
def scrape_urls(self, urls):
task = self.taskbadger_task
total_urls = len(urls)
for i, url in enumerate(urls):
scrape_url(url)
if i % 10 == 0:
task.update(value=i, value_max=total_urls)
task.success(value=total_urls)
"""

def apply_async(self, *args, **kwargs):
headers = kwargs.setdefault("headers", {})
headers["taskbadger_track"] = True
tb_kwargs = kwargs.pop(TB_KWARGS_ARG, {})
for name in list(kwargs):
if name.startswith(KWARG_PREFIX):
val = kwargs.pop(name)
tb_kwargs[name.removeprefix(KWARG_PREFIX)] = val
headers[TB_KWARGS_ARG] = tb_kwargs
result = super().apply_async(*args, **kwargs)

tb_task_id = result.info.get("taskbadger_task_id") if result.info else None
setattr(result, "taskbadger_task_id", tb_task_id)

_get_task = functools.partial(get_task, tb_task_id) if tb_task_id else lambda: None
setattr(result, "get_taskbadger_task", _get_task)

return result

@property
def taskbadger_task_id(self):
return self.request and self.request.headers and self.request.headers.get("taskbadger_task_id")

@property
def taskbadger_task(self):
if not self.taskbadger_task_id:
return None

task = self.request.get("taskbadger_task")
if not task:
log.debug("Fetching task '%s'", self.taskbadger_task_id)
try:
task = get_task(self.taskbadger_task_id)
self.request.update({"taskbadger_task": task})
except Exception:
log.exception("Error fetching task '%s'", self.taskbadger_task_id)
task = None
return task


@before_task_publish.connect
def task_publish_handler(sender=None, headers=None, **kwargs):
if not headers.get("taskbadger_track") or not Badger.is_configured():
return

ctask = celery.current_app.tasks.get(sender)

# get kwargs from the task class (set via decorator)
kwargs = getattr(ctask, TB_KWARGS_ARG, {})
for attr in dir(ctask):
if attr.startswith(KWARG_PREFIX) and attr not in IGNORE_ARGS:
kwargs[attr.removeprefix(KWARG_PREFIX)] = getattr(ctask, attr)

# get kwargs from the task headers (set via apply_async)
kwargs.update(headers[TB_KWARGS_ARG])
kwargs["status"] = StatusEnum.PENDING
name = kwargs.pop("name", headers["task"])

task = create_task_safe(name, **kwargs)
if task:
meta = {"taskbadger_task_id": task.id}
headers.update(meta)
ctask.update_state(task_id=headers["id"], state="PENDING", meta=meta)


@task_prerun.connect
def task_prerun_handler(sender=None, **kwargs):
_update_task(sender, StatusEnum.PROCESSING)


@task_success.connect
def task_success_handler(sender=None, **kwargs):
_update_task(sender, StatusEnum.SUCCESS)
exit_session(sender)


@task_failure.connect
def task_failure_handler(sender=None, einfo=None, **kwargs):
_update_task(sender, StatusEnum.ERROR, einfo)
exit_session(sender)


@task_retry.connect
def task_retry_handler(sender=None, einfo=None, **kwargs):
_update_task(sender, StatusEnum.ERROR, einfo)
exit_session(sender)


def _update_task(signal_sender, status, einfo=None):
log.debug("celery_task_update %s %s", signal_sender, status)
if not hasattr(signal_sender, "taskbadger_task"):
return

task = signal_sender.taskbadger_task
if task is None:
return

if task.status in TERMINAL_STATES:
# ignore tasks that have already been set to a terminal state (probably in the task body)
return

enter_session()

data = None
if einfo:
data = DefaultMergeStrategy().merge(task.data, {"exception": str(einfo)})
update_task_safe(task.id, status=status, data=data)


def enter_session():
if not Badger.is_configured():
return
session = Badger.current.session()
if not session.client:
session.__enter__()


def exit_session(signal_sender):
if not hasattr(signal_sender, "taskbadger_task") or not Badger.is_configured():
return
session = Badger.current.session()
if session.client:
session.__exit__()
Loading

0 comments on commit f1e7e8d

Please sign in to comment.