Skip to content

Commit

Permalink
Merge pull request #28 from taskbadger/sk/celery-upgrade
Browse files Browse the repository at this point in the history
fix celery serialization bug
  • Loading branch information
snopoke authored Jun 3, 2024
2 parents 27a3e88 + 3bc2659 commit da99b88
Show file tree
Hide file tree
Showing 8 changed files with 776 additions and 385 deletions.
19 changes: 18 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,27 @@ on:
jobs:
python-tests:
runs-on: ubuntu-latest
services:
redis:
image: redis
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 6379:6379
strategy:
max-parallel: 4
matrix:
python-version:
- "3.9"
- "3.10"
- "3.11"
- "3.12"
celery-version:
- ">=5.3,<5.4"
- ">=5.4"
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
Expand All @@ -31,6 +46,8 @@ jobs:
poetry --version
poetry check --no-interaction
- name: Install project
run: poetry install --no-interaction
run: |
poetry install --no-interaction
pip install celery"${{ matrix.celery-version }}"
- name: Run tests
run: poetry run pytest -v
1,058 changes: 686 additions & 372 deletions poetry.lock

Large diffs are not rendered by default.

14 changes: 9 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ packages = [
include = ["CHANGELOG.md", "taskbadger/internal/py.typed"]

homepage = "https://taskbadger.net/"
repository = "https://github.com/taskbadger/taskbadger-docs"
repository = "https://github.com/taskbadger/taskbadger-python"
documentation = "https://docs.taskbadger.net/"
classifiers = [
"Development Status :: 4 - Beta",
Expand All @@ -30,13 +30,16 @@ classifiers = [
"Topic :: Software Development :: Libraries :: Python Modules",
]

[tool.poetry.urls]
"Changelog" = "https://github.com/taskbadger/taskbadger-python/releases"

[tool.poetry.dependencies]
python = "^3.8"
httpx = ">=0.20.0,<0.25.0"
httpx = ">=0.20.0,<0.28.0"
attrs = ">=21.3.0"
python-dateutil = "^2.8.0"
typer = {extras = ["all"], version = "^0.9.0"}
tomlkit = "^0.11.6"
typer = {extras = ["all"], version = "<0.10.0"}
tomlkit = "^0.12.5"
importlib-metadata = {version = "^1.0", python = "<3.8"}
typing-extensions = {version = "^4.7.1", python = "3.9"}
celery = {version = ">=4.0.0,<6.0.0", optional = true}
Expand All @@ -52,7 +55,8 @@ black = "^23.1.0"
pre-commit = "^3.0.2"
pytest-httpx = "^0.21.3"
invoke = "^2.0.0"
pytest-celery = "^0.0.0"
pytest-celery = ">0.0.0"
redis = "^5.0.4"

[tool.poetry.scripts]
taskbadger = "taskbadger.cli_main:app"
Expand Down
23 changes: 17 additions & 6 deletions taskbadger/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,13 @@ def scrape_urls(self, urls):
def apply_async(self, *args, **kwargs):
headers = kwargs.setdefault("headers", {})
headers["taskbadger_track"] = True
tb_kwargs = kwargs.pop(TB_KWARGS_ARG, {})
for name in list(kwargs):
if name.startswith(KWARG_PREFIX):
val = kwargs.pop(name)
tb_kwargs[name.removeprefix(KWARG_PREFIX)] = val
tb_kwargs = self._get_tb_kwargs(kwargs)
if kwargs.get("kwargs"):
# extract taskbadger options from task kwargs when supplied as keyword argument
tb_kwargs.update(self._get_tb_kwargs(kwargs["kwargs"]))
elif len(args) > 1 and isinstance(args[1], dict):
# extract taskbadger options from task kwargs when supplied as positional argument
tb_kwargs.update(self._get_tb_kwargs(args[1]))
headers[TB_KWARGS_ARG] = tb_kwargs
result = super().apply_async(*args, **kwargs)

Expand All @@ -119,6 +121,14 @@ def apply_async(self, *args, **kwargs):

return result

def _get_tb_kwargs(self, kwargs):
tb_kwargs = kwargs.pop(TB_KWARGS_ARG, {})
for name in list(kwargs):
if name.startswith(KWARG_PREFIX):
val = kwargs.pop(name)
tb_kwargs[name.removeprefix(KWARG_PREFIX)] = val
return tb_kwargs

@property
def taskbadger_task_id(self):
return _get_taskbadger_task_id(self.request)
Expand Down Expand Up @@ -146,6 +156,7 @@ def task_publish_handler(sender=None, headers=None, body=None, **kwargs):
celery_system = Badger.current.settings.get_system_by_id("celery")
auto_track = celery_system and celery_system.track_task(sender)
manual_track = headers.get("taskbadger_track")
header_kwargs = headers.pop(TB_KWARGS_ARG, {})
if not manual_track and not auto_track:
return

Expand All @@ -158,7 +169,7 @@ def task_publish_handler(sender=None, headers=None, body=None, **kwargs):
kwargs[attr.removeprefix(KWARG_PREFIX)] = getattr(ctask, attr)

# get kwargs from the task headers (set via apply_async)
kwargs.update(headers.get(TB_KWARGS_ARG, {}))
kwargs.update(header_kwargs)
kwargs["status"] = StatusEnum.PENDING
name = kwargs.pop("name", headers["task"])

Expand Down
9 changes: 9 additions & 0 deletions taskbadger/sdk.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import os
from typing import Any, List

Expand All @@ -15,6 +16,8 @@
from taskbadger.mug import Badger, Session, Settings
from taskbadger.systems import System

log = logging.getLogger("taskbadger")

_TB_HOST = "https://taskbadger.net"


Expand Down Expand Up @@ -338,6 +341,12 @@ def data(self):
def __getattr__(self, item):
return getattr(self._task, item)

def safe_update(self, **kwargs):
try:
self.update(**kwargs)
except Exception as e:
log.exception("Error updating task '%s'", self._task.id)


def _none_to_unset(value):
return UNSET if value is None else value
Expand Down
9 changes: 9 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,12 @@ def bind_settings():
Badger.current.bind(Settings("https://taskbadger.net", "token", "org", "proj"))
yield
Badger.current.bind(None)


@pytest.fixture(scope="session", autouse=True)
def celery_config():
"""Test against Redis to ensure serialization works"""
return {
"broker_url": "redis://localhost:6379",
"result_backend": "redis://localhost:6379",
}
28 changes: 27 additions & 1 deletion tests/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import celery
import pytest

from taskbadger import StatusEnum
from taskbadger import Action, EmailIntegration, StatusEnum
from taskbadger.celery import Task
from taskbadger.mug import Badger
from tests.utils import task_for_test
Expand Down Expand Up @@ -75,6 +75,32 @@ def add_with_task_args(self, a, b):
create.assert_called_once_with("new_name", value_max=10, data={"foo": "bar"}, status=StatusEnum.PENDING)


def test_celery_task_with_kwargs(celery_session_app, celery_session_worker, bind_settings):
@celery_session_app.task(bind=True, base=Task)
def add_with_task_args(self, a, b):
assert self.taskbadger_task is not None
return a + b

celery_session_worker.reload()

with mock.patch("taskbadger.celery.create_task_safe") as create, mock.patch(
"taskbadger.celery.update_task_safe"
) as update, mock.patch("taskbadger.celery.get_task") as get_task:
create.return_value = task_for_test()

actions = [Action("stale", integration=EmailIntegration(to="[email protected]"))]
result = add_with_task_args.delay(
2,
2,
taskbadger_name="new_name",
taskbadger_value_max=10,
taskbadger_kwargs={"actions": actions},
)
assert result.get(timeout=10, propagate=True) == 4

create.assert_called_once_with("new_name", value_max=10, actions=actions, status=StatusEnum.PENDING)


def test_celery_task_with_args_in_decorator(celery_session_app, celery_session_worker, bind_settings):
@celery_session_app.task(bind=True, base=Task, taskbadger_value_max=10, taskbadger_kwargs={"monitor_id": "123"})
def add_with_task_args_in_decorator(self, a, b):
Expand Down
1 change: 1 addition & 0 deletions tests/test_celery_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def add_error(self, a, b):
"taskbadger.celery.update_task_safe"
) as update, mock.patch("taskbadger.celery.get_task") as get_task:
task = task_for_test()
create.return_value = task
get_task.return_value = task
update.return_value = task
result = add_error.delay(2, 2)
Expand Down

0 comments on commit da99b88

Please sign in to comment.