Skip to content

Commit

Permalink
Merge branch 'main' into feat/nats-new-inbox
Browse files Browse the repository at this point in the history
  • Loading branch information
maxalbert authored Jun 21, 2024
2 parents 0c2447a + a5aa00e commit 1f77c01
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions faststream/rabbit/publisher/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
)

import anyio
from aio_pika.abc import AbstractIncomingMessage
from typing_extensions import override

from faststream.broker.publisher.proto import ProducerProto
Expand All @@ -24,8 +23,8 @@

import aiormq
from aio_pika import IncomingMessage, RobustQueue
from aio_pika.abc import DateType, HeadersType, TimeoutType
from anyio.streams.memory import MemoryObjectReceiveStream
from aio_pika.abc import AbstractIncomingMessage, DateType, HeadersType, TimeoutType
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream

from faststream.broker.types import (
AsyncCallable,
Expand Down Expand Up @@ -198,12 +197,13 @@ def __init__(self, lock: "anyio.Lock", callback_queue: "RobustQueue") -> None:
self.queue = callback_queue

async def __aenter__(self) -> "MemoryObjectReceiveStream[IncomingMessage]":
send_response_stream: MemoryObjectSendStream[AbstractIncomingMessage]
receive_response_stream: MemoryObjectReceiveStream[AbstractIncomingMessage]

(
send_response_stream,
receive_response_stream,
) = anyio.create_memory_object_stream[AbstractIncomingMessage](
max_buffer_size=1
)
) = anyio.create_memory_object_stream(max_buffer_size=1)
await self.lock.acquire()

self.consumer_tag = await self.queue.consume(
Expand Down

0 comments on commit 1f77c01

Please sign in to comment.