Skip to content

Commit

Permalink
tests: refactor Kafka tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed May 3, 2024
1 parent 58e11db commit d38b7a4
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 170 deletions.
73 changes: 38 additions & 35 deletions tests/brokers/base/consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class BrokerConsumeTestcase:
def get_broker(self, broker: BrokerUsecase) -> BrokerUsecase[Any, Any]:
raise NotImplementedError

def patch_broker(self, broker: BrokerUsecase[Any, Any]) -> BrokerUsecase[Any, Any]:
return broker

async def test_consume(
self,
queue: str,
Expand All @@ -31,11 +34,11 @@ async def test_consume(
def subscriber(m):
event.set()

async with consume_broker:
await consume_broker.start()
async with self.patch_broker(consume_broker) as br:
await br.start()
await asyncio.wait(
(
asyncio.create_task(consume_broker.publish("hello", queue)),
asyncio.create_task(br.publish("hello", queue)),
asyncio.create_task(event.wait()),
),
timeout=self.timeout,
Expand All @@ -62,12 +65,12 @@ def subscriber(m):
else:
consume2.set()

async with consume_broker:
await consume_broker.start()
async with self.patch_broker(consume_broker) as br:
await br.start()
await asyncio.wait(
(
asyncio.create_task(consume_broker.publish("hello", queue)),
asyncio.create_task(consume_broker.publish("hello", queue + "1")),
asyncio.create_task(br.publish("hello", queue)),
asyncio.create_task(br.publish("hello", queue + "1")),
asyncio.create_task(consume.wait()),
asyncio.create_task(consume2.wait()),
),
Expand Down Expand Up @@ -96,12 +99,12 @@ async def handler(m):
else:
consume2.set()

async with consume_broker:
await consume_broker.start()
async with self.patch_broker(consume_broker) as br:
await br.start()
await asyncio.wait(
(
asyncio.create_task(consume_broker.publish("hello", queue)),
asyncio.create_task(consume_broker.publish("hello", queue)),
asyncio.create_task(br.publish("hello", queue)),
asyncio.create_task(br.publish("hello", queue)),
asyncio.create_task(consume.wait()),
asyncio.create_task(consume2.wait()),
),
Expand Down Expand Up @@ -134,12 +137,12 @@ def handler2(m):
mock.handler2()
consume2.set()

async with consume_broker:
await consume_broker.start()
async with self.patch_broker(consume_broker) as br:
await br.start()
await asyncio.wait(
(
asyncio.create_task(consume_broker.publish("hello", queue)),
asyncio.create_task(consume_broker.publish("hello", another_topic)),
asyncio.create_task(br.publish("hello", queue)),
asyncio.create_task(br.publish("hello", another_topic)),
asyncio.create_task(consume.wait()),
asyncio.create_task(consume2.wait()),
),
Expand Down Expand Up @@ -175,14 +178,12 @@ async def handler2(m):
mock.handler2(m)
consume2.set()

async with consume_broker:
await consume_broker.start()
async with self.patch_broker(consume_broker) as br:
await br.start()
await asyncio.wait(
(
asyncio.create_task(
consume_broker.publish({"msg": "hello"}, queue)
),
asyncio.create_task(consume_broker.publish("hello", queue)),
asyncio.create_task(br.publish({"msg": "hello"}, queue)),
asyncio.create_task(br.publish("hello", queue)),
asyncio.create_task(consume.wait()),
asyncio.create_task(consume2.wait()),
),
Expand Down Expand Up @@ -216,17 +217,19 @@ async def handler(m: Foo, dep: int = Depends(dependency), broker=Context()):
mock(m, dep, broker)
event.set()

await consume_broker.start()
await asyncio.wait(
(
asyncio.create_task(consume_broker.publish({"x": 1}, queue)),
asyncio.create_task(event.wait()),
),
timeout=self.timeout,
)
async with self.patch_broker(consume_broker) as br:
await br.start()

assert event.is_set()
mock.assert_called_once_with({"x": 1}, "100", consume_broker)
await asyncio.wait(
(
asyncio.create_task(br.publish({"x": 1}, queue)),
asyncio.create_task(event.wait()),
),
timeout=self.timeout,
)

assert event.is_set()
mock.assert_called_once_with({"x": 1}, "100", consume_broker)


@pytest.mark.asyncio()
Expand All @@ -246,17 +249,17 @@ def subscriber(m):
event.set()
raise StopConsume()

async with consume_broker:
await consume_broker.start()
async with self.patch_broker(consume_broker) as br:
await br.start()
await asyncio.wait(
(
asyncio.create_task(consume_broker.publish("hello", queue)),
asyncio.create_task(br.publish("hello", queue)),
asyncio.create_task(event.wait()),
),
timeout=self.timeout,
)
await asyncio.sleep(0.5)
await consume_broker.publish("hello", queue)
await br.publish("hello", queue)
await asyncio.sleep(0.5)

assert event.is_set()
Expand Down
65 changes: 34 additions & 31 deletions tests/brokers/base/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class BrokerPublishTestcase:
def get_broker(self, apply_types: bool = False) -> BrokerUsecase[Any, Any]:
raise NotImplementedError

def patch_broker(self, broker: BrokerUsecase[Any, Any]) -> BrokerUsecase[Any, Any]:
return broker

@pytest.mark.asyncio()
@pytest.mark.parametrize(
("message", "message_type", "expected_message"),
Expand Down Expand Up @@ -122,12 +125,12 @@ async def handler(m: message_type, logger: Logger):
event.set()
mock(m)

async with pub_broker:
await pub_broker.start()
async with self.patch_broker(pub_broker) as br:
await br.start()

await asyncio.wait(
(
asyncio.create_task(pub_broker.publish(message, queue)),
asyncio.create_task(br.publish(message, queue)),
asyncio.create_task(event.wait()),
),
timeout=self.timeout,
Expand All @@ -150,11 +153,11 @@ async def m(a: int, b: int, logger: Logger):
event.set()
mock({"a": a, "b": b})

async with pub_broker:
await pub_broker.start()
async with self.patch_broker(pub_broker) as br:
await br.start()
await asyncio.wait(
(
asyncio.create_task(pub_broker.publish({"a": 1, "b": 1.0}, queue)),
asyncio.create_task(br.publish({"a": 1, "b": 1.0}, queue)),
asyncio.create_task(event.wait()),
),
timeout=self.timeout,
Expand Down Expand Up @@ -182,11 +185,11 @@ async def m(a: int, b: int, *args: Tuple[int, ...], logger: Logger):
event.set()
mock({"a": a, "b": b, "args": args})

async with pub_broker:
await pub_broker.start()
async with self.patch_broker(pub_broker) as br:
await br.start()
await asyncio.wait(
(
asyncio.create_task(pub_broker.publish([1, 1.0, 2.0, 3.0], queue)),
asyncio.create_task(br.publish([1, 1.0, 2.0, 3.0], queue)),
asyncio.create_task(event.wait()),
),
timeout=self.timeout,
Expand Down Expand Up @@ -214,11 +217,11 @@ async def resp(msg):
event.set()
mock(msg)

async with pub_broker:
await pub_broker.start()
async with self.patch_broker(pub_broker) as br:
await br.start()
await asyncio.wait(
(
asyncio.create_task(pub_broker.publish("", queue)),
asyncio.create_task(br.publish("", queue)),
asyncio.create_task(event.wait()),
),
timeout=self.timeout,
Expand Down Expand Up @@ -248,11 +251,11 @@ async def resp(msg):
event.set()
mock(msg)

async with pub_broker:
await pub_broker.start()
async with self.patch_broker(pub_broker) as br:
await br.start()
await asyncio.wait(
(
asyncio.create_task(pub_broker.publish("", queue)),
asyncio.create_task(br.publish("", queue)),
asyncio.create_task(event.wait()),
),
timeout=self.timeout,
Expand Down Expand Up @@ -281,11 +284,11 @@ async def resp(msg):
event.set()
mock(msg)

async with pub_broker:
await pub_broker.start()
async with self.patch_broker(pub_broker) as br:
await br.start()
await asyncio.wait(
(
asyncio.create_task(pub_broker.publish("", queue)),
asyncio.create_task(br.publish("", queue)),
asyncio.create_task(event.wait()),
),
timeout=self.timeout,
Expand Down Expand Up @@ -321,11 +324,11 @@ async def resp2(msg):
event2.set()
mock.resp2(msg)

async with pub_broker:
await pub_broker.start()
async with self.patch_broker(pub_broker) as br:
await br.start()
await asyncio.wait(
(
asyncio.create_task(pub_broker.publish("", queue)),
asyncio.create_task(br.publish("", queue)),
asyncio.create_task(event.wait()),
asyncio.create_task(event2.wait()),
),
Expand Down Expand Up @@ -368,12 +371,12 @@ async def resp():
consume2.set()
mock()

async with pub_broker:
await pub_broker.start()
async with self.patch_broker(pub_broker) as br:
await br.start()
await asyncio.wait(
(
asyncio.create_task(pub_broker.publish("", queue)),
asyncio.create_task(pub_broker.publish("", queue + "2")),
asyncio.create_task(br.publish("", queue)),
asyncio.create_task(br.publish("", queue + "2")),
asyncio.create_task(consume.wait()),
asyncio.create_task(consume2.wait()),
),
Expand Down Expand Up @@ -402,13 +405,13 @@ async def reply_handler(m):
async def handler(m):
return m

async with pub_broker:
await pub_broker.start()
async with self.patch_broker(pub_broker) as br:
await br.start()

await asyncio.wait(
(
asyncio.create_task(
pub_broker.publish("Hello!", queue, reply_to=queue + "reply")
br.publish("Hello!", queue, reply_to=queue + "reply")
),
asyncio.create_task(event.wait()),
),
Expand All @@ -432,10 +435,10 @@ async def handler(m):
event.set()
mock(m)

async with pub_broker:
await pub_broker.start()
async with self.patch_broker(pub_broker) as br:
await br.start()

pub = pub_broker.publisher(queue)
pub = br.publisher(queue)

await asyncio.wait(
(
Expand Down
27 changes: 15 additions & 12 deletions tests/brokers/base/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ class BrokerRPCTestcase:
def get_broker(self, apply_types: bool = False) -> BrokerUsecase[Any, Any]:
raise NotImplementedError

def patch_broker(self, broker: BrokerUsecase[Any, Any]) -> BrokerUsecase[Any, Any]:
return broker

@pytest.mark.asyncio()
async def test_rpc(self, queue: str):
rpc_broker = self.get_broker()
Expand All @@ -23,9 +26,9 @@ async def test_rpc(self, queue: str):
async def m(m): # pragma: no cover
return "1"

async with rpc_broker:
await rpc_broker.start()
r = await rpc_broker.publish("hello", queue, rpc_timeout=3, rpc=True)
async with self.patch_broker(rpc_broker) as br:
await br.start()
r = await br.publish("hello", queue, rpc_timeout=3, rpc=True)

assert r == "1"

Expand All @@ -37,11 +40,11 @@ async def test_rpc_timeout_raises(self, queue: str):
async def m(m): # pragma: no cover
await anyio.sleep(1)

async with rpc_broker:
await rpc_broker.start()
async with self.patch_broker(rpc_broker) as br:
await br.start()

with pytest.raises(TimeoutError): # pragma: no branch
await rpc_broker.publish(
await br.publish(
"hello",
queue,
rpc=True,
Expand All @@ -57,10 +60,10 @@ async def test_rpc_timeout_none(self, queue: str):
async def m(m): # pragma: no cover
await anyio.sleep(1)

async with rpc_broker:
await rpc_broker.start()
async with self.patch_broker(rpc_broker) as br:
await br.start()

r = await rpc_broker.publish(
r = await br.publish(
"hello",
queue,
rpc=True,
Expand Down Expand Up @@ -89,10 +92,10 @@ async def response_hanler(m: str):
async def m(m): # pragma: no cover
return "1"

async with rpc_broker:
await rpc_broker.start()
async with self.patch_broker(rpc_broker) as br:
await br.start()

await rpc_broker.publish("hello", queue, reply_to=reply_queue)
await br.publish("hello", queue, reply_to=reply_queue)

with timeout_scope(3, True):
await event.wait()
Expand Down
Loading

0 comments on commit d38b7a4

Please sign in to comment.