From f8674b3aed131dff3e08b35fc7a5a70f79c9a3ba Mon Sep 17 00:00:00 2001 From: Carlo Bertini Date: Tue, 5 Dec 2023 13:16:21 +0100 Subject: [PATCH] feat: Implement `subscribe_to` Decorator (#252) Introduces the `subscribe_to` method, providing a decorator-based mechanism for subscribing to topics in the celery-pubsub system, enhancing usability and code organization. --------- Co-authored-by: Samuel GIFFARD --- README.md | 19 ++++++++++++++++ celery_pubsub/__init__.py | 3 ++- celery_pubsub/pubsub.py | 23 ++++++++++++++++--- tests/conftest.py | 48 ++++++++++++++++++++++++++++++++++++++- tests/test_pubsub.py | 28 +++++++++++++++++++++++ 5 files changed, 116 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 83feca1..a4541fe 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,17 @@ def my_task_2(*args, **kwargs): celery_pubsub.subscribe('some.topic', my_task_1) celery_pubsub.subscribe('some.topic', my_task_2) +# Or subscribe with decoration +@celery_pubsub.subscribe_to(topic="some.topic") +@celery.task +def my_task_1(*args, **kwargs): + return "task 1 done" + +# Or use only decoration +@celery_pubsub.subscribe_to(topic="some.topic") +def my_task_1(*args, **kwargs): + return "task 1 done" + # Now, let's publish something res = celery_pubsub.publish('some.topic', data='something', value=42) @@ -81,6 +92,13 @@ celery_pubsub.subscribe('some.beep', my_task_5) # it's okay to have more than one task on the same topic celery_pubsub.subscribe('some.beep', my_task_6) +# or subscribe directly with decorator +@celery_pubsub.subscribe_to(topic="some.*") +def my_task_1(*args, **kwargs): ... + +@celery_pubsub.subscribe_to(topic="some.*.test") +def my_task_2(*args, **kwargs): ... + # Let's publish celery_pubsub.publish('nowhere', 4) # task 4 only celery_pubsub.publish('some', 8) # task 4 only @@ -102,6 +120,7 @@ celery_pubsub.publish('some.very.good.test', 42) # task 3 only ## Changelog * 2.0.0 + * Add new decorator: subscribe_to * Drop support for CPython 2.7, 3.4, 3.5, 3.6 * Drop support for Pypy 2.7 and 3.6. * Drop support for Celery 3. diff --git a/celery_pubsub/__init__.py b/celery_pubsub/__init__.py index 07a9842..f8ff7b8 100644 --- a/celery_pubsub/__init__.py +++ b/celery_pubsub/__init__.py @@ -1,8 +1,9 @@ -from .pubsub import publish_now, publish, subscribe, unsubscribe +from .pubsub import publish, publish_now, subscribe, subscribe_to, unsubscribe __all__ = [ "publish_now", "publish", "subscribe", + "subscribe_to", "unsubscribe", ] diff --git a/celery_pubsub/pubsub.py b/celery_pubsub/pubsub.py index eb77502..768a83e 100644 --- a/celery_pubsub/pubsub.py +++ b/celery_pubsub/pubsub.py @@ -2,6 +2,7 @@ from __future__ import annotations +import re import typing if typing.TYPE_CHECKING: # pragma: no cover @@ -16,24 +17,26 @@ TypeAlias = None import celery -import re __all__ = [ "publish", "publish_now", "subscribe", + "subscribe_to", "unsubscribe", ] - from celery import Task, group from celery.result import AsyncResult, EagerResult - PA: TypeAlias = typing.Any # ParamSpec args PK: TypeAlias = typing.Any # ParamSpec kwargs P: TypeAlias = typing.Any # ParamSpec R: TypeAlias = typing.Any # Return type +task: typing.Callable[ + ..., typing.Callable[[typing.Callable[[P], R]], Task[P, R]] +] = celery.shared_task + class PubSubManager: def __init__(self) -> None: @@ -84,6 +87,20 @@ def _topic_to_re(topic: str) -> re.Pattern[str]: _pubsub_manager: PubSubManager = PubSubManager() +def subscribe_to(topic: str) -> typing.Callable[[typing.Callable[[P], R]], Task[P, R]]: + def decorator(func: typing.Callable[[P], R]) -> Task[P, R]: + if isinstance(func, Task): + task_instance: Task[P, R] = func + else: + app_name, module_name = func.__module__.split(".", 1) + task_name = f"{app_name}.{module_name}.{func.__qualname__}" + task_instance = task(name=task_name)(func) + _pubsub_manager.subscribe(topic, task_instance) + return task_instance + + return decorator + + def publish(topic: str, *args: PA, **kwargs: PK) -> AsyncResult[R]: return _pubsub_manager.publish(topic, *args, **kwargs) diff --git a/tests/conftest.py b/tests/conftest.py index b69ae0f..1bdec6a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,7 +21,7 @@ import celery from celery import Task -from celery_pubsub import subscribe +from celery_pubsub import subscribe, subscribe_to from packaging.version import parse, Version @@ -134,6 +134,48 @@ def job(*args: P.args, **kwargs: P.kwargs) -> str: return job +@pytest.fixture(scope="session") +def job_h() -> Task[P, str]: + @subscribe_to(topic="foo.#") + @task(bind=True, name="job_h") + def job(*args: P.args, **kwargs: P.kwargs) -> str: + print(f"job_h: {args} {kwargs}") + return "h" + + return job + + +@pytest.fixture(scope="session") +def job_i() -> Task[P, str]: + @subscribe_to(topic="foo") + def job(*args: P.args, **kwargs: P.kwargs) -> str: + print(f"job_i: {args} {kwargs}") + return "i" + + return job + + +@pytest.fixture(scope="session") +def job_j() -> Task[P, str]: + @subscribe_to(topic="foo.bar.baz") + @task(name="job_j") + def job(*args: P.args, **kwargs: P.kwargs) -> str: + print(f"job_j: {args} {kwargs}") + return "j" + + return job + + +@pytest.fixture(scope="session") +def job_k() -> Task[P, str]: + @subscribe_to(topic="foo.bar") + def job(*args: P.args, **kwargs: P.kwargs) -> str: + print(f"job_k: {args} {kwargs}") + return "k" + + return job + + @pytest.fixture(scope="session", autouse=True) def subscriber( job_a: Task[P, str], @@ -143,6 +185,10 @@ def subscriber( job_e: Task[P, str], job_f: Task[P, str], job_g: Task[P, str], + job_h: Task[P, str], + job_i: Task[P, str], + job_j: Task[P, str], + job_k: Task[P, str], ) -> None: subscribe("index.high", job_a) subscribe("index.low", job_b) diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 39ceba1..07a8707 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -80,6 +80,34 @@ def test_7(celery_worker: WorkController) -> None: assert sorted(res) == sorted(["d", "e", "f", "g"]) +def test_8(celery_worker: WorkController) -> None: + from celery_pubsub import publish + + res = publish("foo", 4, 8, a15=16, a23=42).get() + assert sorted(res) == sorted(["e", "i"]) + + +def test_9(celery_worker: WorkController) -> None: + from celery_pubsub import publish + + res = publish("foo.bar.blur", 4, 8, a15=16, a23=42).get() + assert sorted(res) == sorted(["e", "h"]) + + +def test_10(celery_worker: WorkController) -> None: + from celery_pubsub import publish + + res = publish("foo.bar.baz", 4, 8, a15=16, a23=42).get() + assert sorted(res) == sorted(["e", "h", "j"]) + + +def test_11(celery_worker: WorkController) -> None: + from celery_pubsub import publish + + res = publish("foo.bar", 4, 8, a15=16, a23=42).get() + assert sorted(res) == sorted(["e", "h", "k"]) + + def test_subscription_redundant( job_a: Task[P, str], celery_worker: WorkController ) -> None: