diff --git a/faststream/nats/testing.py b/faststream/nats/testing.py index 2fa2bddd1a..f106c93f9d 100644 --- a/faststream/nats/testing.py +++ b/faststream/nats/testing.py @@ -4,6 +4,7 @@ from typing_extensions import override from faststream.broker.message import encode_message, gen_cor_id +from faststream.exceptions import WRONG_PUBLISH_ARGS from faststream.nats.broker import NatsBroker from faststream.nats.publisher.producer import NatsFastProducer from faststream.nats.schemas.js_stream import is_subject_match_wildcard @@ -71,6 +72,9 @@ async def publish( # type: ignore[override] rpc_timeout: Optional[float] = None, raise_timeout: bool = False, ) -> Any: + if rpc and reply_to: + raise WRONG_PUBLISH_ARGS + incoming = build_message( message=message, subject=subject, diff --git a/faststream/rabbit/testing.py b/faststream/rabbit/testing.py index e425ed02d6..e15cbe2cb3 100644 --- a/faststream/rabbit/testing.py +++ b/faststream/rabbit/testing.py @@ -8,6 +8,7 @@ from typing_extensions import override from faststream.broker.message import gen_cor_id +from faststream.exceptions import WRONG_PUBLISH_ARGS from faststream.rabbit.broker.broker import RabbitBroker from faststream.rabbit.parser import AioPikaParser from faststream.rabbit.publisher.asyncapi import AsyncAPIPublisher @@ -197,6 +198,9 @@ async def publish( # type: ignore[override] """Publish a message to a RabbitMQ queue or exchange.""" exch = RabbitExchange.validate(exchange) + if rpc and reply_to: + raise WRONG_PUBLISH_ARGS + incoming = build_message( message=message, exchange=exch, diff --git a/faststream/redis/testing.py b/faststream/redis/testing.py index 54cf908923..74541322f1 100644 --- a/faststream/redis/testing.py +++ b/faststream/redis/testing.py @@ -4,7 +4,7 @@ from typing_extensions import override from faststream.broker.message import gen_cor_id -from faststream.exceptions import SetupError +from faststream.exceptions import WRONG_PUBLISH_ARGS, SetupError from faststream.redis.broker.broker import RedisBroker from faststream.redis.message import ( BatchListMessage, @@ -87,6 +87,9 @@ async def publish( # type: ignore[override] rpc_timeout: Optional[float] = 30.0, raise_timeout: bool = False, ) -> Optional[Any]: + if rpc and reply_to: + raise WRONG_PUBLISH_ARGS + correlation_id = correlation_id or gen_cor_id() body = build_message( diff --git a/tests/brokers/nats/test_test_client.py b/tests/brokers/nats/test_test_client.py index 8190e27509..c4bdaa7b41 100644 --- a/tests/brokers/nats/test_test_client.py +++ b/tests/brokers/nats/test_test_client.py @@ -3,12 +3,23 @@ import pytest from faststream import BaseMiddleware +from faststream.exceptions import SetupError from faststream.nats import JStream, NatsBroker, PullSub, TestNatsBroker from tests.brokers.base.testclient import BrokerTestclientTestcase @pytest.mark.asyncio() class TestTestclient(BrokerTestclientTestcase): + async def test_rpc_conflicts_reply(self, queue): + async with TestNatsBroker(NatsBroker()) as br: + with pytest.raises(SetupError): + await br.publish( + "", + queue, + rpc=True, + reply_to="response", + ) + @pytest.mark.nats() async def test_with_real_testclient( self, diff --git a/tests/brokers/rabbit/test_test_client.py b/tests/brokers/rabbit/test_test_client.py index b5f32f0de6..0bb72286e5 100644 --- a/tests/brokers/rabbit/test_test_client.py +++ b/tests/brokers/rabbit/test_test_client.py @@ -4,6 +4,7 @@ import pytest from faststream import BaseMiddleware +from faststream.exceptions import SetupError from faststream.rabbit import ( ExchangeType, RabbitBroker, @@ -18,6 +19,16 @@ @pytest.mark.asyncio() class TestTestclient(BrokerTestclientTestcase): + async def test_rpc_conflicts_reply(self, queue): + async with TestRabbitBroker(RabbitBroker()) as br: + with pytest.raises(SetupError): + await br.publish( + "", + queue, + rpc=True, + reply_to="response", + ) + @pytest.mark.rabbit() async def test_with_real_testclient( self, diff --git a/tests/brokers/redis/test_test_client.py b/tests/brokers/redis/test_test_client.py index ba87d4e685..951d071fbe 100644 --- a/tests/brokers/redis/test_test_client.py +++ b/tests/brokers/redis/test_test_client.py @@ -3,12 +3,23 @@ import pytest from faststream import BaseMiddleware +from faststream.exceptions import SetupError from faststream.redis import ListSub, RedisBroker, StreamSub, TestRedisBroker from tests.brokers.base.testclient import BrokerTestclientTestcase @pytest.mark.asyncio() class TestTestclient(BrokerTestclientTestcase): + async def test_rpc_conflicts_reply(self, queue): + async with TestRedisBroker(RedisBroker()) as br: + with pytest.raises(SetupError): + await br.publish( + "", + queue, + rpc=True, + reply_to="response", + ) + @pytest.mark.redis() async def test_with_real_testclient( self,