From 0070f5c3384d0c07528b7a0134a7c0f30dd88a81 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Sat, 18 May 2024 18:23:31 +0300 Subject: [PATCH] chore: solve main conflicts --- faststream/testing/broker.py | 4 +- tests/brokers/confluent/test_consume.py | 17 ++++--- tests/brokers/kafka/test_consume.py | 17 ++++--- tests/brokers/nats/test_consume.py | 2 +- tests/brokers/redis/test_consume.py | 67 ++++++++++++++++--------- 5 files changed, 66 insertions(+), 41 deletions(-) diff --git a/faststream/testing/broker.py b/faststream/testing/broker.py index c7d186d34c..f8925210a4 100644 --- a/faststream/testing/broker.py +++ b/faststream/testing/broker.py @@ -214,7 +214,9 @@ async def call_handler( if rpc: message_body, content_type = encode_message(result) - msg_to_publish = StreamMessage(raw_message=None, body=message_body, content_type=content_type) + msg_to_publish = StreamMessage( + raw_message=None, body=message_body, content_type=content_type + ) consumed_data = decode_message(msg_to_publish) return consumed_data diff --git a/tests/brokers/confluent/test_consume.py b/tests/brokers/confluent/test_consume.py index 89a6441f62..805b3a97f2 100644 --- a/tests/brokers/confluent/test_consume.py +++ b/tests/brokers/confluent/test_consume.py @@ -46,9 +46,14 @@ async def handler(msg): @pytest.mark.asyncio() async def test_consume_batch_headers( - self, mock, event: asyncio.Event, queue: str, full_broker: KafkaBroker + self, + mock, + event: asyncio.Event, + queue: str, ): - @full_broker.subscriber(queue, batch=True, **self.subscriber_kwargs) + consume_broker = self.get_broker(apply_types=True) + + @consume_broker.subscriber(queue, batch=True, **self.subscriber_kwargs) def subscriber(m, msg: KafkaMessage): check = all( ( @@ -60,14 +65,12 @@ def subscriber(m, msg: KafkaMessage): mock(check) event.set() - async with full_broker: - await full_broker.start() + async with self.patch_broker(consume_broker) as br: + await br.start() await asyncio.wait( ( - asyncio.create_task( - full_broker.publish("", queue, headers={"custom": "1"}) - ), + asyncio.create_task(br.publish("", queue, headers={"custom": "1"})), asyncio.create_task(event.wait()), ), timeout=self.timeout, diff --git a/tests/brokers/kafka/test_consume.py b/tests/brokers/kafka/test_consume.py index 097b810a75..2a7f57b888 100644 --- a/tests/brokers/kafka/test_consume.py +++ b/tests/brokers/kafka/test_consume.py @@ -40,9 +40,14 @@ async def handler(msg): @pytest.mark.asyncio() async def test_consume_batch_headers( - self, mock, event: asyncio.Event, queue: str, full_broker: KafkaBroker + self, + mock, + event: asyncio.Event, + queue: str, ): - @full_broker.subscriber(queue, batch=True) + consume_broker = self.get_broker(apply_types=True) + + @consume_broker.subscriber(queue, batch=True) def subscriber(m, msg: KafkaMessage): check = all( ( @@ -54,14 +59,12 @@ def subscriber(m, msg: KafkaMessage): mock(check) event.set() - async with full_broker: - await full_broker.start() + async with self.patch_broker(consume_broker) as br: + await br.start() await asyncio.wait( ( - asyncio.create_task( - full_broker.publish("", queue, headers={"custom": "1"}) - ), + asyncio.create_task(br.publish("", queue, headers={"custom": "1"})), asyncio.create_task(event.wait()), ), timeout=3, diff --git a/tests/brokers/nats/test_consume.py b/tests/brokers/nats/test_consume.py index ce484dec07..5318fb2a69 100644 --- a/tests/brokers/nats/test_consume.py +++ b/tests/brokers/nats/test_consume.py @@ -265,7 +265,7 @@ def subscriber(m, msg: NatsMessage): ) mock(check) event.set() - + async with self.patch_broker(consume_broker) as br: await br.start() await asyncio.wait( diff --git a/tests/brokers/redis/test_consume.py b/tests/brokers/redis/test_consume.py index 6a0b443514..071467c449 100644 --- a/tests/brokers/redis/test_consume.py +++ b/tests/brokers/redis/test_consume.py @@ -163,7 +163,9 @@ async def test_consume_list_batch_with_one( ): consume_broker = self.get_broker() - @consume_broker.subscriber(list=ListSub(queue, batch=True, polling_interval=0.01)) + @consume_broker.subscriber( + list=ListSub(queue, batch=True, polling_interval=0.01) + ) async def handler(msg): mock(msg) event.set() @@ -172,7 +174,7 @@ async def handler(msg): await br.start() await asyncio.wait( ( - asyncio.create_task(broker.publish("hi", list=queue)), + asyncio.create_task(br.publish("hi", list=queue)), asyncio.create_task(event.wait()), ), timeout=3, @@ -188,9 +190,11 @@ async def test_consume_list_batch_headers( event: asyncio.Event, mock, ): - consume_broker = self.get_broker() - - @full_broker.subscriber(list=ListSub(queue, batch=True, polling_interval=0.01)) + consume_broker = self.get_broker(apply_types=True) + + @consume_broker.subscriber( + list=ListSub(queue, batch=True, polling_interval=0.01) + ) def subscriber(m, msg: RedisMessage): check = all( ( @@ -203,19 +207,20 @@ def subscriber(m, msg: RedisMessage): mock(check) event.set() - await full_broker.start() - await asyncio.wait( - ( - asyncio.create_task( - full_broker.publish("", list=queue, headers={"custom": "1"}) + async with self.patch_broker(consume_broker) as br: + await br.start() + await asyncio.wait( + ( + asyncio.create_task( + br.publish("", list=queue, headers={"custom": "1"}) + ), + asyncio.create_task(event.wait()), ), - asyncio.create_task(event.wait()), - ), - timeout=3, - ) + timeout=3, + ) - assert event.is_set() - mock.assert_called_once_with(True) + assert event.is_set() + mock.assert_called_once_with(True) @pytest.mark.slow() async def test_consume_list_batch( @@ -226,7 +231,9 @@ async def test_consume_list_batch( msgs_queue = asyncio.Queue(maxsize=1) - @consume_broker.subscriber(list=ListSub(queue, batch=True, polling_interval=0.01)) + @consume_broker.subscriber( + list=ListSub(queue, batch=True, polling_interval=0.01) + ) async def handler(msg): await msgs_queue.put(msg) @@ -240,7 +247,7 @@ async def handler(msg): timeout=3, ) - assert [{1, "hi"}] == [set(r.result()) for r in result] + assert [{1, "hi"}] == [set(r.result()) for r in result] @pytest.mark.slow() async def test_consume_list_batch_complex( @@ -259,7 +266,9 @@ def __hash__(self): msgs_queue = asyncio.Queue(maxsize=1) - @consume_broker.subscriber(list=ListSub(queue, batch=True, polling_interval=0.01)) + @consume_broker.subscriber( + list=ListSub(queue, batch=True, polling_interval=0.01) + ) async def handler(msg: List[Data]): await msgs_queue.put(msg) @@ -284,7 +293,9 @@ async def test_consume_list_batch_native( msgs_queue = asyncio.Queue(maxsize=1) - @consume_broker.subscriber(list=ListSub(queue, batch=True, polling_interval=0.01)) + @consume_broker.subscriber( + list=ListSub(queue, batch=True, polling_interval=0.01) + ) async def handler(msg): await msgs_queue.put(msg) @@ -375,7 +386,9 @@ async def test_consume_stream_batch( ): consume_broker = self.get_broker() - @consume_broker.subscriber(stream=StreamSub(queue, polling_interval=10, batch=True)) + @consume_broker.subscriber( + stream=StreamSub(queue, polling_interval=10, batch=True) + ) async def handler(msg): mock(msg) event.set() @@ -401,7 +414,7 @@ async def test_consume_stream_batch_headers( mock, ): consume_broker = self.get_broker(apply_types=True) - + @consume_broker.subscriber( stream=StreamSub(queue, polling_interval=10, batch=True) ) @@ -416,7 +429,7 @@ def subscriber(m, msg: RedisMessage): ) mock(check) event.set() - + async with self.patch_broker(consume_broker) as br: await br.start() await asyncio.wait( @@ -446,7 +459,9 @@ class Data(BaseModel): msgs_queue = asyncio.Queue(maxsize=1) - @consume_broker.subscriber(stream=StreamSub(queue, polling_interval=10, batch=True)) + @consume_broker.subscriber( + stream=StreamSub(queue, polling_interval=10, batch=True) + ) async def handler(msg: List[Data]): await msgs_queue.put(msg) @@ -471,7 +486,9 @@ async def test_consume_stream_batch_native( ): consume_broker = self.get_broker() - @consume_broker.subscriber(stream=StreamSub(queue, polling_interval=10, batch=True)) + @consume_broker.subscriber( + stream=StreamSub(queue, polling_interval=10, batch=True) + ) async def handler(msg): mock(msg) event.set()