Skip to content

Commit

Permalink
Cover confluent asyncapi tests (#1279)
Browse files Browse the repository at this point in the history
* Add tests for confluent AsyncAPI

* Update tests to include confluent docs_src

* Cover confluent multiple_lifespan

* Update confluent fastapi tests
  • Loading branch information
kumaranvpl authored Mar 4, 2024
1 parent fd644b4 commit e37721f
Show file tree
Hide file tree
Showing 13 changed files with 1,038 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
app = FastStream(broker)


@broker.subscriber("test-topic")
@broker.subscriber("test-topic-confluent", auto_offset_reset="earliest")
async def handle(
name: str = Field(
..., examples=["John"], description="Registered user name"
Expand All @@ -18,3 +18,16 @@ async def handle(
):
assert name == "John"
assert user_id == 1


@broker.subscriber("test-confluent-wrong-fields", auto_offset_reset="earliest")
async def wrong_handle(
name: str = Field(
..., examples=["John"], description="Registered user name"
),
user_id: NonNegativeInt = Field(
..., examples=[1], description="Registered user id"
),
):
assert name == "John"
assert user_id == 1
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@

from faststream.confluent import TestKafkaBroker

from .pydantic_fields import broker, handle
from .pydantic_fields import broker, handle, wrong_handle


@pytest.mark.asyncio
async def test_handle():
async with TestKafkaBroker(broker, with_real=True) as br:
await br.publish({"name": "John", "user_id": 1}, topic="test-topic")
await handle.wait_call(timeout=3)
await br.publish({"name": "John", "user_id": 1}, topic="test-topic-confluent")
await handle.wait_call(timeout=10)
handle.mock.assert_called_once_with({"name": "John", "user_id": 1})

assert handle.mock is None
Expand All @@ -19,7 +19,7 @@ async def test_handle():
async def test_validation_error():
async with TestKafkaBroker(broker, with_real=True) as br:
with pytest.raises(ValidationError):
await br.publish("wrong message", topic="test-topic")
await handle.wait_call(timeout=3)
await br.publish("wrong message", topic="test-confluent-wrong-fields")
await wrong_handle.wait_call(timeout=10)

handle.mock.assert_called_once_with("wrong message")
wrong_handle.mock.assert_called_once_with("wrong message")
Empty file.
21 changes: 21 additions & 0 deletions tests/asyncapi/confluent/test_arguments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from faststream.asyncapi.generate import get_app_schema
from faststream.confluent import KafkaBroker
from tests.asyncapi.base.arguments import ArgumentsTestcase


class TestArguments(ArgumentsTestcase): # noqa: D101
broker_class = KafkaBroker

def test_subscriber_bindings(self):
broker = self.broker_class()

@broker.subscriber("test")
async def handle(msg):
...

schema = get_app_schema(self.build_app(broker)).to_jsonable()
key = tuple(schema["channels"].keys())[0] # noqa: RUF015

assert schema["channels"][key]["bindings"] == {
"kafka": {"bindingVersion": "0.4.0", "topic": "test"}
}
92 changes: 92 additions & 0 deletions tests/asyncapi/confluent/test_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from faststream import FastStream
from faststream.asyncapi.generate import get_app_schema
from faststream.asyncapi.schema import Tag
from faststream.confluent import KafkaBroker


def test_base():
schema = get_app_schema(
FastStream(
KafkaBroker(
"kafka:9092",
protocol="plaintext",
protocol_version="0.9.0",
description="Test description",
tags=(Tag(name="some-tag", description="experimental"),),
)
)
).to_jsonable()

assert schema == {
"asyncapi": "2.6.0",
"channels": {},
"components": {"messages": {}, "schemas": {}},
"defaultContentType": "application/json",
"info": {"description": "", "title": "FastStream", "version": "0.1.0"},
"servers": {
"development": {
"description": "Test description",
"protocol": "plaintext",
"protocolVersion": "0.9.0",
"tags": [{"description": "experimental", "name": "some-tag"}],
"url": "kafka:9092",
}
},
}


def test_multi():
schema = get_app_schema(
FastStream(KafkaBroker(["kafka:9092", "kafka:9093"]))
).to_jsonable()

assert schema == {
"asyncapi": "2.6.0",
"channels": {},
"components": {"messages": {}, "schemas": {}},
"defaultContentType": "application/json",
"info": {"description": "", "title": "FastStream", "version": "0.1.0"},
"servers": {
"Server1": {
"protocol": "kafka",
"protocolVersion": "auto",
"url": "kafka:9092",
},
"Server2": {
"protocol": "kafka",
"protocolVersion": "auto",
"url": "kafka:9093",
},
},
}


def test_custom():
schema = get_app_schema(
FastStream(
KafkaBroker(
["kafka:9092", "kafka:9093"],
asyncapi_url=["kafka:9094", "kafka:9095"],
)
)
).to_jsonable()

assert schema == {
"asyncapi": "2.6.0",
"channels": {},
"components": {"messages": {}, "schemas": {}},
"defaultContentType": "application/json",
"info": {"description": "", "title": "FastStream", "version": "0.1.0"},
"servers": {
"Server1": {
"protocol": "kafka",
"protocolVersion": "auto",
"url": "kafka:9094",
},
"Server2": {
"protocol": "kafka",
"protocolVersion": "auto",
"url": "kafka:9095",
},
},
}
42 changes: 42 additions & 0 deletions tests/asyncapi/confluent/test_fastapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from typing import Type

from faststream.asyncapi.generate import get_app_schema
from faststream.confluent.fastapi import KafkaRouter
from faststream.confluent.test import TestKafkaBroker
from faststream.security import SASLPlaintext
from tests.asyncapi.base.arguments import FastAPICompatible
from tests.asyncapi.base.fastapi import FastAPITestCase
from tests.asyncapi.base.publisher import PublisherTestcase


class TestRouterArguments(FastAPITestCase, FastAPICompatible): # noqa: D101
broker_class: Type[KafkaRouter] = KafkaRouter
broker_wrapper = staticmethod(TestKafkaBroker)

def build_app(self, router):
return router


class TestRouterPublisher(PublisherTestcase): # noqa: D101
broker_class = KafkaRouter

def build_app(self, router):
return router


def test_fastapi_security_schema():
security = SASLPlaintext(username="user", password="pass", use_ssl=False)

broker = KafkaRouter("localhost:9092", security=security)

schema = get_app_schema(broker).to_jsonable()

assert schema["servers"]["development"] == {
"protocol": "kafka",
"protocolVersion": "auto",
"security": [{"user-password": []}],
"url": "localhost:9092",
}
assert schema["components"]["securitySchemes"] == {
"user-password": {"type": "userPassword"}
}
51 changes: 51 additions & 0 deletions tests/asyncapi/confluent/test_naming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from faststream import FastStream
from faststream.asyncapi.generate import get_app_schema
from faststream.confluent import KafkaBroker
from tests.asyncapi.base.naming import NamingTestCase


class TestNaming(NamingTestCase): # noqa: D101
broker_class = KafkaBroker

def test_base(self):
broker = self.broker_class()

@broker.subscriber("test")
async def handle():
...

schema = get_app_schema(FastStream(broker)).to_jsonable()

assert schema == {
"asyncapi": "2.6.0",
"defaultContentType": "application/json",
"info": {"title": "FastStream", "version": "0.1.0", "description": ""},
"servers": {
"development": {
"url": "localhost",
"protocol": "kafka",
"protocolVersion": "auto",
}
},
"channels": {
"test:Handle": {
"servers": ["development"],
"bindings": {"kafka": {"topic": "test", "bindingVersion": "0.4.0"}},
"subscribe": {
"message": {"$ref": "#/components/messages/test:Handle:Message"}
},
}
},
"components": {
"messages": {
"test:Handle:Message": {
"title": "test:Handle:Message",
"correlationId": {
"location": "$message.header#/correlation_id"
},
"payload": {"$ref": "#/components/schemas/EmptyPayload"},
}
},
"schemas": {"EmptyPayload": {"title": "EmptyPayload", "type": "null"}},
},
}
21 changes: 21 additions & 0 deletions tests/asyncapi/confluent/test_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from faststream.asyncapi.generate import get_app_schema
from faststream.confluent import KafkaBroker
from tests.asyncapi.base.publisher import PublisherTestcase


class TestArguments(PublisherTestcase): # noqa: D101
broker_class = KafkaBroker

def test_publisher_bindings(self):
broker = self.broker_class()

@broker.publisher("test")
async def handle(msg):
...

schema = get_app_schema(self.build_app(broker)).to_jsonable()
key = tuple(schema["channels"].keys())[0] # noqa: RUF015

assert schema["channels"][key]["bindings"] == {
"kafka": {"bindingVersion": "0.4.0", "topic": "test"}
}
85 changes: 85 additions & 0 deletions tests/asyncapi/confluent/test_router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from faststream import FastStream
from faststream.asyncapi.generate import get_app_schema
from faststream.confluent import KafkaBroker, KafkaRoute, KafkaRouter
from tests.asyncapi.base.arguments import ArgumentsTestcase
from tests.asyncapi.base.publisher import PublisherTestcase
from tests.asyncapi.base.router import RouterTestcase


class TestRouter(RouterTestcase): # noqa: D101
broker_class = KafkaBroker
router_class = KafkaRouter
route_class = KafkaRoute

def test_prefix(self):
broker = self.broker_class()

router = self.router_class(prefix="test_")

@router.subscriber("test")
async def handle(msg):
...

broker.include_router(router)

schema = get_app_schema(FastStream(broker)).to_jsonable()

assert schema == {
"asyncapi": "2.6.0",
"defaultContentType": "application/json",
"info": {"title": "FastStream", "version": "0.1.0", "description": ""},
"servers": {
"development": {
"url": "localhost",
"protocol": "kafka",
"protocolVersion": "auto",
}
},
"channels": {
"test_test:Handle": {
"servers": ["development"],
"bindings": {
"kafka": {"topic": "test_test", "bindingVersion": "0.4.0"}
},
"subscribe": {
"message": {
"$ref": "#/components/messages/test_test:Handle:Message"
}
},
}
},
"components": {
"messages": {
"test_test:Handle:Message": {
"title": "test_test:Handle:Message",
"correlationId": {
"location": "$message.header#/correlation_id"
},
"payload": {
"$ref": "#/components/schemas/Handle:Message:Payload"
},
}
},
"schemas": {
"Handle:Message:Payload": {"title": "Handle:Message:Payload"}
},
},
}


class TestRouterArguments(ArgumentsTestcase): # noqa: D101
broker_class = KafkaRouter

def build_app(self, router):
broker = KafkaBroker()
broker.include_router(router)
return FastStream(broker)


class TestRouterPublisher(PublisherTestcase): # noqa: D101
broker_class = KafkaRouter

def build_app(self, router):
broker = KafkaBroker()
broker.include_router(router)
return FastStream(broker)
Loading

0 comments on commit e37721f

Please sign in to comment.