diff --git a/tests/conftest.py b/tests/conftest.py index 0478dfb..519483f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,13 +16,14 @@ from typing import Callable, TypeVar -import celery import pytest + +import celery from celery import Task from celery.worker import WorkController -from pkg_resources import get_distribution, parse_version from celery_pubsub import subscribe, subscribe_to +from pkg_resources import get_distribution, parse_version P = ParamSpec("P") R = TypeVar("R") @@ -56,12 +57,12 @@ def celery_config(): @pytest.fixture(scope="session") def job_a() -> Task[P, str]: - @subscribe_to(topic="index.high") - def job_a(*args: P.args, **kwargs: P.kwargs) -> str: + @task(name="job_a") + def job(*args: P.args, **kwargs: P.kwargs) -> str: print("job_a: {} {}".format(args, kwargs)) return "a" - return job_a + return job @pytest.fixture(scope="session") @@ -76,12 +77,12 @@ def job(*args: P.args, **kwargs: P.kwargs) -> str: @pytest.fixture(scope="session") def job_c() -> Task[P, str]: - @subscribe_to(topic="index") - def job_c(*args: P.args, **kwargs: P.kwargs) -> str: + @task(name="job_c") + def job(*args: P.args, **kwargs: P.kwargs) -> str: print("job_c: {} {}".format(args, kwargs)) return "c" - return job_c + return job @pytest.fixture(scope="session") @@ -124,6 +125,48 @@ def job(*args: P.args, **kwargs: P.kwargs) -> str: return job +@pytest.fixture(scope="session") +def job_h() -> Task[P, str]: + @subscribe_to(topic="foo.#") + @task(bind=True, name="job_h") + def job(*args: P.args, **kwargs: P.kwargs) -> str: + print(f"job_h: {args} {kwargs}") + return "h" + + return job + + +@pytest.fixture(scope="session") +def job_i() -> Task[P, str]: + @subscribe_to(topic="foo") + def job(*args: P.args, **kwargs: P.kwargs) -> str: + print(f"job_i: {args} {kwargs}") + return "i" + + return job + + +@pytest.fixture(scope="session") +def job_l() -> Task[P, str]: + @subscribe_to(topic="foo.bar.baz") + @task(name="job_l") + def job(*args: P.args, **kwargs: P.kwargs) -> str: + print(f"job_l: {args} {kwargs}") + return "l" + + return job + + +@pytest.fixture(scope="session") +def job_m() -> Task[P, str]: + @subscribe_to(topic="foo.bar") + def job(*args: P.args, **kwargs: P.kwargs) -> str: + print(f"job_m: {args} {kwargs}") + return "m" + + return job + + @pytest.fixture(scope="session", autouse=True) def subscriber( job_a: Task[P, str], @@ -133,8 +176,14 @@ def subscriber( job_e: Task[P, str], job_f: Task[P, str], job_g: Task[P, str], + job_h: Task[P, str], + job_i: Task[P, str], + job_l: Task[P, str], + job_m: Task[P, str], ) -> None: + subscribe("index.high", job_a) subscribe("index.low", job_b) + subscribe("index", job_c) subscribe("index.#", job_d) subscribe("#", job_e) subscribe("index.*.test", job_f) diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 39ceba1..ea81620 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -80,6 +80,34 @@ def test_7(celery_worker: WorkController) -> None: assert sorted(res) == sorted(["d", "e", "f", "g"]) +def test_8(celery_worker: WorkController) -> None: + from celery_pubsub import publish + + res = publish("foo", 4, 8, a15=16, a23=42).get() + assert sorted(res) == sorted(["e", "i"]) + + +def test_9(celery_worker: WorkController) -> None: + from celery_pubsub import publish + + res = publish("foo.bar.blur", 4, 8, a15=16, a23=42).get() + assert sorted(res) == sorted(["e", "h"]) + + +def test_10(celery_worker: WorkController) -> None: + from celery_pubsub import publish + + res = publish("foo.bar.baz", 4, 8, a15=16, a23=42).get() + assert sorted(res) == sorted(["e", "h", "l"]) + + +def test_11(celery_worker: WorkController) -> None: + from celery_pubsub import publish + + res = publish("foo.bar", 4, 8, a15=16, a23=42).get() + assert sorted(res) == sorted(["e", "h", "m"]) + + def test_subscription_redundant( job_a: Task[P, str], celery_worker: WorkController ) -> None: