diff --git a/faststream/nats/testing.py b/faststream/nats/testing.py index f106c93f9d..6681ba5b14 100644 --- a/faststream/nats/testing.py +++ b/faststream/nats/testing.py @@ -1,4 +1,5 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union +from unittest.mock import AsyncMock from nats.aio.msg import Msg from typing_extensions import override @@ -40,8 +41,14 @@ def f(msg: Any) -> None: return sub.calls[0].handler @staticmethod - async def _fake_connect(broker: NatsBroker, *args: Any, **kwargs: Any) -> None: + async def _fake_connect( # type: ignore[override] + broker: NatsBroker, + *args: Any, + **kwargs: Any, + ) -> AsyncMock: + broker.stream = AsyncMock() # type: ignore[assignment] broker._js_producer = broker._producer = FakeProducer(broker) # type: ignore[assignment] + return AsyncMock() @staticmethod def remove_publisher_fake_subscriber( diff --git a/tests/brokers/base/consume.py b/tests/brokers/base/consume.py index 654d3b19f8..68f4edc3e3 100644 --- a/tests/brokers/base/consume.py +++ b/tests/brokers/base/consume.py @@ -221,6 +221,33 @@ async def handler(m: Foo, dep: int = Depends(dependency), broker=Context()): assert event.is_set() mock.assert_called_once_with({"x": 1}, "100", consume_broker) + async def test_dynamic_sub( + self, + queue: str, + consume_broker: BrokerUsecase, + event: asyncio.Event, + ): + def subscriber(m): + event.set() + + async with consume_broker: + await consume_broker.start() + + sub = consume_broker.subscriber(queue, **self.subscriber_kwargs) + sub(subscriber) + consume_broker.setup_subscriber(sub) + await sub.start() + + await asyncio.wait( + ( + asyncio.create_task(consume_broker.publish("hello", queue)), + asyncio.create_task(event.wait()), + ), + timeout=self.timeout, + ) + + assert event.is_set() + @pytest.mark.asyncio() class BrokerRealConsumeTestcase(BrokerConsumeTestcase):