diff --git a/faststream/rabbit/publisher/producer.py b/faststream/rabbit/publisher/producer.py index f7a4013bab..71e12ab427 100644 --- a/faststream/rabbit/publisher/producer.py +++ b/faststream/rabbit/publisher/producer.py @@ -9,7 +9,6 @@ ) import anyio -from aio_pika.abc import AbstractIncomingMessage from typing_extensions import override from faststream.broker.publisher.proto import ProducerProto @@ -24,8 +23,8 @@ import aiormq from aio_pika import IncomingMessage, RobustQueue - from aio_pika.abc import DateType, HeadersType, TimeoutType - from anyio.streams.memory import MemoryObjectReceiveStream + from aio_pika.abc import AbstractIncomingMessage, DateType, HeadersType, TimeoutType + from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from faststream.broker.types import ( AsyncCallable, @@ -198,12 +197,13 @@ def __init__(self, lock: "anyio.Lock", callback_queue: "RobustQueue") -> None: self.queue = callback_queue async def __aenter__(self) -> "MemoryObjectReceiveStream[IncomingMessage]": + send_response_stream: MemoryObjectSendStream[AbstractIncomingMessage] + receive_response_stream: MemoryObjectReceiveStream[AbstractIncomingMessage] + ( send_response_stream, receive_response_stream, - ) = anyio.create_memory_object_stream[AbstractIncomingMessage]( - max_buffer_size=1 - ) + ) = anyio.create_memory_object_stream(max_buffer_size=1) await self.lock.acquire() self.consumer_tag = await self.queue.consume(