Skip to content

Commit

Permalink
feat: Implement subscribe_to Decorator (#252)
Browse files Browse the repository at this point in the history
Introduces the `subscribe_to` method, providing a decorator-based
mechanism for subscribing to topics in the celery-pubsub system,
enhancing usability and code organization.

---------

Co-authored-by: Samuel GIFFARD <[email protected]>
  • Loading branch information
WaYdotNET and Mulugruntz authored Dec 5, 2023
1 parent f4a40a1 commit f8674b3
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 5 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ def my_task_2(*args, **kwargs):
celery_pubsub.subscribe('some.topic', my_task_1)
celery_pubsub.subscribe('some.topic', my_task_2)

# Or subscribe with decoration
@celery_pubsub.subscribe_to(topic="some.topic")
@celery.task
def my_task_1(*args, **kwargs):
return "task 1 done"

# Or use only decoration
@celery_pubsub.subscribe_to(topic="some.topic")
def my_task_1(*args, **kwargs):
return "task 1 done"

# Now, let's publish something
res = celery_pubsub.publish('some.topic', data='something', value=42)

Expand Down Expand Up @@ -81,6 +92,13 @@ celery_pubsub.subscribe('some.beep', my_task_5)
# it's okay to have more than one task on the same topic
celery_pubsub.subscribe('some.beep', my_task_6)

# or subscribe directly with decorator
@celery_pubsub.subscribe_to(topic="some.*")
def my_task_1(*args, **kwargs): ...

@celery_pubsub.subscribe_to(topic="some.*.test")
def my_task_2(*args, **kwargs): ...

# Let's publish
celery_pubsub.publish('nowhere', 4) # task 4 only
celery_pubsub.publish('some', 8) # task 4 only
Expand All @@ -102,6 +120,7 @@ celery_pubsub.publish('some.very.good.test', 42) # task 3 only
## Changelog

* 2.0.0
* Add new decorator: subscribe_to
* Drop support for CPython 2.7, 3.4, 3.5, 3.6
* Drop support for Pypy 2.7 and 3.6.
* Drop support for Celery 3.
Expand Down
3 changes: 2 additions & 1 deletion celery_pubsub/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from .pubsub import publish_now, publish, subscribe, unsubscribe
from .pubsub import publish, publish_now, subscribe, subscribe_to, unsubscribe

__all__ = [
"publish_now",
"publish",
"subscribe",
"subscribe_to",
"unsubscribe",
]
23 changes: 20 additions & 3 deletions celery_pubsub/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import re
import typing

if typing.TYPE_CHECKING: # pragma: no cover
Expand All @@ -16,24 +17,26 @@
TypeAlias = None

import celery
import re

__all__ = [
"publish",
"publish_now",
"subscribe",
"subscribe_to",
"unsubscribe",
]

from celery import Task, group
from celery.result import AsyncResult, EagerResult


PA: TypeAlias = typing.Any # ParamSpec args
PK: TypeAlias = typing.Any # ParamSpec kwargs
P: TypeAlias = typing.Any # ParamSpec
R: TypeAlias = typing.Any # Return type

task: typing.Callable[
..., typing.Callable[[typing.Callable[[P], R]], Task[P, R]]
] = celery.shared_task


class PubSubManager:
def __init__(self) -> None:
Expand Down Expand Up @@ -84,6 +87,20 @@ def _topic_to_re(topic: str) -> re.Pattern[str]:
_pubsub_manager: PubSubManager = PubSubManager()


def subscribe_to(topic: str) -> typing.Callable[[typing.Callable[[P], R]], Task[P, R]]:
def decorator(func: typing.Callable[[P], R]) -> Task[P, R]:
if isinstance(func, Task):
task_instance: Task[P, R] = func
else:
app_name, module_name = func.__module__.split(".", 1)
task_name = f"{app_name}.{module_name}.{func.__qualname__}"
task_instance = task(name=task_name)(func)
_pubsub_manager.subscribe(topic, task_instance)
return task_instance

return decorator


def publish(topic: str, *args: PA, **kwargs: PK) -> AsyncResult[R]:
return _pubsub_manager.publish(topic, *args, **kwargs)

Expand Down
48 changes: 47 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import celery
from celery import Task

from celery_pubsub import subscribe
from celery_pubsub import subscribe, subscribe_to
from packaging.version import parse, Version


Expand Down Expand Up @@ -134,6 +134,48 @@ def job(*args: P.args, **kwargs: P.kwargs) -> str:
return job


@pytest.fixture(scope="session")
def job_h() -> Task[P, str]:
@subscribe_to(topic="foo.#")
@task(bind=True, name="job_h")
def job(*args: P.args, **kwargs: P.kwargs) -> str:
print(f"job_h: {args} {kwargs}")
return "h"

return job


@pytest.fixture(scope="session")
def job_i() -> Task[P, str]:
@subscribe_to(topic="foo")
def job(*args: P.args, **kwargs: P.kwargs) -> str:
print(f"job_i: {args} {kwargs}")
return "i"

return job


@pytest.fixture(scope="session")
def job_j() -> Task[P, str]:
@subscribe_to(topic="foo.bar.baz")
@task(name="job_j")
def job(*args: P.args, **kwargs: P.kwargs) -> str:
print(f"job_j: {args} {kwargs}")
return "j"

return job


@pytest.fixture(scope="session")
def job_k() -> Task[P, str]:
@subscribe_to(topic="foo.bar")
def job(*args: P.args, **kwargs: P.kwargs) -> str:
print(f"job_k: {args} {kwargs}")
return "k"

return job


@pytest.fixture(scope="session", autouse=True)
def subscriber(
job_a: Task[P, str],
Expand All @@ -143,6 +185,10 @@ def subscriber(
job_e: Task[P, str],
job_f: Task[P, str],
job_g: Task[P, str],
job_h: Task[P, str],
job_i: Task[P, str],
job_j: Task[P, str],
job_k: Task[P, str],
) -> None:
subscribe("index.high", job_a)
subscribe("index.low", job_b)
Expand Down
28 changes: 28 additions & 0 deletions tests/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,34 @@ def test_7(celery_worker: WorkController) -> None:
assert sorted(res) == sorted(["d", "e", "f", "g"])


def test_8(celery_worker: WorkController) -> None:
from celery_pubsub import publish

res = publish("foo", 4, 8, a15=16, a23=42).get()
assert sorted(res) == sorted(["e", "i"])


def test_9(celery_worker: WorkController) -> None:
from celery_pubsub import publish

res = publish("foo.bar.blur", 4, 8, a15=16, a23=42).get()
assert sorted(res) == sorted(["e", "h"])


def test_10(celery_worker: WorkController) -> None:
from celery_pubsub import publish

res = publish("foo.bar.baz", 4, 8, a15=16, a23=42).get()
assert sorted(res) == sorted(["e", "h", "j"])


def test_11(celery_worker: WorkController) -> None:
from celery_pubsub import publish

res = publish("foo.bar", 4, 8, a15=16, a23=42).get()
assert sorted(res) == sorted(["e", "h", "k"])


def test_subscription_redundant(
job_a: Task[P, str], celery_worker: WorkController
) -> None:
Expand Down

0 comments on commit f8674b3

Please sign in to comment.