From 42676c11089f677afa1ad4b9743a39c335055748 Mon Sep 17 00:00:00 2001 From: Samuel GIFFARD Date: Sun, 2 Apr 2023 18:43:55 +0200 Subject: [PATCH] Add modern type hints. (#202) * Use `typing-extensions` to support older versions. This is needed for `ParamSpec` and `TypeAlias`. --- README.md | 45 ++++++++-------- celery_pubsub/__init__.pyi | 4 -- celery_pubsub/pubsub.py | 63 +++++++++++++++------- celery_pubsub/pubsub.pyi | 21 -------- mypy.ini | 2 +- requirements_test.txt | 3 ++ setup.py | 4 +- tests/conftest.py | 104 ++++++++++++++++++++++++------------- tests/test_pubsub.py | 61 ++++++++++++++++------ 9 files changed, 187 insertions(+), 120 deletions(-) delete mode 100644 celery_pubsub/__init__.pyi delete mode 100644 celery_pubsub/pubsub.pyi diff --git a/README.md b/README.md index ddbc327..303a1c1 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# celery-pubsub 2.0.0-beta2 +# celery-pubsub 2.0.0-beta3 [![Build and Test](https://github.com/Mulugruntz/celery-pubsub/actions/workflows/build.yml/badge.svg)](https://github.com/Mulugruntz/celery-pubsub/actions/workflows/build.yml) @@ -105,6 +105,7 @@ celery_pubsub.publish('some.very.good.test', 42) # task 3 only * Drop support for Pypy 2.7 and 3.6. * Add support for Pypy 3.8 and 3.9. * Add support for CPython 3.11. + * Type hints are now directly in the code. No more stubs files. * 1.0.2 * Add stubs file for type hinting. * 1.0.1 @@ -190,30 +191,30 @@ celery_pubsub.publish('some.very.good.test', 42) # task 3 only [badge-m_linux_pypy3.9_celery5]: https://byob.yarr.is/Mulugruntz/celery-pubsub/m_linux_pypy-3.9_celery5/shields [//]: # (Status in tagged version) -[badge-t_linux_3.7_celery3]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_3.7_celery3/shields -[badge-t_linux_3.7_celery4]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_3.7_celery4/shields -[badge-t_linux_3.7_celery5]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_3.7_celery5/shields +[badge-t_linux_3.7_celery3]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_3.7_celery3/shields +[badge-t_linux_3.7_celery4]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_3.7_celery4/shields +[badge-t_linux_3.7_celery5]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_3.7_celery5/shields -[badge-t_linux_3.8_celery3]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_3.8_celery3/shields -[badge-t_linux_3.8_celery4]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_3.8_celery4/shields -[badge-t_linux_3.8_celery5]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_3.8_celery5/shields +[badge-t_linux_3.8_celery3]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_3.8_celery3/shields +[badge-t_linux_3.8_celery4]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_3.8_celery4/shields +[badge-t_linux_3.8_celery5]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_3.8_celery5/shields -[badge-t_linux_3.9_celery3]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_3.9_celery3/shields -[badge-t_linux_3.9_celery4]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_3.9_celery4/shields -[badge-t_linux_3.9_celery5]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_3.9_celery5/shields +[badge-t_linux_3.9_celery3]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_3.9_celery3/shields +[badge-t_linux_3.9_celery4]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_3.9_celery4/shields +[badge-t_linux_3.9_celery5]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_3.9_celery5/shields -[badge-t_linux_3.10_celery3]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_3.10_celery3/shields -[badge-t_linux_3.10_celery4]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_3.10_celery4/shields -[badge-t_linux_3.10_celery5]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_3.10_celery5/shields +[badge-t_linux_3.10_celery3]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_3.10_celery3/shields +[badge-t_linux_3.10_celery4]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_3.10_celery4/shields +[badge-t_linux_3.10_celery5]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_3.10_celery5/shields -[badge-t_linux_3.11_celery3]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_3.11_celery3/shields -[badge-t_linux_3.11_celery4]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_3.11_celery4/shields -[badge-t_linux_3.11_celery5]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_3.11_celery5/shields +[badge-t_linux_3.11_celery3]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_3.11_celery3/shields +[badge-t_linux_3.11_celery4]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_3.11_celery4/shields +[badge-t_linux_3.11_celery5]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_3.11_celery5/shields -[badge-t_linux_pypy3.8_celery3]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_pypy-3.8_celery3/shields -[badge-t_linux_pypy3.8_celery4]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_pypy-3.8_celery4/shields -[badge-t_linux_pypy3.8_celery5]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_pypy-3.8_celery5/shields +[badge-t_linux_pypy3.8_celery3]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_pypy-3.8_celery3/shields +[badge-t_linux_pypy3.8_celery4]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_pypy-3.8_celery4/shields +[badge-t_linux_pypy3.8_celery5]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_pypy-3.8_celery5/shields -[badge-t_linux_pypy3.9_celery3]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_pypy-3.9_celery3/shields -[badge-t_linux_pypy3.9_celery4]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_pypy-3.9_celery4/shields -[badge-t_linux_pypy3.9_celery5]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta2_linux_pypy-3.9_celery5/shields +[badge-t_linux_pypy3.9_celery3]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_pypy-3.9_celery3/shields +[badge-t_linux_pypy3.9_celery4]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_pypy-3.9_celery4/shields +[badge-t_linux_pypy3.9_celery5]: https://byob.yarr.is/Mulugruntz/celery-pubsub/2.0.0-beta3_linux_pypy-3.9_celery5/shields diff --git a/celery_pubsub/__init__.pyi b/celery_pubsub/__init__.pyi deleted file mode 100644 index a2a56d0..0000000 --- a/celery_pubsub/__init__.pyi +++ /dev/null @@ -1,4 +0,0 @@ -from .pubsub import publish as publish -from .pubsub import publish_now as publish_now -from .pubsub import subscribe as subscribe -from .pubsub import unsubscribe as unsubscribe diff --git a/celery_pubsub/pubsub.py b/celery_pubsub/pubsub.py index 1073aaa..eb77502 100644 --- a/celery_pubsub/pubsub.py +++ b/celery_pubsub/pubsub.py @@ -1,3 +1,20 @@ +"""Contains the pubsub manager and the pubsub functions.""" + +from __future__ import annotations + +import typing + +if typing.TYPE_CHECKING: # pragma: no cover + from typing_extensions import TypeAlias +else: + try: + from typing import TypeAlias as TypeAlias + except ImportError: + try: + from typing_extensions import TypeAlias as TypeAlias + except ImportError: + TypeAlias = None + import celery import re @@ -8,39 +25,49 @@ "unsubscribe", ] +from celery import Task, group +from celery.result import AsyncResult, EagerResult + + +PA: TypeAlias = typing.Any # ParamSpec args +PK: TypeAlias = typing.Any # ParamSpec kwargs +P: TypeAlias = typing.Any # ParamSpec +R: TypeAlias = typing.Any # Return type + -class PubSubManager(object): - def __init__(self): +class PubSubManager: + def __init__(self) -> None: super(PubSubManager, self).__init__() - self.subscribed = set() - self.jobs = {} + self.subscribed: set[tuple[str, re.Pattern[str], Task[P, R]]] = set() + self.jobs: dict[str, group] = {} - def publish(self, topic, *args, **kwargs): + def publish(self, topic: str, *args: PA, **kwargs: PK) -> AsyncResult[R]: result = self.get_jobs(topic).delay(*args, **kwargs) return result - def publish_now(self, topic, *args, **kwargs): - result = self.get_jobs(topic).apply(args=args, kwargs=kwargs) + def publish_now(self, topic: str, *args: PA, **kwargs: PK) -> EagerResult[R]: + # Ignoring type because of this: https://github.com/sbdchd/celery-types/issues/111 + result = self.get_jobs(topic).apply(args=args, kwargs=kwargs) # type: ignore return result - def subscribe(self, topic, task): + def subscribe(self, topic: str, task: Task[P, R]) -> None: key = (topic, self._topic_to_re(topic), task) if key not in self.subscribed: self.subscribed.add(key) self.jobs = {} - def unsubscribe(self, topic, task): + def unsubscribe(self, topic: str, task: Task[P, R]) -> None: key = (topic, self._topic_to_re(topic), task) if key in self.subscribed: self.subscribed.discard(key) self.jobs = {} - def get_jobs(self, topic): + def get_jobs(self, topic: str) -> group: if topic not in self.jobs: self._gen_jobs(topic) return self.jobs[topic] - def _gen_jobs(self, topic): + def _gen_jobs(self, topic: str) -> None: jobs = [] for job in self.subscribed: if job[1].match(topic): @@ -48,28 +75,26 @@ def _gen_jobs(self, topic): self.jobs[topic] = celery.group(jobs) @staticmethod - def _topic_to_re(topic): + def _topic_to_re(topic: str) -> re.Pattern[str]: assert isinstance(topic, str) re_topic = topic.replace(".", r"\.").replace("*", r"[^.]+").replace("#", r".+") return re.compile(r"^{}$".format(re_topic)) -_pubsub_manager = None -if _pubsub_manager is None: # pragma: no cover - _pubsub_manager = PubSubManager() +_pubsub_manager: PubSubManager = PubSubManager() -def publish(topic, *args, **kwargs): +def publish(topic: str, *args: PA, **kwargs: PK) -> AsyncResult[R]: return _pubsub_manager.publish(topic, *args, **kwargs) -def publish_now(topic, *args, **kwargs): +def publish_now(topic: str, *args: PA, **kwargs: PK) -> EagerResult[R]: return _pubsub_manager.publish_now(topic, *args, **kwargs) -def subscribe(topic, task): +def subscribe(topic: str, task: Task[P, R]) -> None: return _pubsub_manager.subscribe(topic, task) -def unsubscribe(topic, task): +def unsubscribe(topic: str, task: Task[P, R]) -> None: return _pubsub_manager.unsubscribe(topic, task) diff --git a/celery_pubsub/pubsub.pyi b/celery_pubsub/pubsub.pyi deleted file mode 100644 index 7ee4a9a..0000000 --- a/celery_pubsub/pubsub.pyi +++ /dev/null @@ -1,21 +0,0 @@ -import re -from typing import Any - -from celery import Task, group -from celery.result import AsyncResult - -class PubSubManager: - subscribed: set[tuple[str, re.Pattern[str], Task[Any, Any]]] - jobs: dict[str, group] - def publish(self, topic: str, *args: Any, **kwargs: Any) -> AsyncResult[Any]: ... - def publish_now( - self, topic: str, *args: Any, **kwargs: Any - ) -> AsyncResult[Any]: ... - def subscribe(self, topic: str, task: Task[Any, Any]) -> None: ... - def unsubscribe(self, topic: str, task: Task[Any, Any]) -> None: ... - def get_jobs(self, topic: str) -> group: ... - -def publish(topic: str, *args: Any, **kwargs: Any) -> AsyncResult[Any]: ... -def publish_now(topic: str, *args: Any, **kwargs: Any) -> AsyncResult[Any]: ... -def subscribe(topic: str, task: Task[Any, Any]) -> None: ... -def unsubscribe(topic: str, task: Task[Any, Any]) -> None: ... diff --git a/mypy.ini b/mypy.ini index 5598a63..ce06de2 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,4 +1,4 @@ [mypy] strict = True # Only check files in the `celery_pubsub` package -files = celery_pubsub +files = celery_pubsub, tests diff --git a/requirements_test.txt b/requirements_test.txt index 15fc569..58e630b 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -1,6 +1,9 @@ pytest coverage==5.5 +black==23.3.0 +typing-extensions==4.5.0 # Only for CPython (exclude pypy). celery-types==0.14.0; implementation_name != 'pypy' mypy==1.0.0; implementation_name != 'pypy' +types-setuptools==67.6.0.6; implementation_name != 'pypy' diff --git a/setup.py b/setup.py index ddc0bcc..bec910c 100644 --- a/setup.py +++ b/setup.py @@ -34,7 +34,7 @@ def tests_require(): setuptools.setup( name="celery-pubsub", packages=["celery_pubsub"], - version="2.0.0-beta2", + version="2.0.0-beta3", description="A Publish and Subscribe library for Celery", long_description=long_description(), long_description_content_type="text/markdown", @@ -42,7 +42,7 @@ def tests_require(): author_email="mulugruntz@gmail.com", license="MIT", url="https://github.com/Mulugruntz/celery-pubsub", - download_url="https://github.com/Mulugruntz/celery-pubsub/tarball/2.0.0-beta2", + download_url="https://github.com/Mulugruntz/celery-pubsub/tarball/2.0.0-beta3", keywords=["celery", "publish", "subscribe", "pubsub"], classifiers=[ "Development Status :: 5 - Production/Stable", diff --git a/tests/conftest.py b/tests/conftest.py index ebe7327..2de679d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,38 +1,64 @@ +from __future__ import annotations + +import typing + +if typing.TYPE_CHECKING: # pragma: no cover + from typing_extensions import ParamSpec +else: + try: + from typing import ParamSpec as ParamSpec + except ImportError: + try: + from typing_extensions import ParamSpec as ParamSpec + except ImportError: + ParamSpec = None + + +from typing import Callable, TypeVar + import pytest import celery +from celery import Task +from celery.worker import WorkController + from celery_pubsub import subscribe, unsubscribe +from pkg_resources import get_distribution, parse_version +P = ParamSpec("P") +R = TypeVar("R") +task: Callable[..., Callable[[Callable[P, R]], Task[P, R]]] -if celery.__version__ < "4.0.0": # pragma: no cover - celery.current_app.conf.update( - CELERY_ALWAYS_EAGER=True, - ) - task = celery.task +if not typing.TYPE_CHECKING: + if get_distribution("celery").parsed_version < parse_version("4.0.0"): + celery.current_app.conf.update( + CELERY_ALWAYS_EAGER=True, + ) + task = celery.task - @pytest.fixture - def celery_worker(): - pass + @pytest.fixture + def celery_worker() -> WorkController: + pass -else: # pragma: no cover - task = celery.shared_task + else: # pragma: no cover + task = celery.shared_task - @pytest.fixture - def celery_config(): - return { - "broker_url": "memory://", - "result_backend": "rpc://", - "broker_transport_options": {"polling_interval": 0.05}, - } + @pytest.fixture(scope="session") + def celery_config(): + return { + "broker_url": "memory://", + "result_backend": "rpc://", + "broker_transport_options": {"polling_interval": 0.05}, + } - if celery.__version__ >= "5.0.0": - pytest_plugins = ["celery.contrib.pytest"] + if get_distribution("celery").parsed_version >= parse_version("5.0.0"): + pytest_plugins = ["celery.contrib.pytest"] @pytest.fixture(scope="session") -def job_a(): +def job_a() -> Task[P, str]: @task(name="job_a") - def job(*args, **kwargs): + def job(*args: P.args, **kwargs: P.kwargs) -> str: print("job_a: {} {}".format(args, kwargs)) return "a" @@ -40,9 +66,9 @@ def job(*args, **kwargs): @pytest.fixture(scope="session") -def job_b(): +def job_b() -> Task[P, str]: @task(name="job_b") - def job(*args, **kwargs): + def job(*args: P.args, **kwargs: P.kwargs) -> str: print("job_b: {} {}".format(args, kwargs)) return "b" @@ -50,9 +76,9 @@ def job(*args, **kwargs): @pytest.fixture(scope="session") -def job_c(): +def job_c() -> Task[P, str]: @task(name="job_c") - def job(*args, **kwargs): + def job(*args: P.args, **kwargs: P.kwargs) -> str: print("job_c: {} {}".format(args, kwargs)) return "c" @@ -60,9 +86,9 @@ def job(*args, **kwargs): @pytest.fixture(scope="session") -def job_d(): +def job_d() -> Task[P, str]: @task(name="job_d") - def job(*args, **kwargs): + def job(*args: P.args, **kwargs: P.kwargs) -> str: print("job_d: {} {}".format(args, kwargs)) return "d" @@ -70,9 +96,9 @@ def job(*args, **kwargs): @pytest.fixture(scope="session") -def job_e(): +def job_e() -> Task[P, str]: @task(name="job_e") - def job(*args, **kwargs): + def job(*args: P.args, **kwargs: P.kwargs) -> str: print("job_e: {} {}".format(args, kwargs)) return "e" @@ -80,9 +106,9 @@ def job(*args, **kwargs): @pytest.fixture(scope="session") -def job_f(): +def job_f() -> Task[P, str]: @task(name="job_f") - def job(*args, **kwargs): + def job(*args: P.args, **kwargs: P.kwargs) -> str: print("job_f: {} {}".format(args, kwargs)) return "f" @@ -90,17 +116,25 @@ def job(*args, **kwargs): @pytest.fixture(scope="session") -def job_g(): +def job_g() -> Task[P, str]: @task(name="job_g") - def job(*args, **kwargs): + def job(*args: P.args, **kwargs: P.kwargs) -> str: print("job_g: {} {}".format(args, kwargs)) return "g" return job -@pytest.fixture(scope="session") -def subscriber(job_a, job_b, job_c, job_d, job_e, job_f, job_g): +@pytest.fixture(scope="session", autouse=True) +def subscriber( + job_a: Task[P, str], + job_b: Task[P, str], + job_c: Task[P, str], + job_d: Task[P, str], + job_e: Task[P, str], + job_f: Task[P, str], + job_g: Task[P, str], +) -> None: subscribe("index.high", job_a) subscribe("index.low", job_b) subscribe("index", job_c) diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 96e0235..39ceba1 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -1,7 +1,28 @@ +from __future__ import annotations + +import typing + +if typing.TYPE_CHECKING: # pragma: no cover + from typing_extensions import ParamSpec +else: + try: + from typing import ParamSpec as ParamSpec + except ImportError: + try: + from typing_extensions import ParamSpec as ParamSpec + except ImportError: + ParamSpec = None + +from celery import Task +from celery.worker import WorkController + from celery_pubsub import publish, unsubscribe, publish_now +P = ParamSpec("P") +R = typing.TypeVar("R") -def test_subscription(subscriber, job_c, celery_worker): + +def test_subscription(job_c: Task[P, str], celery_worker: WorkController) -> None: from celery_pubsub import publish, subscribe res = publish("dummy", 4, 8, a15=16, a23=42).get() @@ -18,67 +39,75 @@ def test_subscription(subscriber, job_c, celery_worker): assert sorted(res) == sorted(["e"]) -def test_1(subscriber, celery_worker): +def test_1(celery_worker: WorkController) -> None: res = publish("index.low.test", 4, 8, a15=16, a23=42).get() assert sorted(res) == sorted(["d", "e", "f", "g"]) -def test_2(subscriber, celery_worker): +def test_2(celery_worker: WorkController) -> None: res = publish("something.else", 4, 8, a15=16, a23=42).get() assert sorted(res) == sorted(["e"]) -def test_3(subscriber, celery_worker): +def test_3(celery_worker: WorkController) -> None: res = publish("index.high", 4, 8, a15=16, a23=42).get() assert sorted(res) == sorted(["a", "d", "e"]) -def test_4(subscriber, celery_worker): +def test_4(celery_worker: WorkController) -> None: res = publish_now("index", 4, 8, a15=16, a23=42).get() assert sorted(res) == sorted(["c", "e"]) -def test_5(subscriber, celery_worker): +def test_5(celery_worker: WorkController) -> None: from celery_pubsub import publish as pub res = pub("index.low", 4, 8, a15=16, a23=42).get() assert sorted(res) == sorted(["b", "d", "e"]) -def test_6(subscriber, celery_worker): +def test_6(celery_worker: WorkController) -> None: import celery_pubsub as pubsub res = pubsub.publish("index.high.some.test", 4, 8, a15=16, a23=42).get() assert sorted(res) == sorted(["d", "e", "g"]) -def test_7(subscriber, celery_worker): +def test_7(celery_worker: WorkController) -> None: from celery_pubsub import publish res = publish("index.high.test", 4, 8, a15=16, a23=42).get() assert sorted(res) == sorted(["d", "e", "f", "g"]) -def test_subscription_redundant(subscriber, job_a, celery_worker): +def test_subscription_redundant( + job_a: Task[P, str], celery_worker: WorkController +) -> None: import celery_pubsub - jobs_init = celery_pubsub.pubsub._pubsub_manager.get_jobs("redundant.test").tasks + # Ignoring attr-defined because of this: https://github.com/sbdchd/celery-types/issues/112 + + jobs_init = celery_pubsub.pubsub._pubsub_manager.get_jobs("redundant.test").tasks # type: ignore[attr-defined] celery_pubsub.subscribe("redundant.test", job_a) - jobs_before = celery_pubsub.pubsub._pubsub_manager.get_jobs("redundant.test").tasks + jobs_before = celery_pubsub.pubsub._pubsub_manager.get_jobs("redundant.test").tasks # type: ignore[attr-defined] celery_pubsub.subscribe("redundant.test", job_a) - jobs_after = celery_pubsub.pubsub._pubsub_manager.get_jobs("redundant.test").tasks + jobs_after = celery_pubsub.pubsub._pubsub_manager.get_jobs("redundant.test").tasks # type: ignore[attr-defined] celery_pubsub.unsubscribe("redundant.test", job_a) - jobs_end = celery_pubsub.pubsub._pubsub_manager.get_jobs("redundant.test").tasks + jobs_end = celery_pubsub.pubsub._pubsub_manager.get_jobs("redundant.test").tasks # type: ignore[attr-defined] assert jobs_before == jobs_after assert jobs_init == jobs_end -def test_unsubscribe_nonexistant(subscriber, job_a, celery_worker): +def test_unsubscribe_nonexistant( + job_a: Task[P, str], celery_worker: WorkController +) -> None: import celery_pubsub - jobs_before = celery_pubsub.pubsub._pubsub_manager.get_jobs("not.exists").tasks + # Ignoring attr-defined because of this: https://github.com/sbdchd/celery-types/issues/112 + + jobs_before = celery_pubsub.pubsub._pubsub_manager.get_jobs("not.exists").tasks # type: ignore[attr-defined] celery_pubsub.unsubscribe("not.exists", job_a) - jobs_after = celery_pubsub.pubsub._pubsub_manager.get_jobs("not.exists").tasks + jobs_after = celery_pubsub.pubsub._pubsub_manager.get_jobs("not.exists").tasks # type: ignore[attr-defined] assert jobs_before == jobs_after