Skip to content

Commit

Permalink
test: add additional tests and refactor job fixture methods in pubsub…
Browse files Browse the repository at this point in the history
… module

Expanded unit test coverage in test_pubsub.py with methods test_8 to test_11 to validate the functionality and expected results for the publish method with different parameters. Refactored job fixture methods in conftest.py to increase code readability. Now, the Callable typing for each 'job' is directly derived from the task input, enhancing code execution efficiency. New job fixtures (job_h to job_m) have also been added for additional testing purposes.
  • Loading branch information
WaYdotNET committed Dec 3, 2023
1 parent bf9ac7e commit fa9141e
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 8 deletions.
65 changes: 57 additions & 8 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

from typing import Callable, TypeVar

import celery
import pytest

import celery
from celery import Task
from celery.worker import WorkController
from pkg_resources import get_distribution, parse_version

from celery_pubsub import subscribe, subscribe_to
from pkg_resources import get_distribution, parse_version

P = ParamSpec("P")
R = TypeVar("R")
Expand Down Expand Up @@ -56,12 +57,12 @@ def celery_config():

@pytest.fixture(scope="session")
def job_a() -> Task[P, str]:
@subscribe_to(topic="index.high")
def job_a(*args: P.args, **kwargs: P.kwargs) -> str:
@task(name="job_a")
def job(*args: P.args, **kwargs: P.kwargs) -> str:
print("job_a: {} {}".format(args, kwargs))
return "a"

return job_a
return job


@pytest.fixture(scope="session")
Expand All @@ -76,12 +77,12 @@ def job(*args: P.args, **kwargs: P.kwargs) -> str:

@pytest.fixture(scope="session")
def job_c() -> Task[P, str]:
@subscribe_to(topic="index")
def job_c(*args: P.args, **kwargs: P.kwargs) -> str:
@task(name="job_c")
def job(*args: P.args, **kwargs: P.kwargs) -> str:
print("job_c: {} {}".format(args, kwargs))
return "c"

return job_c
return job


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -124,6 +125,48 @@ def job(*args: P.args, **kwargs: P.kwargs) -> str:
return job


@pytest.fixture(scope="session")
def job_h() -> Task[P, str]:
@subscribe_to(topic="foo.#")
@task(bind=True, name="job_h")
def job(*args: P.args, **kwargs: P.kwargs) -> str:
print(f"job_h: {args} {kwargs}")
return "h"

return job


@pytest.fixture(scope="session")
def job_i() -> Task[P, str]:
@subscribe_to(topic="foo")
def job(*args: P.args, **kwargs: P.kwargs) -> str:
print(f"job_i: {args} {kwargs}")
return "i"

return job


@pytest.fixture(scope="session")
def job_l() -> Task[P, str]:
@subscribe_to(topic="foo.bar.baz")
@task(name="job_l")
def job(*args: P.args, **kwargs: P.kwargs) -> str:
print(f"job_l: {args} {kwargs}")
return "l"

return job


@pytest.fixture(scope="session")
def job_m() -> Task[P, str]:
@subscribe_to(topic="foo.bar")
def job(*args: P.args, **kwargs: P.kwargs) -> str:
print(f"job_m: {args} {kwargs}")
return "m"

return job


@pytest.fixture(scope="session", autouse=True)
def subscriber(
job_a: Task[P, str],
Expand All @@ -133,8 +176,14 @@ def subscriber(
job_e: Task[P, str],
job_f: Task[P, str],
job_g: Task[P, str],
job_h: Task[P, str],
job_i: Task[P, str],
job_l: Task[P, str],
job_m: Task[P, str],
) -> None:
subscribe("index.high", job_a)
subscribe("index.low", job_b)
subscribe("index", job_c)
subscribe("index.#", job_d)
subscribe("#", job_e)
subscribe("index.*.test", job_f)
Expand Down
28 changes: 28 additions & 0 deletions tests/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,34 @@ def test_7(celery_worker: WorkController) -> None:
assert sorted(res) == sorted(["d", "e", "f", "g"])


def test_8(celery_worker: WorkController) -> None:
from celery_pubsub import publish

res = publish("foo", 4, 8, a15=16, a23=42).get()
assert sorted(res) == sorted(["e", "i"])


def test_9(celery_worker: WorkController) -> None:
from celery_pubsub import publish

res = publish("foo.bar.blur", 4, 8, a15=16, a23=42).get()
assert sorted(res) == sorted(["e", "h"])


def test_10(celery_worker: WorkController) -> None:
from celery_pubsub import publish

res = publish("foo.bar.baz", 4, 8, a15=16, a23=42).get()
assert sorted(res) == sorted(["e", "h", "l"])


def test_11(celery_worker: WorkController) -> None:
from celery_pubsub import publish

res = publish("foo.bar", 4, 8, a15=16, a23=42).get()
assert sorted(res) == sorted(["e", "h", "m"])


def test_subscription_redundant(
job_a: Task[P, str], celery_worker: WorkController
) -> None:
Expand Down

0 comments on commit fa9141e

Please sign in to comment.