diff --git a/docs/docs_src/getting_started/subscription/confluent/pydantic_fields.py b/docs/docs_src/getting_started/subscription/confluent/pydantic_fields.py index e5f76706ce..9dda99655c 100644 --- a/docs/docs_src/getting_started/subscription/confluent/pydantic_fields.py +++ b/docs/docs_src/getting_started/subscription/confluent/pydantic_fields.py @@ -7,7 +7,7 @@ app = FastStream(broker) -@broker.subscriber("test-topic") +@broker.subscriber("test-topic-confluent", auto_offset_reset="earliest") async def handle( name: str = Field( ..., examples=["John"], description="Registered user name" @@ -18,3 +18,16 @@ async def handle( ): assert name == "John" assert user_id == 1 + + +@broker.subscriber("test-confluent-wrong-fields", auto_offset_reset="earliest") +async def wrong_handle( + name: str = Field( + ..., examples=["John"], description="Registered user name" + ), + user_id: NonNegativeInt = Field( + ..., examples=[1], description="Registered user id" + ), +): + assert name == "John" + assert user_id == 1 diff --git a/docs/docs_src/getting_started/subscription/confluent/real_testing.py b/docs/docs_src/getting_started/subscription/confluent/real_testing.py index a749325d38..19d061c095 100644 --- a/docs/docs_src/getting_started/subscription/confluent/real_testing.py +++ b/docs/docs_src/getting_started/subscription/confluent/real_testing.py @@ -3,14 +3,14 @@ from faststream.confluent import TestKafkaBroker -from .pydantic_fields import broker, handle +from .pydantic_fields import broker, handle, wrong_handle @pytest.mark.asyncio async def test_handle(): async with TestKafkaBroker(broker, with_real=True) as br: - await br.publish({"name": "John", "user_id": 1}, topic="test-topic") - await handle.wait_call(timeout=3) + await br.publish({"name": "John", "user_id": 1}, topic="test-topic-confluent") + await handle.wait_call(timeout=10) handle.mock.assert_called_once_with({"name": "John", "user_id": 1}) assert handle.mock is None @@ -19,7 +19,7 @@ async def test_handle(): async def test_validation_error(): async with TestKafkaBroker(broker, with_real=True) as br: with pytest.raises(ValidationError): - await br.publish("wrong message", topic="test-topic") - await handle.wait_call(timeout=3) + await br.publish("wrong message", topic="test-confluent-wrong-fields") + await wrong_handle.wait_call(timeout=10) - handle.mock.assert_called_once_with("wrong message") + wrong_handle.mock.assert_called_once_with("wrong message") diff --git a/tests/asyncapi/confluent/__init__.py b/tests/asyncapi/confluent/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/asyncapi/confluent/test_arguments.py b/tests/asyncapi/confluent/test_arguments.py new file mode 100644 index 0000000000..1fb90a75d1 --- /dev/null +++ b/tests/asyncapi/confluent/test_arguments.py @@ -0,0 +1,21 @@ +from faststream.asyncapi.generate import get_app_schema +from faststream.confluent import KafkaBroker +from tests.asyncapi.base.arguments import ArgumentsTestcase + + +class TestArguments(ArgumentsTestcase): # noqa: D101 + broker_class = KafkaBroker + + def test_subscriber_bindings(self): + broker = self.broker_class() + + @broker.subscriber("test") + async def handle(msg): + ... + + schema = get_app_schema(self.build_app(broker)).to_jsonable() + key = tuple(schema["channels"].keys())[0] # noqa: RUF015 + + assert schema["channels"][key]["bindings"] == { + "kafka": {"bindingVersion": "0.4.0", "topic": "test"} + } diff --git a/tests/asyncapi/confluent/test_connection.py b/tests/asyncapi/confluent/test_connection.py new file mode 100644 index 0000000000..d37cefbfa7 --- /dev/null +++ b/tests/asyncapi/confluent/test_connection.py @@ -0,0 +1,92 @@ +from faststream import FastStream +from faststream.asyncapi.generate import get_app_schema +from faststream.asyncapi.schema import Tag +from faststream.confluent import KafkaBroker + + +def test_base(): + schema = get_app_schema( + FastStream( + KafkaBroker( + "kafka:9092", + protocol="plaintext", + protocol_version="0.9.0", + description="Test description", + tags=(Tag(name="some-tag", description="experimental"),), + ) + ) + ).to_jsonable() + + assert schema == { + "asyncapi": "2.6.0", + "channels": {}, + "components": {"messages": {}, "schemas": {}}, + "defaultContentType": "application/json", + "info": {"description": "", "title": "FastStream", "version": "0.1.0"}, + "servers": { + "development": { + "description": "Test description", + "protocol": "plaintext", + "protocolVersion": "0.9.0", + "tags": [{"description": "experimental", "name": "some-tag"}], + "url": "kafka:9092", + } + }, + } + + +def test_multi(): + schema = get_app_schema( + FastStream(KafkaBroker(["kafka:9092", "kafka:9093"])) + ).to_jsonable() + + assert schema == { + "asyncapi": "2.6.0", + "channels": {}, + "components": {"messages": {}, "schemas": {}}, + "defaultContentType": "application/json", + "info": {"description": "", "title": "FastStream", "version": "0.1.0"}, + "servers": { + "Server1": { + "protocol": "kafka", + "protocolVersion": "auto", + "url": "kafka:9092", + }, + "Server2": { + "protocol": "kafka", + "protocolVersion": "auto", + "url": "kafka:9093", + }, + }, + } + + +def test_custom(): + schema = get_app_schema( + FastStream( + KafkaBroker( + ["kafka:9092", "kafka:9093"], + asyncapi_url=["kafka:9094", "kafka:9095"], + ) + ) + ).to_jsonable() + + assert schema == { + "asyncapi": "2.6.0", + "channels": {}, + "components": {"messages": {}, "schemas": {}}, + "defaultContentType": "application/json", + "info": {"description": "", "title": "FastStream", "version": "0.1.0"}, + "servers": { + "Server1": { + "protocol": "kafka", + "protocolVersion": "auto", + "url": "kafka:9094", + }, + "Server2": { + "protocol": "kafka", + "protocolVersion": "auto", + "url": "kafka:9095", + }, + }, + } diff --git a/tests/asyncapi/confluent/test_fastapi.py b/tests/asyncapi/confluent/test_fastapi.py new file mode 100644 index 0000000000..1cfcaa3157 --- /dev/null +++ b/tests/asyncapi/confluent/test_fastapi.py @@ -0,0 +1,42 @@ +from typing import Type + +from faststream.asyncapi.generate import get_app_schema +from faststream.confluent.fastapi import KafkaRouter +from faststream.confluent.test import TestKafkaBroker +from faststream.security import SASLPlaintext +from tests.asyncapi.base.arguments import FastAPICompatible +from tests.asyncapi.base.fastapi import FastAPITestCase +from tests.asyncapi.base.publisher import PublisherTestcase + + +class TestRouterArguments(FastAPITestCase, FastAPICompatible): # noqa: D101 + broker_class: Type[KafkaRouter] = KafkaRouter + broker_wrapper = staticmethod(TestKafkaBroker) + + def build_app(self, router): + return router + + +class TestRouterPublisher(PublisherTestcase): # noqa: D101 + broker_class = KafkaRouter + + def build_app(self, router): + return router + + +def test_fastapi_security_schema(): + security = SASLPlaintext(username="user", password="pass", use_ssl=False) + + broker = KafkaRouter("localhost:9092", security=security) + + schema = get_app_schema(broker).to_jsonable() + + assert schema["servers"]["development"] == { + "protocol": "kafka", + "protocolVersion": "auto", + "security": [{"user-password": []}], + "url": "localhost:9092", + } + assert schema["components"]["securitySchemes"] == { + "user-password": {"type": "userPassword"} + } diff --git a/tests/asyncapi/confluent/test_naming.py b/tests/asyncapi/confluent/test_naming.py new file mode 100644 index 0000000000..423a0121fc --- /dev/null +++ b/tests/asyncapi/confluent/test_naming.py @@ -0,0 +1,51 @@ +from faststream import FastStream +from faststream.asyncapi.generate import get_app_schema +from faststream.confluent import KafkaBroker +from tests.asyncapi.base.naming import NamingTestCase + + +class TestNaming(NamingTestCase): # noqa: D101 + broker_class = KafkaBroker + + def test_base(self): + broker = self.broker_class() + + @broker.subscriber("test") + async def handle(): + ... + + schema = get_app_schema(FastStream(broker)).to_jsonable() + + assert schema == { + "asyncapi": "2.6.0", + "defaultContentType": "application/json", + "info": {"title": "FastStream", "version": "0.1.0", "description": ""}, + "servers": { + "development": { + "url": "localhost", + "protocol": "kafka", + "protocolVersion": "auto", + } + }, + "channels": { + "test:Handle": { + "servers": ["development"], + "bindings": {"kafka": {"topic": "test", "bindingVersion": "0.4.0"}}, + "subscribe": { + "message": {"$ref": "#/components/messages/test:Handle:Message"} + }, + } + }, + "components": { + "messages": { + "test:Handle:Message": { + "title": "test:Handle:Message", + "correlationId": { + "location": "$message.header#/correlation_id" + }, + "payload": {"$ref": "#/components/schemas/EmptyPayload"}, + } + }, + "schemas": {"EmptyPayload": {"title": "EmptyPayload", "type": "null"}}, + }, + } diff --git a/tests/asyncapi/confluent/test_publisher.py b/tests/asyncapi/confluent/test_publisher.py new file mode 100644 index 0000000000..1b8dac5d90 --- /dev/null +++ b/tests/asyncapi/confluent/test_publisher.py @@ -0,0 +1,21 @@ +from faststream.asyncapi.generate import get_app_schema +from faststream.confluent import KafkaBroker +from tests.asyncapi.base.publisher import PublisherTestcase + + +class TestArguments(PublisherTestcase): # noqa: D101 + broker_class = KafkaBroker + + def test_publisher_bindings(self): + broker = self.broker_class() + + @broker.publisher("test") + async def handle(msg): + ... + + schema = get_app_schema(self.build_app(broker)).to_jsonable() + key = tuple(schema["channels"].keys())[0] # noqa: RUF015 + + assert schema["channels"][key]["bindings"] == { + "kafka": {"bindingVersion": "0.4.0", "topic": "test"} + } diff --git a/tests/asyncapi/confluent/test_router.py b/tests/asyncapi/confluent/test_router.py new file mode 100644 index 0000000000..5655005e1f --- /dev/null +++ b/tests/asyncapi/confluent/test_router.py @@ -0,0 +1,85 @@ +from faststream import FastStream +from faststream.asyncapi.generate import get_app_schema +from faststream.confluent import KafkaBroker, KafkaRoute, KafkaRouter +from tests.asyncapi.base.arguments import ArgumentsTestcase +from tests.asyncapi.base.publisher import PublisherTestcase +from tests.asyncapi.base.router import RouterTestcase + + +class TestRouter(RouterTestcase): # noqa: D101 + broker_class = KafkaBroker + router_class = KafkaRouter + route_class = KafkaRoute + + def test_prefix(self): + broker = self.broker_class() + + router = self.router_class(prefix="test_") + + @router.subscriber("test") + async def handle(msg): + ... + + broker.include_router(router) + + schema = get_app_schema(FastStream(broker)).to_jsonable() + + assert schema == { + "asyncapi": "2.6.0", + "defaultContentType": "application/json", + "info": {"title": "FastStream", "version": "0.1.0", "description": ""}, + "servers": { + "development": { + "url": "localhost", + "protocol": "kafka", + "protocolVersion": "auto", + } + }, + "channels": { + "test_test:Handle": { + "servers": ["development"], + "bindings": { + "kafka": {"topic": "test_test", "bindingVersion": "0.4.0"} + }, + "subscribe": { + "message": { + "$ref": "#/components/messages/test_test:Handle:Message" + } + }, + } + }, + "components": { + "messages": { + "test_test:Handle:Message": { + "title": "test_test:Handle:Message", + "correlationId": { + "location": "$message.header#/correlation_id" + }, + "payload": { + "$ref": "#/components/schemas/Handle:Message:Payload" + }, + } + }, + "schemas": { + "Handle:Message:Payload": {"title": "Handle:Message:Payload"} + }, + }, + } + + +class TestRouterArguments(ArgumentsTestcase): # noqa: D101 + broker_class = KafkaRouter + + def build_app(self, router): + broker = KafkaBroker() + broker.include_router(router) + return FastStream(broker) + + +class TestRouterPublisher(PublisherTestcase): # noqa: D101 + broker_class = KafkaRouter + + def build_app(self, router): + broker = KafkaBroker() + broker.include_router(router) + return FastStream(broker) diff --git a/tests/asyncapi/confluent/test_security.py b/tests/asyncapi/confluent/test_security.py new file mode 100644 index 0000000000..80610ab053 --- /dev/null +++ b/tests/asyncapi/confluent/test_security.py @@ -0,0 +1,169 @@ +import ssl +from copy import deepcopy + +from faststream.app import FastStream +from faststream.asyncapi.generate import get_app_schema +from faststream.confluent import KafkaBroker +from faststream.security import ( + BaseSecurity, + SASLPlaintext, + SASLScram256, + SASLScram512, +) + +basic_schema = { + "asyncapi": "2.6.0", + "channels": { + "test_1:TestTopic": { + "bindings": {"kafka": {"bindingVersion": "0.4.0", "topic": "test_1"}}, + "servers": ["development"], + "subscribe": { + "message": {"$ref": "#/components/messages/test_1:TestTopic:Message"} + }, + }, + "test_2:Publisher": { + "bindings": {"kafka": {"bindingVersion": "0.4.0", "topic": "test_2"}}, + "publish": { + "message": {"$ref": "#/components/messages/test_2:Publisher:Message"} + }, + "servers": ["development"], + }, + }, + "components": { + "messages": { + "test_1:TestTopic:Message": { + "correlationId": {"location": "$message.header#/correlation_id"}, + "payload": {"$ref": "#/components/schemas/TestTopic:Message:Payload"}, + "title": "test_1:TestTopic:Message", + }, + "test_2:Publisher:Message": { + "correlationId": {"location": "$message.header#/correlation_id"}, + "payload": { + "$ref": "#/components/schemas/test_2:Publisher:Message:Payload" + }, + "title": "test_2:Publisher:Message", + }, + }, + "schemas": { + "TestTopic:Message:Payload": { + "title": "TestTopic:Message:Payload", + "type": "string", + }, + "test_2:Publisher:Message:Payload": { + "title": "test_2:Publisher:Message:Payload", + "type": "string", + }, + }, + "securitySchemes": {}, + }, + "defaultContentType": "application/json", + "info": {"description": "", "title": "FastStream", "version": "0.1.0"}, + "servers": { + "development": { + "protocol": "kafka-secure", + "protocolVersion": "auto", + "security": [], + "url": "localhost:9092", + } + }, +} + + +def test_base_security_schema(): + ssl_context = ssl.create_default_context() + security = BaseSecurity(ssl_context=ssl_context) + + broker = KafkaBroker("localhost:9092", security=security) + app = FastStream(broker) + + @broker.publisher("test_2") + @broker.subscriber("test_1") + async def test_topic(msg: str) -> str: + pass + + schema = get_app_schema(app).to_jsonable() + + assert schema == basic_schema + + +def test_plaintext_security_schema(): + ssl_context = ssl.create_default_context() + security = SASLPlaintext( + ssl_context=ssl_context, + username="admin", + password="password", # pragma: allowlist secret + ) + + broker = KafkaBroker("localhost:9092", security=security) + app = FastStream(broker) + + @broker.publisher("test_2") + @broker.subscriber("test_1") + async def test_topic(msg: str) -> str: + pass + + schema = get_app_schema(app).to_jsonable() + + plaintext_security_schema = deepcopy(basic_schema) + plaintext_security_schema["servers"]["development"]["security"] = [ + {"user-password": []} + ] + plaintext_security_schema["components"]["securitySchemes"] = { + "user-password": {"type": "userPassword"} + } + + assert schema == plaintext_security_schema + + +def test_scram256_security_schema(): + ssl_context = ssl.create_default_context() + security = SASLScram256( + ssl_context=ssl_context, + username="admin", + password="password", # pragma: allowlist secret + ) + + broker = KafkaBroker("localhost:9092", security=security) + app = FastStream(broker) + + @broker.publisher("test_2") + @broker.subscriber("test_1") + async def test_topic(msg: str) -> str: + pass + + schema = get_app_schema(app).to_jsonable() + + sasl256_security_schema = deepcopy(basic_schema) + sasl256_security_schema["servers"]["development"]["security"] = [{"scram256": []}] + sasl256_security_schema["components"]["securitySchemes"] = { + "scram256": {"type": "scramSha256"} + } + + assert schema == sasl256_security_schema + + +def test_scram512_security_schema(): + ssl_context = ssl.create_default_context() + security = SASLScram512( + ssl_context=ssl_context, + username="admin", + password="password", # pragma: allowlist secret + ) + + broker = KafkaBroker("localhost:9092", security=security) + app = FastStream(broker) + + @broker.publisher("test_2") + @broker.subscriber("test_1") + async def test_topic(msg: str) -> str: + pass + + schema = get_app_schema(app).to_jsonable() + + sasl512_security_schema = deepcopy(basic_schema) + sasl512_security_schema["servers"]["development"]["security"] = [{"scram512": []}] + sasl512_security_schema["components"]["securitySchemes"] = { + "scram512": {"type": "scramSha512"} + } + + assert schema == sasl512_security_schema diff --git a/tests/brokers/confluent/test_fastapi.py b/tests/brokers/confluent/test_fastapi.py index 41744dcd02..bdd9a34001 100644 --- a/tests/brokers/confluent/test_fastapi.py +++ b/tests/brokers/confluent/test_fastapi.py @@ -1,20 +1,527 @@ +import asyncio +from contextlib import asynccontextmanager +from typing import Callable, List, Type, TypeVar +from unittest.mock import Mock + import pytest +from fastapi import Depends, FastAPI, Header +from fastapi.exceptions import RequestValidationError +from fastapi.testclient import TestClient -from faststream.kafka.fastapi import KafkaRouter -from faststream.kafka.test import TestKafkaBroker, build_message -from tests.brokers.base.fastapi import FastAPILocalTestcase, FastAPITestcase +from faststream import context +from faststream.broker.core.asynchronous import BrokerAsyncUsecase +from faststream.broker.fastapi.context import Context +from faststream.broker.fastapi.router import StreamRouter +from faststream.confluent.fastapi import KafkaRouter +from faststream.confluent.test import TestKafkaBroker, build_message +from faststream.types import AnyCallable +Broker = TypeVar("Broker", bound=BrokerAsyncUsecase) -@pytest.mark.confluent() -class TestRabbitRouter(FastAPITestcase): - """A class to represent a test Kafka broker.""" +@pytest.mark.asyncio() +class FastAPITestcase: # noqa: D101 + router_class: Type[StreamRouter[BrokerAsyncUsecase]] + + async def test_base_real(self, mock: Mock, queue: str, event: asyncio.Event): + router = self.router_class() + + @router.subscriber(queue, auto_offset_reset="earliest") + async def hello(msg): + event.set() + return mock(msg) + + async with router.broker: + await router.broker.start() + await asyncio.wait( + ( + asyncio.create_task(router.broker.publish("hi", queue)), + asyncio.create_task(event.wait()), + ), + timeout=10, + ) + + assert event.is_set() + mock.assert_called_with("hi") + + async def test_context(self, mock: Mock, queue: str, event: asyncio.Event): + router = self.router_class() + + context_key = "message.headers" + + @router.subscriber(queue, auto_offset_reset="earliest") + async def hello(msg=Context(context_key)): + event.set() + return mock(msg == context.resolve(context_key)) + + async with router.broker: + await router.broker.start() + await asyncio.wait( + ( + asyncio.create_task(router.broker.publish("", queue)), + asyncio.create_task(event.wait()), + ), + timeout=10, + ) + + assert event.is_set() + mock.assert_called_with(True) + + async def test_initial_context(self, queue: str, event: asyncio.Event): + router = self.router_class() + + @router.subscriber(queue, auto_offset_reset="earliest") + async def hello(msg: int, data=Context(queue, initial=set)): + data.add(msg) + if len(data) == 2: + event.set() + + async with router.broker: + await router.broker.start() + await asyncio.wait( + ( + asyncio.create_task(router.broker.publish(1, queue)), + asyncio.create_task(router.broker.publish(2, queue)), + asyncio.create_task(event.wait()), + ), + timeout=10, + ) + + assert event.is_set() + assert context.get(queue) == {1, 2} + context.reset_global(queue) + + async def test_double_real(self, mock: Mock, queue: str, event: asyncio.Event): + event2 = asyncio.Event() + router = self.router_class() + + @router.subscriber(queue, auto_offset_reset="earliest") + @router.subscriber(queue + "2", auto_offset_reset="earliest") + async def hello(msg): + if event.is_set(): + event2.set() + else: + event.set() + mock() + + async with router.broker: + await router.broker.start() + await asyncio.wait( + ( + asyncio.create_task(router.broker.publish("hi", queue)), + asyncio.create_task(router.broker.publish("hi", queue + "2")), + asyncio.create_task(event.wait()), + asyncio.create_task(event2.wait()), + ), + timeout=10, + ) + + assert event.is_set() + assert event2.is_set() + assert mock.call_count == 2 + + async def test_base_publisher_real( + self, mock: Mock, queue: str, event: asyncio.Event + ): + router = self.router_class() + + @router.subscriber(queue, auto_offset_reset="earliest") + @router.publisher(queue + "resp") + async def m(): + return "hi" + + @router.subscriber(queue + "resp", auto_offset_reset="earliest") + async def resp(msg): + event.set() + mock(msg) + + async with router.broker: + await router.broker.start() + await asyncio.wait( + ( + asyncio.create_task(router.broker.publish("", queue)), + asyncio.create_task(event.wait()), + ), + timeout=10, + ) + + assert event.is_set() + mock.assert_called_once_with("hi") + + +@pytest.mark.asyncio() +class FastAPILocalTestcase: # noqa: D101 + router_class: Type[StreamRouter[BrokerAsyncUsecase]] + broker_test: Callable[[Broker], Broker] + build_message: AnyCallable + + async def test_base(self, queue: str): + router = self.router_class() + + app = FastAPI(lifespan=router.lifespan_context) + + @router.subscriber(queue, auto_offset_reset="earliest") + async def hello(): + return "hi" + + async with self.broker_test(router.broker): + with TestClient(app) as client: + assert client.app_state["broker"] is router.broker + + r = await router.broker.publish( + "hi", + queue, + rpc=True, + rpc_timeout=0.5, + ) + assert r == "hi" + + async def test_base_without_state(self, queue: str): + router = self.router_class(setup_state=False) + + app = FastAPI(lifespan=router.lifespan_context) + + @router.subscriber(queue, auto_offset_reset="earliest") + async def hello(): + return "hi" + + async with self.broker_test(router.broker): + with TestClient(app) as client: + assert not client.app_state.get("broker") + + r = await router.broker.publish( + "hi", + queue, + rpc=True, + rpc_timeout=0.5, + ) + assert r == "hi" + + async def test_invalid(self, queue: str): + router = self.router_class() + + app = FastAPI(lifespan=router.lifespan_context) + + @router.subscriber(queue, auto_offset_reset="earliest") + async def hello(msg: int): + ... + + app.include_router(router) + + async with self.broker_test(router.broker): + with TestClient(app): + with pytest.raises(RequestValidationError): + await router.broker.publish("hi", queue) + + async def test_headers(self, queue: str): + router = self.router_class() + + @router.subscriber(queue, auto_offset_reset="earliest") + async def hello(w=Header()): + return w + + async with self.broker_test(router.broker): + r = await router.broker.publish( + "", + queue, + headers={"w": "hi"}, + rpc=True, + rpc_timeout=0.5, + ) + assert r == "hi" + + async def test_depends(self, mock: Mock, queue: str): + router = self.router_class() + + def dep(a): + mock(a) + return a + + @router.subscriber(queue, auto_offset_reset="earliest") + async def hello(a, w=Depends(dep)): + return w + + async with self.broker_test(router.broker): + r = await router.broker.publish( + {"a": "hi"}, + queue, + rpc=True, + rpc_timeout=0.5, + ) + assert r == "hi" + + mock.assert_called_once_with("hi") + + async def test_yield_depends(self, mock: Mock, queue: str): + router = self.router_class() + + def dep(a): + mock.start() + yield a + mock.close() + + @router.subscriber(queue, auto_offset_reset="earliest") + async def hello(a, w=Depends(dep)): + mock.start.assert_called_once() + assert not mock.close.call_count + return w + + async with self.broker_test(router.broker): + r = await router.broker.publish( + {"a": "hi"}, + queue, + rpc=True, + rpc_timeout=0.5, + ) + assert r == "hi" + + mock.start.assert_called_once() + mock.close.assert_called_once() + + async def test_router_depends(self, mock: Mock, queue: str): + def mock_dep(): + mock() + + router = self.router_class(dependencies=(Depends(mock_dep, use_cache=False),)) + + @router.subscriber(queue, auto_offset_reset="earliest") + async def hello(a): + return a + + async with self.broker_test(router.broker): + r = await router.broker.publish("hi", queue, rpc=True, rpc_timeout=0.5) + assert r == "hi" + + mock.assert_called_once() + + async def test_subscriber_depends(self, mock: Mock, queue: str): + def mock_dep(): + mock() + + router = self.router_class() + + @router.subscriber(queue, auto_offset_reset="earliest", dependencies=(Depends(mock_dep, use_cache=False),)) + async def hello(a): + return a + + async with self.broker_test(router.broker): + r = await router.broker.publish( + "hi", + queue, + rpc=True, + rpc_timeout=0.5, + ) + assert r == "hi" + + mock.assert_called_once() + + async def test_hooks(self, mock: Mock): + router = self.router_class() + + app = FastAPI(lifespan=router.lifespan_context) + app.include_router(router) + + @router.after_startup + def test_sync(app): + mock.sync_called() + return {"sync_called": mock.async_called.called is False} + + @router.after_startup + async def test_async(app): + mock.async_called() + return {"async_called": mock.sync_called.called} + + @router.on_broker_shutdown + def test_shutdown_sync(app): + mock.sync_shutdown_called() + + @router.on_broker_shutdown + async def test_shutdown_async(app): + mock.async_shutdown_called() + + async with self.broker_test(router.broker), router.lifespan_context( + app + ) as context: + assert context["sync_called"] + assert context["async_called"] + + mock.sync_called.assert_called_once() + mock.async_called.assert_called_once() + mock.sync_shutdown_called.assert_called_once() + mock.async_shutdown_called.assert_called_once() + + async def test_existed_lifespan_startup(self, mock: Mock): + @asynccontextmanager + async def lifespan(app): + mock.start() + yield {"lifespan": True} + mock.close() + + router = self.router_class(lifespan=lifespan) + + app = FastAPI(lifespan=router.lifespan_context) + app.include_router(router) + + async with self.broker_test(router.broker), router.lifespan_context( + app + ) as context: + assert context["lifespan"] + + mock.start.assert_called_once() + mock.close.assert_called_once() + + async def test_subscriber_mock(self, queue: str): + router = self.router_class() + + @router.subscriber(queue, auto_offset_reset="earliest") + async def m(): + return "hi" + + async with self.broker_test(router.broker) as rb: + await rb.publish("hello", queue) + m.mock.assert_called_once_with("hello") + + async def test_publisher_mock(self, queue: str): + router = self.router_class() + + publisher = router.publisher(queue + "resp") + + @publisher + @router.subscriber(queue, auto_offset_reset="earliest") + async def m(): + return "response" + + async with self.broker_test(router.broker) as rb: + await rb.publish("hello", queue) + publisher.mock.assert_called_with("response") + + async def test_include(self, queue: str): + router = self.router_class() + router2 = self.router_class() + + app = FastAPI(lifespan=router.lifespan_context) + + @router.subscriber(queue, auto_offset_reset="earliest") + async def hello(): + return "hi" + + @router2.subscriber(queue + "1", auto_offset_reset="earliest") + async def hello_router2(): + return "hi" + + router.include_router(router2) + + async with self.broker_test(router.broker): + with TestClient(app) as client: + assert client.app_state["broker"] is router.broker + + r = await router.broker.publish( + "hi", + queue, + rpc=True, + rpc_timeout=0.5, + ) + assert r == "hi" + + r = await router.broker.publish( + "hi", + queue + "1", + rpc=True, + rpc_timeout=0.5, + ) + assert r == "hi" + + async def test_dependency_overrides(self, mock: Mock, queue: str): + router = self.router_class() + router2 = self.router_class() + + def dep1(): + mock.not_call() + pass + + app = FastAPI(lifespan=router.lifespan_context) + app.dependency_overrides[dep1] = lambda: mock() + + @router2.subscriber(queue, auto_offset_reset="earliest") + async def hello_router2(dep=Depends(dep1)): + return "hi" + + router.include_router(router2) + app.include_router(router) + + async with self.broker_test(router.broker): + with TestClient(app) as client: + assert client.app_state["broker"] is router.broker + + r = await router.broker.publish( + "hi", + queue, + rpc=True, + rpc_timeout=0.5, + ) + assert r == "hi" + + mock.assert_called_once() + assert not mock.not_call.called + + +@pytest.mark.confluent() +class TestRabbitRouter(FastAPITestcase): # noqa: D101 router_class = KafkaRouter + async def test_batch_real( + self, + mock: Mock, + queue: str, + event: asyncio.Event, + ): + router = KafkaRouter() + + @router.subscriber(queue, batch=True, auto_offset_reset="earliest") + async def hello(msg: List[str]): + event.set() + return mock(msg) -class TestRouterLocal(FastAPILocalTestcase): - """A class to represent a test Kafka broker.""" + async with router.broker: + await router.broker.start() + await asyncio.wait( + ( + asyncio.create_task(router.broker.publish("hi", queue)), + asyncio.create_task(event.wait()), + ), + timeout=10, + ) + assert event.is_set() + mock.assert_called_with(["hi"]) + + +class TestRouterLocal(FastAPILocalTestcase): # noqa: D101 router_class = KafkaRouter broker_test = staticmethod(TestKafkaBroker) build_message = staticmethod(build_message) + + async def test_batch_testclient( + self, + mock: Mock, + queue: str, + event: asyncio.Event, + ): + router = KafkaRouter() + + @router.subscriber(queue, batch=True, auto_offset_reset="earliest") + async def hello(msg: List[str]): + event.set() + return mock(msg) + + async with TestKafkaBroker(router.broker): + await asyncio.wait( + ( + asyncio.create_task(router.broker.publish("hi", queue)), + asyncio.create_task(event.wait()), + ), + timeout=10, + ) + + assert event.is_set() + mock.assert_called_with(["hi"]) diff --git a/tests/docs/getting_started/subscription/test_real.py b/tests/docs/getting_started/subscription/test_real.py index 4aba3ec33f..0592da5c09 100644 --- a/tests/docs/getting_started/subscription/test_real.py +++ b/tests/docs/getting_started/subscription/test_real.py @@ -1,5 +1,11 @@ import pytest +from docs.docs_src.getting_started.subscription.confluent.real_testing import ( + test_handle as test_handle_confluent, +) +from docs.docs_src.getting_started.subscription.confluent.real_testing import ( + test_validation_error as test_validation_error_confluent, +) from docs.docs_src.getting_started.subscription.kafka.real_testing import ( test_handle as test_handle_k, ) @@ -28,6 +34,9 @@ pytest.mark.kafka(test_handle_k) pytest.mark.kafka(test_validation_error_k) +pytest.mark.kafka(test_handle_confluent) +pytest.mark.kafka(test_validation_error_confluent) + pytest.mark.rabbit(test_handle_r) pytest.mark.rabbit(test_validation_error_r) diff --git a/tests/docs/integration/fastapi/test_multiple_lifespan.py b/tests/docs/integration/fastapi/test_multiple_lifespan.py index a68b1367c4..b34bad88f7 100644 --- a/tests/docs/integration/fastapi/test_multiple_lifespan.py +++ b/tests/docs/integration/fastapi/test_multiple_lifespan.py @@ -41,6 +41,19 @@ def data(self): return (app, core_router, nested_router) +@pytest.mark.confluent() +class TestConfluent(BaseCase): # noqa: D101 + @pytest.fixture(scope="class") + def data(self): + from docs.docs_src.integrations.fastapi.confluent.multiple_lifespan import ( + app, + core_router, + nested_router, + ) + + return (app, core_router, nested_router) + + @pytest.mark.nats() class TestNats(BaseCase): # noqa: D101 @pytest.fixture(scope="class")