diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index e09163b08a..150de9cc30 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -116,6 +116,7 @@ search: - [FastStream](public_api/faststream/FastStream.md) - [Header](public_api/faststream/Header.md) - [Path](public_api/faststream/Path.md) + - [Response](public_api/faststream/Response.md) - [TestApp](public_api/faststream/TestApp.md) - [apply_types](public_api/faststream/apply_types.md) - asyncapi @@ -124,6 +125,7 @@ search: - confluent - [KafkaBroker](public_api/faststream/confluent/KafkaBroker.md) - [KafkaPublisher](public_api/faststream/confluent/KafkaPublisher.md) + - [KafkaResponse](public_api/faststream/confluent/KafkaResponse.md) - [KafkaRoute](public_api/faststream/confluent/KafkaRoute.md) - [KafkaRouter](public_api/faststream/confluent/KafkaRouter.md) - [TestApp](public_api/faststream/confluent/TestApp.md) @@ -131,6 +133,7 @@ search: - kafka - [KafkaBroker](public_api/faststream/kafka/KafkaBroker.md) - [KafkaPublisher](public_api/faststream/kafka/KafkaPublisher.md) + - [KafkaResponse](public_api/faststream/kafka/KafkaResponse.md) - [KafkaRoute](public_api/faststream/kafka/KafkaRoute.md) - [KafkaRouter](public_api/faststream/kafka/KafkaRouter.md) - [TestApp](public_api/faststream/kafka/TestApp.md) @@ -146,6 +149,7 @@ search: - [KvWatch](public_api/faststream/nats/KvWatch.md) - [NatsBroker](public_api/faststream/nats/NatsBroker.md) - [NatsPublisher](public_api/faststream/nats/NatsPublisher.md) + - [NatsResponse](public_api/faststream/nats/NatsResponse.md) - [NatsRoute](public_api/faststream/nats/NatsRoute.md) - [NatsRouter](public_api/faststream/nats/NatsRouter.md) - [ObjWatch](public_api/faststream/nats/ObjWatch.md) @@ -165,6 +169,7 @@ search: - [RabbitExchange](public_api/faststream/rabbit/RabbitExchange.md) - [RabbitPublisher](public_api/faststream/rabbit/RabbitPublisher.md) - [RabbitQueue](public_api/faststream/rabbit/RabbitQueue.md) + - [RabbitResponse](public_api/faststream/rabbit/RabbitResponse.md) - [RabbitRoute](public_api/faststream/rabbit/RabbitRoute.md) - [RabbitRouter](public_api/faststream/rabbit/RabbitRouter.md) - [ReplyConfig](public_api/faststream/rabbit/ReplyConfig.md) @@ -175,6 +180,7 @@ search: - [PubSub](public_api/faststream/redis/PubSub.md) - [RedisBroker](public_api/faststream/redis/RedisBroker.md) - [RedisPublisher](public_api/faststream/redis/RedisPublisher.md) + - [RedisResponse](public_api/faststream/redis/RedisResponse.md) - [RedisRoute](public_api/faststream/redis/RedisRoute.md) - [RedisRouter](public_api/faststream/redis/RedisRouter.md) - [StreamSub](public_api/faststream/redis/StreamSub.md) @@ -188,6 +194,7 @@ search: - [FastStream](api/faststream/FastStream.md) - [Header](api/faststream/Header.md) - [Path](api/faststream/Path.md) + - [Response](api/faststream/Response.md) - [TestApp](api/faststream/TestApp.md) - [apply_types](api/faststream/apply_types.md) - app @@ -349,6 +356,8 @@ search: - [PublisherProto](api/faststream/broker/publisher/proto/PublisherProto.md) - usecase - [PublisherUsecase](api/faststream/broker/publisher/usecase/PublisherUsecase.md) + - response + - [Response](api/faststream/broker/response/Response.md) - router - [ArgsContainer](api/faststream/broker/router/ArgsContainer.md) - [BrokerRouter](api/faststream/broker/router/BrokerRouter.md) @@ -413,6 +422,7 @@ search: - confluent - [KafkaBroker](api/faststream/confluent/KafkaBroker.md) - [KafkaPublisher](api/faststream/confluent/KafkaPublisher.md) + - [KafkaResponse](api/faststream/confluent/KafkaResponse.md) - [KafkaRoute](api/faststream/confluent/KafkaRoute.md) - [KafkaRouter](api/faststream/confluent/KafkaRouter.md) - [TestApp](api/faststream/confluent/TestApp.md) @@ -464,6 +474,8 @@ search: - [BatchPublisher](api/faststream/confluent/publisher/usecase/BatchPublisher.md) - [DefaultPublisher](api/faststream/confluent/publisher/usecase/DefaultPublisher.md) - [LogicPublisher](api/faststream/confluent/publisher/usecase/LogicPublisher.md) + - response + - [KafkaResponse](api/faststream/confluent/response/KafkaResponse.md) - router - [KafkaPublisher](api/faststream/confluent/router/KafkaPublisher.md) - [KafkaRoute](api/faststream/confluent/router/KafkaRoute.md) @@ -506,6 +518,7 @@ search: - kafka - [KafkaBroker](api/faststream/kafka/KafkaBroker.md) - [KafkaPublisher](api/faststream/kafka/KafkaPublisher.md) + - [KafkaResponse](api/faststream/kafka/KafkaResponse.md) - [KafkaRoute](api/faststream/kafka/KafkaRoute.md) - [KafkaRouter](api/faststream/kafka/KafkaRouter.md) - [TestApp](api/faststream/kafka/TestApp.md) @@ -550,6 +563,8 @@ search: - [BatchPublisher](api/faststream/kafka/publisher/usecase/BatchPublisher.md) - [DefaultPublisher](api/faststream/kafka/publisher/usecase/DefaultPublisher.md) - [LogicPublisher](api/faststream/kafka/publisher/usecase/LogicPublisher.md) + - response + - [KafkaResponse](api/faststream/kafka/response/KafkaResponse.md) - router - [KafkaPublisher](api/faststream/kafka/router/KafkaPublisher.md) - [KafkaRoute](api/faststream/kafka/router/KafkaRoute.md) @@ -592,6 +607,7 @@ search: - [KvWatch](api/faststream/nats/KvWatch.md) - [NatsBroker](api/faststream/nats/NatsBroker.md) - [NatsPublisher](api/faststream/nats/NatsPublisher.md) + - [NatsResponse](api/faststream/nats/NatsResponse.md) - [NatsRoute](api/faststream/nats/NatsRoute.md) - [NatsRouter](api/faststream/nats/NatsRouter.md) - [ObjWatch](api/faststream/nats/ObjWatch.md) @@ -657,6 +673,8 @@ search: - [NatsJSFastProducer](api/faststream/nats/publisher/producer/NatsJSFastProducer.md) - usecase - [LogicPublisher](api/faststream/nats/publisher/usecase/LogicPublisher.md) + - response + - [NatsResponse](api/faststream/nats/response/NatsResponse.md) - router - [NatsPublisher](api/faststream/nats/router/NatsPublisher.md) - [NatsRoute](api/faststream/nats/router/NatsRoute.md) @@ -728,6 +746,7 @@ search: - [RabbitExchange](api/faststream/rabbit/RabbitExchange.md) - [RabbitPublisher](api/faststream/rabbit/RabbitPublisher.md) - [RabbitQueue](api/faststream/rabbit/RabbitQueue.md) + - [RabbitResponse](api/faststream/rabbit/RabbitResponse.md) - [RabbitRoute](api/faststream/rabbit/RabbitRoute.md) - [RabbitRouter](api/faststream/rabbit/RabbitRouter.md) - [ReplyConfig](api/faststream/rabbit/ReplyConfig.md) @@ -764,6 +783,8 @@ search: - usecase - [LogicPublisher](api/faststream/rabbit/publisher/usecase/LogicPublisher.md) - [PublishKwargs](api/faststream/rabbit/publisher/usecase/PublishKwargs.md) + - response + - [RabbitResponse](api/faststream/rabbit/response/RabbitResponse.md) - router - [RabbitPublisher](api/faststream/rabbit/router/RabbitPublisher.md) - [RabbitRoute](api/faststream/rabbit/router/RabbitRoute.md) @@ -808,6 +829,7 @@ search: - [PubSub](api/faststream/redis/PubSub.md) - [RedisBroker](api/faststream/redis/RedisBroker.md) - [RedisPublisher](api/faststream/redis/RedisPublisher.md) + - [RedisResponse](api/faststream/redis/RedisResponse.md) - [RedisRoute](api/faststream/redis/RedisRoute.md) - [RedisRouter](api/faststream/redis/RedisRouter.md) - [StreamSub](api/faststream/redis/StreamSub.md) @@ -869,6 +891,8 @@ search: - [ListPublisher](api/faststream/redis/publisher/usecase/ListPublisher.md) - [LogicPublisher](api/faststream/redis/publisher/usecase/LogicPublisher.md) - [StreamPublisher](api/faststream/redis/publisher/usecase/StreamPublisher.md) + - response + - [RedisResponse](api/faststream/redis/response/RedisResponse.md) - router - [RedisPublisher](api/faststream/redis/router/RedisPublisher.md) - [RedisRoute](api/faststream/redis/router/RedisRoute.md) diff --git a/docs/docs/en/api/faststream/Response.md b/docs/docs/en/api/faststream/Response.md new file mode 100644 index 0000000000..3475e3f584 --- /dev/null +++ b/docs/docs/en/api/faststream/Response.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.Response diff --git a/docs/docs/en/api/faststream/broker/response/Response.md b/docs/docs/en/api/faststream/broker/response/Response.md new file mode 100644 index 0000000000..1163381d7b --- /dev/null +++ b/docs/docs/en/api/faststream/broker/response/Response.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.broker.response.Response diff --git a/docs/docs/en/api/faststream/confluent/KafkaResponse.md b/docs/docs/en/api/faststream/confluent/KafkaResponse.md new file mode 100644 index 0000000000..eb0eab479c --- /dev/null +++ b/docs/docs/en/api/faststream/confluent/KafkaResponse.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.confluent.KafkaResponse diff --git a/docs/docs/en/api/faststream/confluent/response/KafkaResponse.md b/docs/docs/en/api/faststream/confluent/response/KafkaResponse.md new file mode 100644 index 0000000000..7fa5542613 --- /dev/null +++ b/docs/docs/en/api/faststream/confluent/response/KafkaResponse.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.confluent.response.KafkaResponse diff --git a/docs/docs/en/api/faststream/kafka/KafkaResponse.md b/docs/docs/en/api/faststream/kafka/KafkaResponse.md new file mode 100644 index 0000000000..4aab0b965d --- /dev/null +++ b/docs/docs/en/api/faststream/kafka/KafkaResponse.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.kafka.KafkaResponse diff --git a/docs/docs/en/api/faststream/kafka/response/KafkaResponse.md b/docs/docs/en/api/faststream/kafka/response/KafkaResponse.md new file mode 100644 index 0000000000..05ecd69c2d --- /dev/null +++ b/docs/docs/en/api/faststream/kafka/response/KafkaResponse.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.kafka.response.KafkaResponse diff --git a/docs/docs/en/api/faststream/nats/NatsResponse.md b/docs/docs/en/api/faststream/nats/NatsResponse.md new file mode 100644 index 0000000000..6b967b527a --- /dev/null +++ b/docs/docs/en/api/faststream/nats/NatsResponse.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.nats.NatsResponse diff --git a/docs/docs/en/api/faststream/nats/response/NatsResponse.md b/docs/docs/en/api/faststream/nats/response/NatsResponse.md new file mode 100644 index 0000000000..8a7da66982 --- /dev/null +++ b/docs/docs/en/api/faststream/nats/response/NatsResponse.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.nats.response.NatsResponse diff --git a/docs/docs/en/api/faststream/rabbit/RabbitResponse.md b/docs/docs/en/api/faststream/rabbit/RabbitResponse.md new file mode 100644 index 0000000000..4d20d82b0e --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/RabbitResponse.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.RabbitResponse diff --git a/docs/docs/en/api/faststream/rabbit/response/RabbitResponse.md b/docs/docs/en/api/faststream/rabbit/response/RabbitResponse.md new file mode 100644 index 0000000000..477cfb9861 --- /dev/null +++ b/docs/docs/en/api/faststream/rabbit/response/RabbitResponse.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.rabbit.response.RabbitResponse diff --git a/docs/docs/en/api/faststream/redis/RedisResponse.md b/docs/docs/en/api/faststream/redis/RedisResponse.md new file mode 100644 index 0000000000..eedecf1ea3 --- /dev/null +++ b/docs/docs/en/api/faststream/redis/RedisResponse.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.redis.RedisResponse diff --git a/docs/docs/en/api/faststream/redis/response/RedisResponse.md b/docs/docs/en/api/faststream/redis/response/RedisResponse.md new file mode 100644 index 0000000000..dd7fbe72eb --- /dev/null +++ b/docs/docs/en/api/faststream/redis/response/RedisResponse.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.redis.response.RedisResponse diff --git a/faststream/__about__.py b/faststream/__about__.py index d0972600b0..33eac3b1c1 100644 --- a/faststream/__about__.py +++ b/faststream/__about__.py @@ -1,6 +1,6 @@ """Simple and fast framework to create message brokers based microservices.""" -__version__ = "0.5.9" +__version__ = "0.5.10" SERVICE_NAME = f"faststream-{__version__}" diff --git a/faststream/__init__.py b/faststream/__init__.py index 7ef2bad9e4..b0f6456967 100644 --- a/faststream/__init__.py +++ b/faststream/__init__.py @@ -3,6 +3,7 @@ from faststream.annotations import ContextRepo, Logger, NoCast from faststream.app import FastStream from faststream.broker.middlewares import BaseMiddleware +from faststream.broker.response import Response from faststream.testing.app import TestApp from faststream.utils import Context, Depends, Header, Path, apply_types, context @@ -23,4 +24,6 @@ "NoCast", # middlewares "BaseMiddleware", + # basic + "Response", ) diff --git a/faststream/broker/response.py b/faststream/broker/response.py new file mode 100644 index 0000000000..5a811a77e0 --- /dev/null +++ b/faststream/broker/response.py @@ -0,0 +1,52 @@ +from typing import TYPE_CHECKING, Any, Optional, Union + +if TYPE_CHECKING: + from faststream.types import AnyDict, SendableMessage + + +class Response: + def __new__( + cls, + body: Union[ + "SendableMessage", + "Response", + ], + **kwargs: Any, + ) -> "Response": + """Create a new instance of the class.""" + if isinstance(body, cls): + return body + + else: + return super().__new__(cls) + + def __init__( + self, + body: "SendableMessage", + *, + headers: Optional["AnyDict"] = None, + correlation_id: Optional[str] = None, + ) -> None: + """Initialize a handler.""" + if not isinstance(body, Response): + self.body = body + self.headers = headers or {} + self.correlation_id = correlation_id + + def add_headers( + self, + extra_headers: "AnyDict", + *, + override: bool = True, + ) -> None: + if override: + self.headers = {**self.headers, **extra_headers} + else: + self.headers = {**extra_headers, **self.headers} + + def as_publish_kwargs(self) -> "AnyDict": + publish_options = { + "headers": self.headers, + "correlation_id": self.correlation_id, + } + return publish_options diff --git a/faststream/broker/subscriber/usecase.py b/faststream/broker/subscriber/usecase.py index e5ee6fadea..2e5ca52151 100644 --- a/faststream/broker/subscriber/usecase.py +++ b/faststream/broker/subscriber/usecase.py @@ -21,6 +21,7 @@ from faststream.asyncapi.abc import AsyncAPIOperation from faststream.asyncapi.message import parse_handler_params from faststream.asyncapi.utils import to_camelcase +from faststream.broker.response import Response from faststream.broker.subscriber.call_item import HandlerItem from faststream.broker.subscriber.proto import SubscriberProto from faststream.broker.types import ( @@ -333,24 +334,29 @@ async def consume(self, msg: MsgType) -> Any: for m in middlewares: stack.push_async_exit(m.__aexit__) - result_msg = await h.call( - message=message, - # consumer middlewares - _extra_middlewares=(m.consume_scope for m in middlewares), + result_msg = Response( + await h.call( + message=message, + # consumer middlewares + _extra_middlewares=(m.consume_scope for m in middlewares), + ) ) + if not result_msg.correlation_id: + result_msg.correlation_id = message.correlation_id + for p in chain( self.__get_reponse_publisher(message), h.handler._publishers, ): await p.publish( - result_msg, - correlation_id=message.correlation_id, + result_msg.body, + **result_msg.as_publish_kwargs(), # publisher middlewares _extra_middlewares=(m.publish_scope for m in middlewares), ) - return result_msg + return result_msg.body # Suitable handler is not founded for m in middlewares: diff --git a/faststream/confluent/__init__.py b/faststream/confluent/__init__.py index 88f8705e0b..9566997b78 100644 --- a/faststream/confluent/__init__.py +++ b/faststream/confluent/__init__.py @@ -1,5 +1,6 @@ from faststream.confluent.annotations import KafkaMessage from faststream.confluent.broker import KafkaBroker +from faststream.confluent.response import KafkaResponse from faststream.confluent.router import KafkaPublisher, KafkaRoute, KafkaRouter from faststream.confluent.testing import TestKafkaBroker from faststream.testing.app import TestApp @@ -10,6 +11,7 @@ "KafkaRouter", "KafkaRoute", "KafkaPublisher", + "KafkaResponse", "TestKafkaBroker", "TestApp", ) diff --git a/faststream/confluent/response.py b/faststream/confluent/response.py new file mode 100644 index 0000000000..dc36bb6932 --- /dev/null +++ b/faststream/confluent/response.py @@ -0,0 +1,5 @@ +from faststream.broker.response import Response + + +class KafkaResponse(Response): + pass diff --git a/faststream/kafka/__init__.py b/faststream/kafka/__init__.py index c81b617033..dc5f5e3cf6 100644 --- a/faststream/kafka/__init__.py +++ b/faststream/kafka/__init__.py @@ -2,6 +2,7 @@ from faststream.kafka.annotations import KafkaMessage from faststream.kafka.broker import KafkaBroker +from faststream.kafka.response import KafkaResponse from faststream.kafka.router import KafkaPublisher, KafkaRoute, KafkaRouter from faststream.kafka.testing import TestKafkaBroker from faststream.testing.app import TestApp @@ -11,6 +12,7 @@ "KafkaMessage", "KafkaRouter", "KafkaRoute", + "KafkaResponse", "KafkaPublisher", "TestKafkaBroker", "TestApp", diff --git a/faststream/kafka/response.py b/faststream/kafka/response.py new file mode 100644 index 0000000000..dc36bb6932 --- /dev/null +++ b/faststream/kafka/response.py @@ -0,0 +1,5 @@ +from faststream.broker.response import Response + + +class KafkaResponse(Response): + pass diff --git a/faststream/nats/__init__.py b/faststream/nats/__init__.py index 72ba1a2876..bae483a17e 100644 --- a/faststream/nats/__init__.py +++ b/faststream/nats/__init__.py @@ -15,6 +15,7 @@ from faststream.nats.annotations import NatsMessage from faststream.nats.broker.broker import NatsBroker +from faststream.nats.response import NatsResponse from faststream.nats.router import NatsPublisher, NatsRoute, NatsRouter from faststream.nats.schemas import JStream, KvWatch, ObjWatch, PullSub from faststream.nats.testing import TestNatsBroker @@ -32,6 +33,7 @@ "NatsPublisher", "TestNatsBroker", "NatsMessage", + "NatsResponse", # Nats imports "ConsumerConfig", "DeliverPolicy", diff --git a/faststream/nats/response.py b/faststream/nats/response.py new file mode 100644 index 0000000000..6b77c7da25 --- /dev/null +++ b/faststream/nats/response.py @@ -0,0 +1,5 @@ +from faststream.broker.response import Response + + +class NatsResponse(Response): + pass diff --git a/faststream/rabbit/__init__.py b/faststream/rabbit/__init__.py index 7c05cb70c8..cfc152d4e0 100644 --- a/faststream/rabbit/__init__.py +++ b/faststream/rabbit/__init__.py @@ -1,5 +1,6 @@ from faststream.rabbit.annotations import RabbitMessage from faststream.rabbit.broker import RabbitBroker +from faststream.rabbit.response import RabbitResponse from faststream.rabbit.router import RabbitPublisher, RabbitRoute, RabbitRouter from faststream.rabbit.schemas import ( ExchangeType, @@ -17,6 +18,7 @@ "RabbitRouter", "RabbitRoute", "RabbitPublisher", + "RabbitResponse", "ExchangeType", "ReplyConfig", "RabbitExchange", diff --git a/faststream/rabbit/publisher/asyncapi.py b/faststream/rabbit/publisher/asyncapi.py index b18a968202..d5f785d7c1 100644 --- a/faststream/rabbit/publisher/asyncapi.py +++ b/faststream/rabbit/publisher/asyncapi.py @@ -32,7 +32,6 @@ class AsyncAPIPublisher(LogicPublisher): # or publisher: AsyncAPIPublisher = router.publisher(...) ``` - """ def get_name(self) -> str: diff --git a/faststream/rabbit/response.py b/faststream/rabbit/response.py new file mode 100644 index 0000000000..aea4457337 --- /dev/null +++ b/faststream/rabbit/response.py @@ -0,0 +1,5 @@ +from faststream.broker.response import Response + + +class RabbitResponse(Response): + pass diff --git a/faststream/rabbit/testing.py b/faststream/rabbit/testing.py index e15cbe2cb3..36f79ef60a 100644 --- a/faststream/rabbit/testing.py +++ b/faststream/rabbit/testing.py @@ -49,7 +49,7 @@ def create_publisher_fake_subscriber( publisher: AsyncAPIPublisher, ) -> "HandlerCallWrapper[Any, Any, Any]": sub = broker.subscriber( - queue=publisher.queue, + queue=publisher.routing, exchange=publisher.exchange, ) @@ -70,7 +70,7 @@ def remove_publisher_fake_subscriber( ) -> None: broker._subscribers.pop( AsyncAPISubscriber.get_routing_hash( - queue=publisher.queue, + queue=RabbitQueue.validate(publisher.routing), exchange=publisher.exchange, ), None, @@ -132,7 +132,7 @@ def build_message( priority=priority, correlation_id=correlation_id, expiration=expiration, - message_id=message_id, + message_id=message_id or gen_cor_id(), timestamp=timestamp, message_type=message_type, user_id=user_id, @@ -148,14 +148,21 @@ def build_message( header=ContentHeader( properties=spec.Basic.Properties( content_type=msg.content_type, - message_id=gen_cor_id(), headers=msg.headers, - reply_to=reply_to, + reply_to=msg.reply_to, + content_encoding=msg.content_encoding, + priority=msg.priority, + correlation_id=msg.correlation_id, + message_id=msg.message_id, + timestamp=msg.timestamp, + message_type=message_type, + user_id=msg.user_id, + app_id=msg.app_id, ) ), body=msg.body, channel=AsyncMock(), - ) + ), ) diff --git a/faststream/redis/__init__.py b/faststream/redis/__init__.py index a67f2843ac..7624747bf8 100644 --- a/faststream/redis/__init__.py +++ b/faststream/redis/__init__.py @@ -1,5 +1,6 @@ from faststream.redis.annotations import Redis, RedisMessage from faststream.redis.broker.broker import RedisBroker +from faststream.redis.response import RedisResponse from faststream.redis.router import RedisPublisher, RedisRoute, RedisRouter from faststream.redis.schemas import ListSub, PubSub, StreamSub from faststream.redis.testing import TestRedisBroker @@ -12,6 +13,7 @@ "RedisRoute", "RedisRouter", "RedisPublisher", + "RedisResponse", "TestRedisBroker", "TestApp", "PubSub", diff --git a/faststream/redis/publisher/usecase.py b/faststream/redis/publisher/usecase.py index 24309fcdb4..a887140d84 100644 --- a/faststream/redis/publisher/usecase.py +++ b/faststream/redis/publisher/usecase.py @@ -282,7 +282,6 @@ async def publish( # type: ignore[override] list_sub = ListSub.validate(list or self.list) reply_to = reply_to or self.reply_to - headers = headers or self.headers correlation_id = correlation_id or gen_cor_id() call: "AsyncFunc" = self._producer.publish @@ -301,7 +300,7 @@ async def publish( # type: ignore[override] list=list_sub.name, # basic args reply_to=reply_to, - headers=headers, + headers=headers or self.headers, correlation_id=correlation_id, # RPC args rpc=rpc, @@ -327,6 +326,10 @@ async def publish( # type: ignore[override] Optional[str], Doc("Has no real effect. Option to be compatible with original protocol."), ] = None, + headers: Annotated[ + Optional["AnyDict"], + Doc("Message headers to store metainformation."), + ] = None, # publisher specific _extra_middlewares: Annotated[ Iterable["PublisherMiddleware"], @@ -353,6 +356,7 @@ async def publish( # type: ignore[override] *message, list=list_sub.name, correlation_id=correlation_id, + headers=headers or self.headers, ) diff --git a/faststream/redis/response.py b/faststream/redis/response.py new file mode 100644 index 0000000000..d414b0ebe4 --- /dev/null +++ b/faststream/redis/response.py @@ -0,0 +1,5 @@ +from faststream.broker.response import Response + + +class RedisResponse(Response): + pass diff --git a/tests/brokers/base/publish.py b/tests/brokers/base/publish.py index 974b12d8cf..abf5f0022c 100644 --- a/tests/brokers/base/publish.py +++ b/tests/brokers/base/publish.py @@ -9,7 +9,7 @@ import pytest from pydantic import BaseModel -from faststream import BaseMiddleware +from faststream import BaseMiddleware, Context, Response from faststream._compat import dump_json, model_to_json from faststream.broker.core.usecase import BrokerUsecase @@ -175,6 +175,46 @@ async def handler(m: message_type): assert event.is_set() mock.assert_called_with(expected_message) + @pytest.mark.asyncio() + async def test_response( + self, + queue: str, + event: asyncio.Event, + mock: Mock, + ): + pub_broker = self.get_broker(apply_types=True) + + @pub_broker.subscriber(queue, **self.subscriber_kwargs) + @pub_broker.publisher(queue + "1") + async def m(): + return Response(1, headers={"custom": "1"}, correlation_id="1") + + @pub_broker.subscriber(queue + "1", **self.subscriber_kwargs) + async def m_next(msg=Context("message")): + event.set() + mock( + body=msg.body, + headers=msg.headers["custom"], + correlation_id=msg.correlation_id, + ) + + async with self.patch_broker(pub_broker) as br: + await br.start() + await asyncio.wait( + ( + asyncio.create_task(br.publish(None, queue)), + asyncio.create_task(event.wait()), + ), + timeout=self.timeout, + ) + + assert event.is_set() + mock.assert_called_with( + body=b"1", + correlation_id="1", + headers="1", + ) + @pytest.mark.asyncio() async def test_unwrap_dict( self, diff --git a/tests/brokers/rabbit/test_test_client.py b/tests/brokers/rabbit/test_test_client.py index e07cbd88c0..dde1383ff4 100644 --- a/tests/brokers/rabbit/test_test_client.py +++ b/tests/brokers/rabbit/test_test_client.py @@ -61,6 +61,18 @@ def subscriber(m): assert event.is_set() + async def test_respect_routing_key(self): + broker = self.get_broker() + + publisher = broker.publisher( + exchange=RabbitExchange("test", type=ExchangeType.TOPIC), routing_key="up" + ) + + async with TestRabbitBroker(broker): + await publisher.publish("Hi!") + + publisher.mock.assert_called_once_with("Hi!") + async def test_direct( self, queue: str, diff --git a/tests/brokers/test_response.py b/tests/brokers/test_response.py new file mode 100644 index 0000000000..3766b28f2b --- /dev/null +++ b/tests/brokers/test_response.py @@ -0,0 +1,25 @@ +from faststream.broker.response import Response + + +def test_raw_data(): + resp = Response(1) + assert resp.body == 1 + assert resp.headers == {} + + +def test_response_with_response_instance(): + resp = Response(Response(1, headers={"some": 1})) + assert resp.body == 1 + assert resp.headers == {"some": 1} + + +def test_headers_override(): + resp = Response(1, headers={"some": 1}) + resp.add_headers({"some": 2}) + assert resp.headers == {"some": 2} + + +def test_headers_with_default(): + resp = Response(1, headers={"some": 1}) + resp.add_headers({"some": 2}, override=False) + assert resp.headers == {"some": 1}