Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new test: in_memory_routing #2010

Merged
merged 14 commits into from
Dec 28, 2024
7 changes: 5 additions & 2 deletions faststream/rabbit/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,12 @@ async def _execute_handler(
def _is_handler_matches(
handler: "LogicSubscriber",
routing_key: str,
headers: "Mapping[Any, Any]",
exchange: "RabbitExchange",
headers: Optional["Mapping[Any, Any]"] = None,
exchange: Optional["RabbitExchange"] = None,
) -> bool:
headers = headers or {}
exchange = RabbitExchange.validate(exchange)

if handler.exchange != exchange:
return False

Expand Down
9 changes: 7 additions & 2 deletions tests/asyncapi/base/v2_6_0/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,8 +718,13 @@ class User:
@broker.subscriber("test2")
async def second_handle(user: User) -> None: ...

with pytest.warns(RuntimeWarning, match="Overwriting the message schema, data types have the same name"):
schema = AsyncAPI(self.build_app(broker), schema_version="2.6.0").to_jsonable()
with pytest.warns(
RuntimeWarning,
match="Overwriting the message schema, data types have the same name",
):
schema = AsyncAPI(
self.build_app(broker), schema_version="2.6.0"
).to_jsonable()

payload = schema["components"]["schemas"]

Expand Down
9 changes: 7 additions & 2 deletions tests/asyncapi/base/v3_0_0/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,13 @@ class User:
@broker.subscriber("test2")
async def second_handle(user: User) -> None: ...

with pytest.warns(RuntimeWarning, match="Overwriting the message schema, data types have the same name"):
schema = AsyncAPI(self.build_app(broker), schema_version="3.0.0").to_jsonable()
with pytest.warns(
RuntimeWarning,
match="Overwriting the message schema, data types have the same name",
):
schema = AsyncAPI(
self.build_app(broker), schema_version="3.0.0"
).to_jsonable()

payload = schema["components"]["schemas"]

Expand Down
291 changes: 148 additions & 143 deletions tests/brokers/rabbit/test_test_client.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import asyncio
from typing import Any

import pytest

from faststream import BaseMiddleware
from faststream.exceptions import SubscriberNotFound
from faststream.rabbit import (
ExchangeType,
RabbitBroker,
RabbitExchange,
RabbitQueue,
)
from faststream.rabbit.annotations import RabbitMessage
from faststream.rabbit.testing import FakeProducer, apply_pattern
from faststream.rabbit.testing import FakeProducer, _is_handler_matches, apply_pattern
from tests.brokers.base.testclient import BrokerTestclientTestcase

from .basic import RabbitMemoryTestcaseConfig
Expand Down Expand Up @@ -42,151 +44,15 @@ def subscriber(m) -> None:

assert event.is_set()

async def test_respect_routing_key(self) -> None:
broker = self.get_broker()

publisher = broker.publisher(
exchange=RabbitExchange("test", type=ExchangeType.TOPIC),
routing_key="up",
)

async with self.patch_broker(broker):
await publisher.publish("Hi!")

publisher.mock.assert_called_once_with("Hi!")

async def test_direct(
async def test_direct_not_found(
self,
queue: str,
) -> None:
broker = self.get_broker()

@broker.subscriber(queue)
async def handler(m) -> int:
return 1

@broker.subscriber(queue + "1", exchange="test")
async def handler2(m) -> int:
return 2

async with self.patch_broker(broker) as br:
await br.start()

assert await (await br.request("", queue)).decode() == 1
assert (
await (await br.request("", queue + "1", exchange="test")).decode() == 2
)

with pytest.raises(SubscriberNotFound):
await br.request("", exchange="test2")

async def test_fanout(
self,
queue: str,
mock,
) -> None:
broker = self.get_broker()

exch = RabbitExchange("test", type=ExchangeType.FANOUT)

@broker.subscriber(queue, exchange=exch)
async def handler(m) -> None:
mock()

async with self.patch_broker(broker) as br:
await br.request("", exchange=exch)

with pytest.raises(SubscriberNotFound):
await br.request("", exchange="test2")

assert mock.call_count == 1

async def test_any_topic_routing(self) -> None:
broker = self.get_broker()

exch = RabbitExchange("test", type=ExchangeType.TOPIC)

@broker.subscriber(
RabbitQueue("test", routing_key="test.*.subj.*"),
exchange=exch,
)
def subscriber(msg) -> None: ...

async with self.patch_broker(broker) as br:
await br.publish("hello", "test.a.subj.b", exchange=exch)
subscriber.mock.assert_called_once_with("hello")

async def test_ending_topic_routing(self) -> None:
broker = self.get_broker()

exch = RabbitExchange("test", type=ExchangeType.TOPIC)

@broker.subscriber(
RabbitQueue("test", routing_key="test.#"),
exchange=exch,
)
def subscriber(msg) -> None: ...

async with self.patch_broker(broker) as br:
await br.publish("hello", "test.a.subj.b", exchange=exch)
subscriber.mock.assert_called_once_with("hello")

async def test_mixed_topic_routing(self) -> None:
broker = self.get_broker()

exch = RabbitExchange("test", type=ExchangeType.TOPIC)

@broker.subscriber(
RabbitQueue("test", routing_key="*.*.subj.#"),
exchange=exch,
)
def subscriber(msg) -> None: ...

async with self.patch_broker(broker) as br:
await br.publish("hello", "test.a.subj.b.c", exchange=exch)
subscriber.mock.assert_called_once_with("hello")

async def test_header(self) -> None:
broker = self.get_broker()

q1 = RabbitQueue(
"test-queue-2",
bind_arguments={"key": 2, "key2": 2, "x-match": "any"},
)
q2 = RabbitQueue(
"test-queue-3",
bind_arguments={"key": 2, "key2": 2, "x-match": "all"},
)
q3 = RabbitQueue(
"test-queue-4",
bind_arguments={},
)
exch = RabbitExchange("exchange", type=ExchangeType.HEADERS)

@broker.subscriber(q2, exch)
async def handler2(msg) -> int:
return 2

@broker.subscriber(q1, exch)
async def handler(msg) -> int:
return 1

@broker.subscriber(q3, exch)
async def handler3(msg) -> int:
return 3

async with self.patch_broker(broker) as br:
assert (
await (
await br.request(exchange=exch, headers={"key": 2, "key2": 2})
).decode()
== 2
)
assert (
await (await br.request(exchange=exch, headers={"key": 2})).decode()
== 1
)
assert await (await br.request(exchange=exch, headers={})).decode() == 3
await br.request("", "")

async def test_consume_manual_ack(
self,
Expand All @@ -205,13 +71,13 @@ async def handler(msg: RabbitMessage) -> None:
consume.set()

@broker.subscriber(queue=queue + "1", exchange=exchange)
async def handler2(msg: RabbitMessage):
async def handler2(msg: RabbitMessage) -> None:
await msg.raw_message.nack()
consume2.set()
raise ValueError

@broker.subscriber(queue=queue + "2", exchange=exchange)
async def handler3(msg: RabbitMessage):
async def handler3(msg: RabbitMessage) -> None:
await msg.raw_message.reject()
consume3.set()
raise ValueError
Expand Down Expand Up @@ -239,7 +105,7 @@ async def handler3(msg: RabbitMessage):
assert consume2.is_set()
assert consume3.is_set()

async def test_respect_middleware(self, queue) -> None:
async def test_respect_middleware(self, queue: str) -> None:
routes = []

class Middleware(BaseMiddleware):
Expand All @@ -262,7 +128,7 @@ async def h2(msg) -> None: ...
assert len(routes) == 2

@pytest.mark.rabbit()
async def test_real_respect_middleware(self, queue) -> None:
async def test_real_respect_middleware(self, queue: str) -> None:
routes = []

class Middleware(BaseMiddleware):
Expand Down Expand Up @@ -329,3 +195,142 @@ async def test_broker_with_real_patches_publishers_and_subscribers(
)
def test(pattern: str, current: str, result: bool) -> None:
assert apply_pattern(pattern, current) == result


exch_direct = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.DIRECT)
exch_fanout = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.FANOUT)
exch_topic = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.TOPIC)
exch_headers = RabbitExchange("exchange", auto_delete=True, type=ExchangeType.HEADERS)
reqular_queue = RabbitQueue("test-reqular-queue", auto_delete=True)

routing_key_queue = RabbitQueue(
"test-routing-key-queue", auto_delete=True, routing_key="*.info"
)
one_key_queue = RabbitQueue(
"test-one-key-queue", auto_delete=True, bind_arguments={"key": 1}
)
any_keys_queue = RabbitQueue(
"test-any-keys-queue",
auto_delete=True,
bind_arguments={"key": 2, "key2": 2, "x-match": "any"},
)
all_keys_queue = RabbitQueue(
"test-all-keys-queue",
auto_delete=True,
bind_arguments={"key": 2, "key2": 2, "x-match": "all"},
)

broker = RabbitBroker()


@pytest.mark.parametrize(
(
"queue",
"exchange",
"routing_key",
"headers",
"expected_result",
),
(
pytest.param(
reqular_queue,
exch_direct,
reqular_queue.routing,
{},
True,
id="direct match",
),
pytest.param(
reqular_queue,
exch_direct,
"wrong key",
{},
False,
id="direct mismatch",
),
pytest.param(
reqular_queue,
exch_fanout,
"",
{},
True,
id="fanout match",
),
pytest.param(
routing_key_queue,
exch_topic,
"log.info",
{},
True,
id="topic match",
),
pytest.param(
routing_key_queue,
exch_topic,
"log.wrong",
{},
False,
id="topic mismatch",
),
pytest.param(
one_key_queue,
exch_headers,
"",
{"key": 1},
True,
id="one header match",
),
pytest.param(
one_key_queue,
exch_headers,
"",
{"key": "wrong"},
False,
id="one header mismatch",
),
pytest.param(
any_keys_queue,
exch_headers,
"",
{"key2": 2},
True,
id="any headers match",
),
pytest.param(
any_keys_queue,
exch_headers,
"",
{"key2": "wrong"},
False,
id="any headers mismatch",
),
pytest.param(
all_keys_queue,
exch_headers,
"",
{"key": 2, "key2": 2},
True,
id="all headers match",
),
pytest.param(
all_keys_queue,
exch_headers,
"",
{"key": "wrong", "key2": 2},
False,
id="all headers mismatch",
),
),
)
def test_in_memory_routing(
queue: str,
exchange: RabbitExchange,
routing_key: str,
headers: dict[str, Any],
expected_result: bool,
) -> None:
subscriber = broker.subscriber(queue, exchange)
assert (
_is_handler_matches(subscriber, routing_key, headers, exchange)
is expected_result
)
Loading