Skip to content

Commit

Permalink
Merge branch 'main' into fix/kafka-secure-merge
Browse files Browse the repository at this point in the history
  • Loading branch information
kumaranvpl authored May 3, 2024
2 parents f9a31a8 + 7bc3428 commit 7a40853
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 1 deletion.
4 changes: 4 additions & 0 deletions faststream/nats/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing_extensions import override

from faststream.broker.message import encode_message, gen_cor_id
from faststream.exceptions import WRONG_PUBLISH_ARGS
from faststream.nats.broker import NatsBroker
from faststream.nats.publisher.producer import NatsFastProducer
from faststream.nats.schemas.js_stream import is_subject_match_wildcard
Expand Down Expand Up @@ -71,6 +72,9 @@ async def publish( # type: ignore[override]
rpc_timeout: Optional[float] = None,
raise_timeout: bool = False,
) -> Any:
if rpc and reply_to:
raise WRONG_PUBLISH_ARGS

incoming = build_message(
message=message,
subject=subject,
Expand Down
4 changes: 4 additions & 0 deletions faststream/rabbit/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing_extensions import override

from faststream.broker.message import gen_cor_id
from faststream.exceptions import WRONG_PUBLISH_ARGS
from faststream.rabbit.broker.broker import RabbitBroker
from faststream.rabbit.parser import AioPikaParser
from faststream.rabbit.publisher.asyncapi import AsyncAPIPublisher
Expand Down Expand Up @@ -197,6 +198,9 @@ async def publish( # type: ignore[override]
"""Publish a message to a RabbitMQ queue or exchange."""
exch = RabbitExchange.validate(exchange)

if rpc and reply_to:
raise WRONG_PUBLISH_ARGS

incoming = build_message(
message=message,
exchange=exch,
Expand Down
5 changes: 4 additions & 1 deletion faststream/redis/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing_extensions import override

from faststream.broker.message import gen_cor_id
from faststream.exceptions import SetupError
from faststream.exceptions import WRONG_PUBLISH_ARGS, SetupError
from faststream.redis.broker.broker import RedisBroker
from faststream.redis.message import (
BatchListMessage,
Expand Down Expand Up @@ -87,6 +87,9 @@ async def publish( # type: ignore[override]
rpc_timeout: Optional[float] = 30.0,
raise_timeout: bool = False,
) -> Optional[Any]:
if rpc and reply_to:
raise WRONG_PUBLISH_ARGS

correlation_id = correlation_id or gen_cor_id()

body = build_message(
Expand Down
11 changes: 11 additions & 0 deletions tests/brokers/nats/test_test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,23 @@
import pytest

from faststream import BaseMiddleware
from faststream.exceptions import SetupError
from faststream.nats import JStream, NatsBroker, PullSub, TestNatsBroker
from tests.brokers.base.testclient import BrokerTestclientTestcase


@pytest.mark.asyncio()
class TestTestclient(BrokerTestclientTestcase):
async def test_rpc_conflicts_reply(self, queue):
async with TestNatsBroker(NatsBroker()) as br:
with pytest.raises(SetupError):
await br.publish(
"",
queue,
rpc=True,
reply_to="response",
)

@pytest.mark.nats()
async def test_with_real_testclient(
self,
Expand Down
11 changes: 11 additions & 0 deletions tests/brokers/rabbit/test_test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest

from faststream import BaseMiddleware
from faststream.exceptions import SetupError
from faststream.rabbit import (
ExchangeType,
RabbitBroker,
Expand All @@ -18,6 +19,16 @@

@pytest.mark.asyncio()
class TestTestclient(BrokerTestclientTestcase):
async def test_rpc_conflicts_reply(self, queue):
async with TestRabbitBroker(RabbitBroker()) as br:
with pytest.raises(SetupError):
await br.publish(
"",
queue,
rpc=True,
reply_to="response",
)

@pytest.mark.rabbit()
async def test_with_real_testclient(
self,
Expand Down
11 changes: 11 additions & 0 deletions tests/brokers/redis/test_test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,23 @@
import pytest

from faststream import BaseMiddleware
from faststream.exceptions import SetupError
from faststream.redis import ListSub, RedisBroker, StreamSub, TestRedisBroker
from tests.brokers.base.testclient import BrokerTestclientTestcase


@pytest.mark.asyncio()
class TestTestclient(BrokerTestclientTestcase):
async def test_rpc_conflicts_reply(self, queue):
async with TestRedisBroker(RedisBroker()) as br:
with pytest.raises(SetupError):
await br.publish(
"",
queue,
rpc=True,
reply_to="response",
)

@pytest.mark.redis()
async def test_with_real_testclient(
self,
Expand Down

0 comments on commit 7a40853

Please sign in to comment.