Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #1412 with TestKafkaBroker behaviour where Consumer Groups weren't being respected #1413

Merged
merged 15 commits into from
May 25, 2024

Conversation

sifex
Copy link
Contributor

@sifex sifex commented May 1, 2024

Description

This fixes bug #1412 where distribution of messages on a topic does not get distributed to each consumer group when running the TestKafkaBroker.

Fixes #1412

Example:
This example should have 2 messages received to 2 subscribers.

@test_broker.subscriber(queue, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, group_id="group2")
async def subscriber2(): ...

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 1

Where as if we have the same group_id, we only expect to receive it once.

@test_broker.subscriber(queue, group_id="same_group")
async def subscriber1(): ...

@test_broker.subscriber(queue, group_id="same_group")
async def subscriber2(): ...

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 0

Other fixes

  • It also fixes a small bug in DOCKER_COMPOSE_PROJECT project name generation where the users' username field contains a . character (which docker believes to be invalid naming).
  • Typo fix in .github/PULL_REQUEST_TEMPLATE.md

Type of change

  • Bug fix (a non-breaking change that resolves an issue)

Checklist

  • My code adheres to the style guidelines of this project (scripts/lint.sh shows no errors)
  • I have conducted a self-review of my own code
  • I have made the necessary changes to the documentation
  • My changes do not generate any new warnings
  • I have added tests to validate the effectiveness of my fix or the functionality of my new feature
  • Both new and existing unit tests pass successfully on my local environment by running scripts/test-cov.sh (No. Currently Segfaults on Local M1 MBP, will rely on GHA)
  • I have ensured that static analysis tests are passing by running scripts/static-anaylysis.sh
  • I have included code examples to illustrate the modifications

@sifex
Copy link
Contributor Author

sifex commented May 1, 2024

Currently facing an issue where the RPC test assumes you can get a response back from a publisher? Not sure why this assumption was made.

@pytest.mark.asyncio()
    async def test_rpc(self, queue: str, rpc_broker: BrokerUsecase):
        @rpc_broker.subscriber(queue)
        async def m(m):  # pragma: no cover
            return "1"
    
        async with rpc_broker:
            await rpc_broker.start()
            r = await rpc_broker.publish("hello", queue, rpc_timeout=3, rpc=True)
    
>       assert r == "1"
E       AssertionError

imo I think publishers by definition can't return anything other than a ReceiveAck / similar response from the message queue – also not sure if I misunderstand the use case for RPC in this context.

@Lancetnik
Copy link
Member

Currently facing an issue where the RPC test assumes you can get a response back from a publisher? Not sure why this assumption was made.

@pytest.mark.asyncio()
    async def test_rpc(self, queue: str, rpc_broker: BrokerUsecase):
        @rpc_broker.subscriber(queue)
        async def m(m):  # pragma: no cover
            return "1"
    
        async with rpc_broker:
            await rpc_broker.start()
            r = await rpc_broker.publish("hello", queue, rpc_timeout=3, rpc=True)
    
>       assert r == "1"
E       AssertionError

imo I think publishers by definition can't return anything other than a ReceiveAck / similar response from the message queue – also not sure if I misunderstand the use case for RPC in this context.

RPC in TestClient allows you to validate your handler result without special publisher creation. I think, we should save this behavior. In you case, if user has the only last result - it should be OK for most cases: (the following pseudocode represents assuming bahavior)

result = None
for sub in subscribers:
    result = call_subscriber(...)
return result

raise_timeout=raise_timeout,
)
for consumer_group in set(
{sub.group_id for sub in self.broker._subscribers.values()}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like O(n**2) in this case. Can we find any decision without iteration throw all subscribers twice?

Copy link
Member

@Lancetnik Lancetnik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please fix the comments and upload main changes to make it mergeable?

@sifex sifex force-pushed the kafka-test-broker-fix branch 2 times, most recently from 5469bae to a03f4ea Compare May 18, 2024 15:44
@sifex sifex force-pushed the kafka-test-broker-fix branch from 9cb00e7 to 0d760ce Compare May 18, 2024 17:26
@sifex
Copy link
Contributor Author

sifex commented May 18, 2024

Should be good to go, applied ruff formatting + linting.

@sifex sifex requested a review from Lancetnik May 18, 2024 22:05
@Lancetnik
Copy link
Member

I'll check it and solve conflicts with main soon. Thank you for the work!

Lancetnik
Lancetnik previously approved these changes May 20, 2024
Copy link
Member

@Lancetnik Lancetnik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sifex looks really great, but now I can't understand, why it respects consumer group😆

Anyway, can you, please add test for this behavior for publish_batch method? Then we can merge it

@Lancetnik Lancetnik added this pull request to the merge queue May 25, 2024
Merged via the queue into airtai:main with commit 3d5f241 May 25, 2024
31 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Kafka Test Subscriber doesn't match Consumer Group behaviour
2 participants