Skip to content

Commit

Permalink
chore: solve main conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed May 18, 2024
1 parent 915ad47 commit 0070f5c
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 41 deletions.
4 changes: 3 additions & 1 deletion faststream/testing/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,9 @@ async def call_handler(

if rpc:
message_body, content_type = encode_message(result)
msg_to_publish = StreamMessage(raw_message=None, body=message_body, content_type=content_type)
msg_to_publish = StreamMessage(
raw_message=None, body=message_body, content_type=content_type
)
consumed_data = decode_message(msg_to_publish)
return consumed_data

Expand Down
17 changes: 10 additions & 7 deletions tests/brokers/confluent/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,14 @@ async def handler(msg):

@pytest.mark.asyncio()
async def test_consume_batch_headers(
self, mock, event: asyncio.Event, queue: str, full_broker: KafkaBroker
self,
mock,
event: asyncio.Event,
queue: str,
):
@full_broker.subscriber(queue, batch=True, **self.subscriber_kwargs)
consume_broker = self.get_broker(apply_types=True)

@consume_broker.subscriber(queue, batch=True, **self.subscriber_kwargs)
def subscriber(m, msg: KafkaMessage):
check = all(
(
Expand All @@ -60,14 +65,12 @@ def subscriber(m, msg: KafkaMessage):
mock(check)
event.set()

async with full_broker:
await full_broker.start()
async with self.patch_broker(consume_broker) as br:
await br.start()

await asyncio.wait(
(
asyncio.create_task(
full_broker.publish("", queue, headers={"custom": "1"})
),
asyncio.create_task(br.publish("", queue, headers={"custom": "1"})),
asyncio.create_task(event.wait()),
),
timeout=self.timeout,
Expand Down
17 changes: 10 additions & 7 deletions tests/brokers/kafka/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,14 @@ async def handler(msg):

@pytest.mark.asyncio()
async def test_consume_batch_headers(
self, mock, event: asyncio.Event, queue: str, full_broker: KafkaBroker
self,
mock,
event: asyncio.Event,
queue: str,
):
@full_broker.subscriber(queue, batch=True)
consume_broker = self.get_broker(apply_types=True)

@consume_broker.subscriber(queue, batch=True)
def subscriber(m, msg: KafkaMessage):
check = all(
(
Expand All @@ -54,14 +59,12 @@ def subscriber(m, msg: KafkaMessage):
mock(check)
event.set()

async with full_broker:
await full_broker.start()
async with self.patch_broker(consume_broker) as br:
await br.start()

await asyncio.wait(
(
asyncio.create_task(
full_broker.publish("", queue, headers={"custom": "1"})
),
asyncio.create_task(br.publish("", queue, headers={"custom": "1"})),
asyncio.create_task(event.wait()),
),
timeout=3,
Expand Down
2 changes: 1 addition & 1 deletion tests/brokers/nats/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def subscriber(m, msg: NatsMessage):
)
mock(check)
event.set()

async with self.patch_broker(consume_broker) as br:
await br.start()
await asyncio.wait(
Expand Down
67 changes: 42 additions & 25 deletions tests/brokers/redis/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ async def test_consume_list_batch_with_one(
):
consume_broker = self.get_broker()

@consume_broker.subscriber(list=ListSub(queue, batch=True, polling_interval=0.01))
@consume_broker.subscriber(
list=ListSub(queue, batch=True, polling_interval=0.01)
)
async def handler(msg):
mock(msg)
event.set()
Expand All @@ -172,7 +174,7 @@ async def handler(msg):
await br.start()
await asyncio.wait(
(
asyncio.create_task(broker.publish("hi", list=queue)),
asyncio.create_task(br.publish("hi", list=queue)),
asyncio.create_task(event.wait()),
),
timeout=3,
Expand All @@ -188,9 +190,11 @@ async def test_consume_list_batch_headers(
event: asyncio.Event,
mock,
):
consume_broker = self.get_broker()

@full_broker.subscriber(list=ListSub(queue, batch=True, polling_interval=0.01))
consume_broker = self.get_broker(apply_types=True)

@consume_broker.subscriber(
list=ListSub(queue, batch=True, polling_interval=0.01)
)
def subscriber(m, msg: RedisMessage):
check = all(
(
Expand All @@ -203,19 +207,20 @@ def subscriber(m, msg: RedisMessage):
mock(check)
event.set()

await full_broker.start()
await asyncio.wait(
(
asyncio.create_task(
full_broker.publish("", list=queue, headers={"custom": "1"})
async with self.patch_broker(consume_broker) as br:
await br.start()
await asyncio.wait(
(
asyncio.create_task(
br.publish("", list=queue, headers={"custom": "1"})
),
asyncio.create_task(event.wait()),
),
asyncio.create_task(event.wait()),
),
timeout=3,
)
timeout=3,
)

assert event.is_set()
mock.assert_called_once_with(True)
assert event.is_set()
mock.assert_called_once_with(True)

@pytest.mark.slow()
async def test_consume_list_batch(
Expand All @@ -226,7 +231,9 @@ async def test_consume_list_batch(

msgs_queue = asyncio.Queue(maxsize=1)

@consume_broker.subscriber(list=ListSub(queue, batch=True, polling_interval=0.01))
@consume_broker.subscriber(
list=ListSub(queue, batch=True, polling_interval=0.01)
)
async def handler(msg):
await msgs_queue.put(msg)

Expand All @@ -240,7 +247,7 @@ async def handler(msg):
timeout=3,
)

assert [{1, "hi"}] == [set(r.result()) for r in result]
assert [{1, "hi"}] == [set(r.result()) for r in result]

@pytest.mark.slow()
async def test_consume_list_batch_complex(
Expand All @@ -259,7 +266,9 @@ def __hash__(self):

msgs_queue = asyncio.Queue(maxsize=1)

@consume_broker.subscriber(list=ListSub(queue, batch=True, polling_interval=0.01))
@consume_broker.subscriber(
list=ListSub(queue, batch=True, polling_interval=0.01)
)
async def handler(msg: List[Data]):
await msgs_queue.put(msg)

Expand All @@ -284,7 +293,9 @@ async def test_consume_list_batch_native(

msgs_queue = asyncio.Queue(maxsize=1)

@consume_broker.subscriber(list=ListSub(queue, batch=True, polling_interval=0.01))
@consume_broker.subscriber(
list=ListSub(queue, batch=True, polling_interval=0.01)
)
async def handler(msg):
await msgs_queue.put(msg)

Expand Down Expand Up @@ -375,7 +386,9 @@ async def test_consume_stream_batch(
):
consume_broker = self.get_broker()

@consume_broker.subscriber(stream=StreamSub(queue, polling_interval=10, batch=True))
@consume_broker.subscriber(
stream=StreamSub(queue, polling_interval=10, batch=True)
)
async def handler(msg):
mock(msg)
event.set()
Expand All @@ -401,7 +414,7 @@ async def test_consume_stream_batch_headers(
mock,
):
consume_broker = self.get_broker(apply_types=True)

@consume_broker.subscriber(
stream=StreamSub(queue, polling_interval=10, batch=True)
)
Expand All @@ -416,7 +429,7 @@ def subscriber(m, msg: RedisMessage):
)
mock(check)
event.set()

async with self.patch_broker(consume_broker) as br:
await br.start()
await asyncio.wait(
Expand Down Expand Up @@ -446,7 +459,9 @@ class Data(BaseModel):

msgs_queue = asyncio.Queue(maxsize=1)

@consume_broker.subscriber(stream=StreamSub(queue, polling_interval=10, batch=True))
@consume_broker.subscriber(
stream=StreamSub(queue, polling_interval=10, batch=True)
)
async def handler(msg: List[Data]):
await msgs_queue.put(msg)

Expand All @@ -471,7 +486,9 @@ async def test_consume_stream_batch_native(
):
consume_broker = self.get_broker()

@consume_broker.subscriber(stream=StreamSub(queue, polling_interval=10, batch=True))
@consume_broker.subscriber(
stream=StreamSub(queue, polling_interval=10, batch=True)
)
async def handler(msg):
mock(msg)
event.set()
Expand Down

0 comments on commit 0070f5c

Please sign in to comment.