Skip to content

Commit

Permalink
feat (#1431): add Response class
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed May 28, 2024
1 parent d83c1cc commit 0047638
Show file tree
Hide file tree
Showing 20 changed files with 201 additions and 18 deletions.
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Simple and fast framework to create message brokers based microservices."""

__version__ = "0.5.9"
__version__ = "0.5.10"

SERVICE_NAME = f"faststream-{__version__}"

Expand Down
3 changes: 3 additions & 0 deletions faststream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from faststream.annotations import ContextRepo, Logger, NoCast
from faststream.app import FastStream
from faststream.broker.middlewares import BaseMiddleware
from faststream.broker.response import Response
from faststream.testing.app import TestApp
from faststream.utils import Context, Depends, Header, Path, apply_types, context

Expand All @@ -23,4 +24,6 @@
"NoCast",
# middlewares
"BaseMiddleware",
# basic
"Response",
)
52 changes: 52 additions & 0 deletions faststream/broker/response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from typing import TYPE_CHECKING, Any, Optional, Union

if TYPE_CHECKING:
from faststream.types import AnyDict, SendableMessage


class Response:
def __new__(
cls,
body: Union[
"SendableMessage",
"Response",
],
**kwargs: Any,
) -> "Response":
"""Create a new instance of the class."""
if isinstance(body, cls):
return body

else:
return super().__new__(cls)

def __init__(
self,
body: "SendableMessage",
*,
headers: Optional["AnyDict"] = None,
correlation_id: Optional[str] = None,
) -> None:
"""Initialize a handler."""
if not isinstance(body, Response):
self.body = body
self.headers = headers or {}
self.correlation_id = correlation_id

def add_headers(
self,
extra_headers: "AnyDict",
*,
override: bool = True,
) -> None:
if override:
self.headers = {**self.headers, **extra_headers}
else:
self.headers = {**extra_headers, **self.headers}

def as_publish_kwargs(self) -> "AnyDict":
publish_options = {
"headers": self.headers,
"correlation_id": self.correlation_id,
}
return publish_options
20 changes: 13 additions & 7 deletions faststream/broker/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from faststream.asyncapi.abc import AsyncAPIOperation
from faststream.asyncapi.message import parse_handler_params
from faststream.asyncapi.utils import to_camelcase
from faststream.broker.response import Response
from faststream.broker.subscriber.call_item import HandlerItem
from faststream.broker.subscriber.proto import SubscriberProto
from faststream.broker.types import (
Expand Down Expand Up @@ -333,24 +334,29 @@ async def consume(self, msg: MsgType) -> Any:
for m in middlewares:
stack.push_async_exit(m.__aexit__)

result_msg = await h.call(
message=message,
# consumer middlewares
_extra_middlewares=(m.consume_scope for m in middlewares),
result_msg = Response(
await h.call(
message=message,
# consumer middlewares
_extra_middlewares=(m.consume_scope for m in middlewares),
)
)

if not result_msg.correlation_id:
result_msg.correlation_id = message.correlation_id

for p in chain(
self.__get_reponse_publisher(message),
h.handler._publishers,
):
await p.publish(
result_msg,
correlation_id=message.correlation_id,
result_msg.body,
**result_msg.as_publish_kwargs(),
# publisher middlewares
_extra_middlewares=(m.publish_scope for m in middlewares),
)

return result_msg
return result_msg.body

# Suitable handler is not founded
for m in middlewares:
Expand Down
2 changes: 2 additions & 0 deletions faststream/confluent/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from faststream.confluent.annotations import KafkaMessage
from faststream.confluent.broker import KafkaBroker
from faststream.confluent.response import KafkaResponse
from faststream.confluent.router import KafkaPublisher, KafkaRoute, KafkaRouter
from faststream.confluent.testing import TestKafkaBroker
from faststream.testing.app import TestApp
Expand All @@ -10,6 +11,7 @@
"KafkaRouter",
"KafkaRoute",
"KafkaPublisher",
"KafkaResponse",
"TestKafkaBroker",
"TestApp",
)
5 changes: 5 additions & 0 deletions faststream/confluent/response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from faststream.broker.response import Response


class KafkaResponse(Response):
pass
2 changes: 2 additions & 0 deletions faststream/kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from faststream.kafka.annotations import KafkaMessage
from faststream.kafka.broker import KafkaBroker
from faststream.kafka.response import KafkaResponse
from faststream.kafka.router import KafkaPublisher, KafkaRoute, KafkaRouter
from faststream.kafka.testing import TestKafkaBroker
from faststream.testing.app import TestApp
Expand All @@ -11,6 +12,7 @@
"KafkaMessage",
"KafkaRouter",
"KafkaRoute",
"KafkaResponse",
"KafkaPublisher",
"TestKafkaBroker",
"TestApp",
Expand Down
5 changes: 5 additions & 0 deletions faststream/kafka/response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from faststream.broker.response import Response


class KafkaResponse(Response):
pass
2 changes: 2 additions & 0 deletions faststream/nats/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from faststream.nats.annotations import NatsMessage
from faststream.nats.broker.broker import NatsBroker
from faststream.nats.response import NatsResponse
from faststream.nats.router import NatsPublisher, NatsRoute, NatsRouter
from faststream.nats.schemas import JStream, KvWatch, ObjWatch, PullSub
from faststream.nats.testing import TestNatsBroker
Expand All @@ -32,6 +33,7 @@
"NatsPublisher",
"TestNatsBroker",
"NatsMessage",
"NatsResponse",
# Nats imports
"ConsumerConfig",
"DeliverPolicy",
Expand Down
5 changes: 5 additions & 0 deletions faststream/nats/response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from faststream.broker.response import Response


class NatsResponse(Response):
pass
2 changes: 2 additions & 0 deletions faststream/rabbit/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from faststream.rabbit.annotations import RabbitMessage
from faststream.rabbit.broker import RabbitBroker
from faststream.rabbit.response import RabbitResponse
from faststream.rabbit.router import RabbitPublisher, RabbitRoute, RabbitRouter
from faststream.rabbit.schemas import (
ExchangeType,
Expand All @@ -17,6 +18,7 @@
"RabbitRouter",
"RabbitRoute",
"RabbitPublisher",
"RabbitResponse",
"ExchangeType",
"ReplyConfig",
"RabbitExchange",
Expand Down
1 change: 0 additions & 1 deletion faststream/rabbit/publisher/asyncapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ class AsyncAPIPublisher(LogicPublisher):
# or
publisher: AsyncAPIPublisher = router.publisher(...)
```
"""

def get_name(self) -> str:
Expand Down
5 changes: 5 additions & 0 deletions faststream/rabbit/response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from faststream.broker.response import Response


class RabbitResponse(Response):
pass
19 changes: 13 additions & 6 deletions faststream/rabbit/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def create_publisher_fake_subscriber(
publisher: AsyncAPIPublisher,
) -> "HandlerCallWrapper[Any, Any, Any]":
sub = broker.subscriber(
queue=publisher.queue,
queue=publisher.routing,
exchange=publisher.exchange,
)

Expand All @@ -70,7 +70,7 @@ def remove_publisher_fake_subscriber(
) -> None:
broker._subscribers.pop(
AsyncAPISubscriber.get_routing_hash(
queue=publisher.queue,
queue=RabbitQueue.validate(publisher.routing),
exchange=publisher.exchange,
),
None,
Expand Down Expand Up @@ -132,7 +132,7 @@ def build_message(
priority=priority,
correlation_id=correlation_id,
expiration=expiration,
message_id=message_id,
message_id=message_id or gen_cor_id(),
timestamp=timestamp,
message_type=message_type,
user_id=user_id,
Expand All @@ -148,14 +148,21 @@ def build_message(
header=ContentHeader(
properties=spec.Basic.Properties(
content_type=msg.content_type,
message_id=gen_cor_id(),
headers=msg.headers,
reply_to=reply_to,
reply_to=msg.reply_to,
content_encoding=msg.content_encoding,
priority=msg.priority,
correlation_id=msg.correlation_id,
message_id=msg.message_id,
timestamp=msg.timestamp,
message_type=message_type,
user_id=msg.user_id,
app_id=msg.app_id,
)
),
body=msg.body,
channel=AsyncMock(),
)
),
)


Expand Down
2 changes: 2 additions & 0 deletions faststream/redis/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from faststream.redis.annotations import Redis, RedisMessage
from faststream.redis.broker.broker import RedisBroker
from faststream.redis.response import RedisResponse
from faststream.redis.router import RedisPublisher, RedisRoute, RedisRouter
from faststream.redis.schemas import ListSub, PubSub, StreamSub
from faststream.redis.testing import TestRedisBroker
Expand All @@ -12,6 +13,7 @@
"RedisRoute",
"RedisRouter",
"RedisPublisher",
"RedisResponse",
"TestRedisBroker",
"TestApp",
"PubSub",
Expand Down
8 changes: 6 additions & 2 deletions faststream/redis/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ async def publish( # type: ignore[override]

list_sub = ListSub.validate(list or self.list)
reply_to = reply_to or self.reply_to
headers = headers or self.headers
correlation_id = correlation_id or gen_cor_id()

call: "AsyncFunc" = self._producer.publish
Expand All @@ -301,7 +300,7 @@ async def publish( # type: ignore[override]
list=list_sub.name,
# basic args
reply_to=reply_to,
headers=headers,
headers=headers or self.headers,
correlation_id=correlation_id,
# RPC args
rpc=rpc,
Expand All @@ -327,6 +326,10 @@ async def publish( # type: ignore[override]
Optional[str],
Doc("Has no real effect. Option to be compatible with original protocol."),
] = None,
headers: Annotated[
Optional["AnyDict"],
Doc("Message headers to store metainformation."),
] = None,
# publisher specific
_extra_middlewares: Annotated[
Iterable["PublisherMiddleware"],
Expand All @@ -353,6 +356,7 @@ async def publish( # type: ignore[override]
*message,
list=list_sub.name,
correlation_id=correlation_id,
headers=headers or self.headers,
)


Expand Down
5 changes: 5 additions & 0 deletions faststream/redis/response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from faststream.broker.response import Response


class RedisResponse(Response):
pass
42 changes: 41 additions & 1 deletion tests/brokers/base/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import pytest
from pydantic import BaseModel

from faststream import BaseMiddleware
from faststream import BaseMiddleware, Context, Response
from faststream._compat import dump_json, model_to_json
from faststream.broker.core.usecase import BrokerUsecase

Expand Down Expand Up @@ -175,6 +175,46 @@ async def handler(m: message_type):
assert event.is_set()
mock.assert_called_with(expected_message)

@pytest.mark.asyncio()
async def test_response(
self,
queue: str,
event: asyncio.Event,
mock: Mock,
):
pub_broker = self.get_broker(apply_types=True)

@pub_broker.subscriber(queue, **self.subscriber_kwargs)
@pub_broker.publisher(queue + "1")
async def m():
return Response(1, headers={"custom": "1"}, correlation_id="1")

@pub_broker.subscriber(queue + "1", **self.subscriber_kwargs)
async def m_next(msg=Context("message")):
event.set()
mock(
body=msg.body,
headers=msg.headers["custom"],
correlation_id=msg.correlation_id,
)

async with self.patch_broker(pub_broker) as br:
await br.start()
await asyncio.wait(
(
asyncio.create_task(br.publish(None, queue)),
asyncio.create_task(event.wait()),
),
timeout=self.timeout,
)

assert event.is_set()
mock.assert_called_with(
body=b"1",
correlation_id="1",
headers="1",
)

@pytest.mark.asyncio()
async def test_unwrap_dict(
self,
Expand Down
12 changes: 12 additions & 0 deletions tests/brokers/rabbit/test_test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ def subscriber(m):

assert event.is_set()

async def test_respect_routing_key(self):
broker = self.get_broker()

publisher = broker.publisher(
exchange=RabbitExchange("test", type=ExchangeType.TOPIC), routing_key="up"
)

async with TestRabbitBroker(broker):
await publisher.publish("Hi!")

publisher.mock.assert_called_once_with("Hi!")

async def test_direct(
self,
queue: str,
Expand Down
Loading

0 comments on commit 0047638

Please sign in to comment.