From 5469bae67b28d437e7e436655c4cd1cb36e6f46f Mon Sep 17 00:00:00 2001 From: Alex Date: Sat, 18 May 2024 16:43:35 +0100 Subject: [PATCH] Merged changes for #1413 --- .github/PULL_REQUEST_TEMPLATE.md | 2 +- faststream/confluent/testing.py | 26 +++++---- faststream/kafka/testing.py | 3 +- scripts/set_variables.sh | 2 +- tests/brokers/confluent/test_test_client.py | 62 ++++++++++++++++----- tests/brokers/kafka/test_test_client.py | 37 ++++++++++++ 6 files changed, 104 insertions(+), 28 deletions(-) diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 93f6f4cabc..e5333e3e48 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -22,5 +22,5 @@ Please delete options that are not relevant. - [ ] My changes do not generate any new warnings - [ ] I have added tests to validate the effectiveness of my fix or the functionality of my new feature - [ ] Both new and existing unit tests pass successfully on my local environment by running `scripts/test-cov.sh` -- [ ] I have ensured that static analysis tests are passing by running `scripts/static-anaylysis.sh` +- [ ] I have ensured that static analysis tests are passing by running `scripts/static-analysis.sh` - [ ] I have included code examples to illustrate the modifications diff --git a/faststream/confluent/testing.py b/faststream/confluent/testing.py index 9420ff3aa5..8c01c89465 100644 --- a/faststream/confluent/testing.py +++ b/faststream/confluent/testing.py @@ -98,17 +98,21 @@ async def publish( # type: ignore[override] reply_to=reply_to, ) - for handler in self.broker._subscribers.values(): # pragma: no branch - if topic in handler.topics: - return await call_handler( - handler=handler, - message=[incoming] - if isinstance(handler, AsyncAPIBatchSubscriber) - else incoming, - rpc=rpc, - rpc_timeout=rpc_timeout, - raise_timeout=raise_timeout, - ) + for consumer_group in set( + {sub.group_id for sub in self.broker._subscribers.values()} + ): + for handler in self.broker._subscribers.values(): # pragma: no branch + if topic in handler.topics and consumer_group == handler.group_id: + await call_handler( + handler=handler, + message=[incoming] + if isinstance(handler, AsyncAPIBatchSubscriber) + else incoming, + rpc=rpc, + rpc_timeout=rpc_timeout, + raise_timeout=raise_timeout, + ) + break return None diff --git a/faststream/kafka/testing.py b/faststream/kafka/testing.py index fd8b520332..d835634179 100755 --- a/faststream/kafka/testing.py +++ b/faststream/kafka/testing.py @@ -1,6 +1,5 @@ from datetime import datetime -from typing import TYPE_CHECKING, Any, Callable, Dict, Optional -from unittest.mock import AsyncMock, MagicMock +from typing import TYPE_CHECKING, Any, Dict, Optional from aiokafka import ConsumerRecord from typing_extensions import override diff --git a/scripts/set_variables.sh b/scripts/set_variables.sh index e2c0c9531c..ef662336bb 100755 --- a/scripts/set_variables.sh +++ b/scripts/set_variables.sh @@ -9,7 +9,7 @@ echo AIRT_PROJECT variable set to $AIRT_PROJECT export UID=$(id -u) export GID=$(id -g) -export DOCKER_COMPOSE_PROJECT="${USER}-faststream" +export DOCKER_COMPOSE_PROJECT="${USER//./_}-faststream" echo DOCKER_COMPOSE_PROJECT variable set to $DOCKER_COMPOSE_PROJECT export KAFKA_HOSTNAME="${DOCKER_COMPOSE_PROJECT}-kafka-1" echo KAFKA_HOSTNAME variable set to $KAFKA_HOSTNAME diff --git a/tests/brokers/confluent/test_test_client.py b/tests/brokers/confluent/test_test_client.py index d70d2fda6d..82daa8adac 100644 --- a/tests/brokers/confluent/test_test_client.py +++ b/tests/brokers/confluent/test_test_client.py @@ -13,10 +13,10 @@ class TestTestclient(BrokerTestclientTestcase): @pytest.mark.confluent() async def test_with_real_testclient( - self, - broker: KafkaBroker, - queue: str, - event: asyncio.Event, + self, + broker: KafkaBroker, + queue: str, + event: asyncio.Event, ): @broker.subscriber(queue, auto_offset_reset="earliest") def subscriber(m): @@ -34,9 +34,9 @@ def subscriber(m): assert event.is_set() async def test_batch_pub_by_default_pub( - self, - test_broker: KafkaBroker, - queue: str, + self, + test_broker: KafkaBroker, + queue: str, ): @test_broker.subscriber(queue, batch=True, auto_offset_reset="earliest") async def m(): @@ -47,9 +47,9 @@ async def m(): m.mock.assert_called_once_with(["hello"]) async def test_batch_pub_by_pub_batch( - self, - test_broker: KafkaBroker, - queue: str, + self, + test_broker: KafkaBroker, + queue: str, ): @test_broker.subscriber(queue, batch=True, auto_offset_reset="earliest") async def m(): @@ -60,9 +60,9 @@ async def m(): m.mock.assert_called_once_with(["hello"]) async def test_batch_publisher_mock( - self, - test_broker: KafkaBroker, - queue: str, + self, + test_broker: KafkaBroker, + queue: str, ): publisher = test_broker.publisher(queue + "1", batch=True) @@ -122,3 +122,39 @@ async def h2(): ... await h2.wait_call(10) assert len(routes) == 2 + + @pytest.mark.confluent() + async def test_multiple_subscribers_different_groups( + self, + queue: str, + test_broker: KafkaBroker, + ): + @test_broker.subscriber(queue, group_id="group1") + async def subscriber1(): ... + + @test_broker.subscriber(queue, group_id="group2") + async def subscriber2(): ... + + await test_broker.start() + await test_broker.publish("", queue) + + assert subscriber1.mock.call_count == 1 + assert subscriber2.mock.call_count == 1 + + @pytest.mark.confluent() + async def test_multiple_subscribers_same_group( + self, + queue: str, + test_broker: KafkaBroker, + ): + @test_broker.subscriber(queue, group_id="group1") + async def subscriber1(): ... + + @test_broker.subscriber(queue, group_id="group1") + async def subscriber2(): ... + + await test_broker.start() + await test_broker.publish("", queue) + + assert subscriber1.mock.call_count == 1 + assert subscriber2.mock.call_count == 0 diff --git a/tests/brokers/kafka/test_test_client.py b/tests/brokers/kafka/test_test_client.py index cc128921ed..bf830ff7eb 100644 --- a/tests/brokers/kafka/test_test_client.py +++ b/tests/brokers/kafka/test_test_client.py @@ -164,3 +164,40 @@ async def h2(): ... await h2.wait_call(3) assert len(routes) == 2 + + @pytest.mark.kafka() + async def test_multiple_subscribers_different_groups( + self, + queue: str, + test_broker: KafkaBroker, + ): + @test_broker.subscriber(queue, group_id="group1") + async def subscriber1(): ... + + @test_broker.subscriber(queue, group_id="group2") + async def subscriber2(): ... + + await test_broker.start() + await test_broker.publish("", queue) + + assert subscriber1.mock.call_count == 1 + assert subscriber2.mock.call_count == 1 + + @pytest.mark.kafka() + async def test_multiple_subscribers_same_group( + self, + queue: str, + test_broker: KafkaBroker, + ): + @test_broker.subscriber(queue, group_id="group1") + async def subscriber1(): ... + + @test_broker.subscriber(queue, group_id="group1") + async def subscriber2(): ... + + await test_broker.start() + await test_broker.publish("", queue) + + assert subscriber1.mock.call_count == 1 + assert subscriber2.mock.call_count == 0 +