Skip to content

Commit

Permalink
fix #840: broker.publish respects middlewares
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Jan 11, 2024
1 parent 0b9d4f4 commit 0b8c7c9
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 10 deletions.
3 changes: 2 additions & 1 deletion faststream/broker/core/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class BrokerUsecase(

handlers: Mapping[Any, "BaseHandler[MsgType]"]
_publishers: Mapping[Any, "BasePublisher[MsgType]"]
middlewares: Sequence[Callable[[Any], "BaseMiddleware"]]

def __init__(
self,
Expand Down Expand Up @@ -120,7 +121,7 @@ def __init__(
Doc("Dependencies to apply to all broker subscribers"),
] = (),
middlewares: Annotated[
Sequence[Callable[[MsgType], "BaseMiddleware"]],
Sequence[Callable[[Any], "BaseMiddleware"]],
Doc("Middlewares to apply to all broker publishers/subscribers"),
] = (),
graceful_timeout: Annotated[
Expand Down
2 changes: 1 addition & 1 deletion faststream/broker/core/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ async def close_middlewares(
# TODO: need to test copy
result_to_send = result

for m_pub in chain(middlewares, h.middlewares):
for m_pub in middlewares:
result_to_send = await pub_stack.enter_async_context(
m_pub.publish_scope(result_to_send)
)
Expand Down
22 changes: 14 additions & 8 deletions faststream/nats/broker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import warnings
from contextlib import AsyncExitStack
from functools import partial
from types import TracebackType
from typing import (
Expand Down Expand Up @@ -48,7 +49,7 @@
from faststream.nats.security import parse_security
from faststream.nats.shared.logging import NatsLoggingMixin
from faststream.security import BaseSecurity
from faststream.types import AnyDict, DecodedMessage
from faststream.types import AnyDict, DecodedMessage, SendableMessage
from faststream.utils.context.repository import context

Subject: TypeAlias = str
Expand Down Expand Up @@ -437,21 +438,26 @@ def publisher( # type: ignore[override]
@override
async def publish( # type: ignore[override]
self,
message: SendableMessage,
*args: Any,
stream: Optional[str] = None,
**kwargs: Any,
) -> Optional[DecodedMessage]:
if stream is None:
assert self._producer, NOT_CONNECTED_YET # nosec B101
return await self._producer.publish(*args, **kwargs)

publisher = self._producer
else:
assert self._js_producer, NOT_CONNECTED_YET # nosec B101
return await self._js_producer.publish(
*args,
stream=stream,
**kwargs, # type: ignore[misc]
)
publisher = self._js_producer
kwargs["stream"] = stream

async with AsyncExitStack() as stack:
for m in self.middlewares:
message = await stack.enter_async_context(
m(None).publish_scope(message)
)

return await publisher.publish(message, *args, **kwargs)

def __set_publisher_producer(self, publisher: Publisher) -> None:
if publisher.stream is not None:
Expand Down

0 comments on commit 0b8c7c9

Please sign in to comment.