From 1237edb2120700513380021b77f8e8ecef564b9b Mon Sep 17 00:00:00 2001
From: Kumaran Rajendhiran
Date: Sat, 25 May 2024 13:43:45 +0530
Subject: [PATCH] Pass logger to confluent producer and consumer (#1464)

* Pass logger to confluent producer and consumer

* Add test to check consumer_logger

* Update func signature

* Update function signature

* Use LoggerProto and remove Annotated

* Use only logger in signature

* Use LoggerProto as type

---
 .codespell-whitelist.txt                 |  2 +-
 .secrets.baseline                        |  6 +--
 docs/docs/en/nats/jetstream/key-value.md |  2 +-
 docs/docs/en/nats/jetstream/object.md    |  2 +-
 docs/docs/en/release.md                  | 24 +++++------
 faststream/confluent/broker/broker.py    |  2 +
 faststream/confluent/client.py           | 19 +++++++-
 tests/brokers/confluent/test_logger.py   | 55 ++++++++++++++++++++++++
 8 files changed, 92 insertions(+), 20 deletions(-)
 create mode 100644 tests/brokers/confluent/test_logger.py

diff --git a/.codespell-whitelist.txt b/.codespell-whitelist.txt
index 6b1a432b87..dcfed576bf 100644
--- a/.codespell-whitelist.txt
+++ b/.codespell-whitelist.txt
@@ -1 +1 @@
-dependant
\ No newline at end of file
+dependant
diff --git a/.secrets.baseline b/.secrets.baseline
index 4c3829ee62..5ceae71388 100644
--- a/.secrets.baseline
+++ b/.secrets.baseline
@@ -128,7 +128,7 @@
         "filename": "docs/docs/en/release.md",
         "hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450",
         "is_verified": false,
-        "line_number": 1079,
+        "line_number": 1269,
         "is_secret": false
       }
     ],
@@ -138,7 +138,7 @@
         "filename": "examples/e10_middlewares.py",
         "hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450",
         "is_verified": false,
-        "line_number": 33,
+        "line_number": 35,
         "is_secret": false
       }
     ],
@@ -163,5 +163,5 @@
       }
     ]
   },
-  "generated_at": "2024-04-23T11:41:19Z"
+  "generated_at": "2024-05-24T07:31:48Z"
 }
diff --git a/docs/docs/en/nats/jetstream/key-value.md b/docs/docs/en/nats/jetstream/key-value.md
index 2ca8d70add..0e579bce5d 100644
--- a/docs/docs/en/nats/jetstream/key-value.md
+++ b/docs/docs/en/nats/jetstream/key-value.md
@@ -54,4 +54,4 @@ from faststream.nats import NatsBroker, KvWatch
 )
 async def handler(msg: str):
     ...
-```
\ No newline at end of file
+```
diff --git a/docs/docs/en/nats/jetstream/object.md b/docs/docs/en/nats/jetstream/object.md
index 33aa1055df..9d21914cfc 100644
--- a/docs/docs/en/nats/jetstream/object.md
+++ b/docs/docs/en/nats/jetstream/object.md
@@ -65,4 +65,4 @@ from faststream.nats import NatsBroker, ObjWatch
 )
 async def handler(filename: str):
     ...
-```
\ No newline at end of file
+```
diff --git a/docs/docs/en/release.md b/docs/docs/en/release.md
index 41192308ac..c6546bcd3d 100644
--- a/docs/docs/en/release.md
+++ b/docs/docs/en/release.md
@@ -32,37 +32,37 @@ hide:
 This is the time for new **NATS** features! **FastStream** supports **NATS Key-Value** and **Object Storage** subscription features in a native way now (big thx to @sheldygg)!
 
 1. KeyValue creation and watching API added (you can read the updated [documentation section](https://faststream.airt.ai/latest/nats/jetstream/key-value/) for changes):
-    
+
     ```python
     from faststream import FastStream, Logger
    from faststream.nats import NatsBroker
-    
+
     broker = NatsBroker()
     app = FastStream(broker)
-    
+
     @broker.subscriber("some-key", kv_watch="bucket")
     async def handler(msg: int, logger: Logger):
         logger.info(msg)
-    
+
     @app.after_startup
     async def test():
         kv = await broker.key_value("bucket")
         await kv.put("some-key", b"1")
     ```
-    
+
 2. ObjectStore API added as well (you can read the updated [documentation section](https://faststream.airt.ai/latest/nats/jetstream/object/) for changes):
 
     ```python
     from faststream import FastStream, Logger
     from faststream.nats import NatsBroker
-    
+
     broker = NatsBroker()
     app = FastStream(broker)
-    
+
     @broker.subscriber("file-bucket", obj_watch=True)
     async def handler(filename: str, logger: Logger):
         logger.info(filename)
-    
+
     @app.after_startup
     async def test():
         object_store = await broker.object_storage("file-bucket")
@@ -74,22 +74,22 @@ This is the time for new **NATS** features! **FastStream** supports **NATS Key
     ```python
     from faststream import FastStream, Logger
     from faststream.nats import NatsBroker
-    
+
     broker = NatsBroker()
     app = FastStream(broker)
-    
+
     @broker.subscriber("test", stream="stream", pull_sub=True)
     async def handler(msg, logger: Logger):
         logger.info(msg)
     ```
-    
+
 Finally, we have a new feature related to all brokers: a special flag to suppress automatic RPC and reply_to responses:
 
 ```python
 @broker.subscriber("tests", no_reply=True)
 async def handler():
     ....
-    
+
 # will fail with timeout, because there is no automatic response
 msg = await broker.publish("msg", "test", rpc=True)
 ```
diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py
index 30c97ae298..960b2606ad 100644
--- a/faststream/confluent/broker/broker.py
+++ b/faststream/confluent/broker/broker.py
@@ -448,6 +448,7 @@ async def _connect(  # type: ignore[override]
         producer = AsyncConfluentProducer(
             **kwargs,
             client_id=client_id,
+            logger=self.logger,
         )
 
         self._producer = AsyncConfluentFastProducer(
@@ -457,6 +458,7 @@ async def _connect(  # type: ignore[override]
         return partial(
             AsyncConfluentConsumer,
             **filter_by_dict(ConsumerConnectionParams, kwargs),
+            logger=self.logger,
         )
 
     async def start(self) -> None:
diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py
index 4744f9b8d8..f1703c3694 100644
--- a/faststream/confluent/client.py
+++ b/faststream/confluent/client.py
@@ -2,6 +2,7 @@
 from ssl import SSLContext
 from time import time
 from typing import (
+    TYPE_CHECKING,
     Any,
     Callable,
     Dict,
@@ -16,10 +17,14 @@
 from confluent_kafka import Consumer, KafkaError, KafkaException, Message, Producer
 from confluent_kafka.admin import AdminClient, NewTopic
 from pydantic import BaseModel
+from typing_extensions import Annotated, Doc
 
 from faststream.log import logger
 from faststream.utils.functions import call_or_await
 
+if TYPE_CHECKING:
+    from faststream.types import LoggerProto
+
 _missing = object()
 
@@ -105,7 +110,12 @@ def __init__(
         sasl_kerberos_service_name: str = "kafka",
         sasl_kerberos_domain_name: Optional[str] = None,
         sasl_oauth_token_provider: Optional[str] = None,
+        logger: Annotated[
+            Union["LoggerProto", None, object],
+            Doc("User specified logger to pass into Context and log service messages."),
+        ] = logger,
     ) -> None:
+        self.logger = logger
         if isinstance(bootstrap_servers, Iterable) and not isinstance(
             bootstrap_servers, str
         ):
@@ -145,7 +155,7 @@ def __init__(
             }
         )
 
-        self.producer = Producer(self.config)
+        self.producer = Producer(self.config, logger=self.logger)
         # self.producer.init_transactions()
         self.producer.list_topics()
         self.loop = loop or asyncio.get_event_loop()
@@ -295,7 +305,12 @@ def __init__(
         sasl_kerberos_service_name: str = "kafka",
         sasl_kerberos_domain_name: Optional[str] = None,
         sasl_oauth_token_provider: Optional[str] = None,
+        logger: Annotated[
+            Union["LoggerProto", None, object],
+            Doc("User specified logger to pass into Context and log service messages."),
+        ] = logger,
     ) -> None:
+        self.logger = logger
         if group_id is None:
             group_id = "confluent-kafka-consumer-group"
 
@@ -352,7 +367,7 @@ def __init__(
         self.loop = loop or asyncio.get_event_loop()
 
         create_topics(topics=self.topics, config=self.config)
-        self.consumer = Consumer(self.config)
+        self.consumer = Consumer(self.config, logger=self.logger)
 
     async def start(self) -> None:
         """Starts the Kafka consumer and subscribes to the specified topics."""
diff --git a/tests/brokers/confluent/test_logger.py b/tests/brokers/confluent/test_logger.py
new file mode 100644
index 0000000000..ab72676fc5
--- /dev/null
+++ b/tests/brokers/confluent/test_logger.py
@@ -0,0 +1,55 @@
+import asyncio
+import logging
+from typing import Any, ClassVar, Dict
+
+import pytest
+
+from faststream.broker.core.usecase import BrokerUsecase
+from faststream.confluent import KafkaBroker
+
+
+@pytest.mark.confluent()
+class TestLogger:
+    """Test passing a custom logger through to the confluent producer and consumer."""
+
+    timeout: int = 10
+    subscriber_kwargs: ClassVar[Dict[str, Any]] = {"auto_offset_reset": "earliest"}
+
+    def get_broker(self, apply_types: bool = False):
+        return KafkaBroker(apply_types=apply_types)
+
+    def patch_broker(self, broker: BrokerUsecase[Any, Any]) -> BrokerUsecase[Any, Any]:
+        return broker
+
+    @pytest.mark.asyncio()
+    async def test_custom_logger(
+        self,
+        queue: str,
+        event: asyncio.Event,
+    ):
+        test_logger = logging.getLogger("test_logger")
+        consume_broker = KafkaBroker(logger=test_logger)
+
+        @consume_broker.subscriber(queue, **self.subscriber_kwargs)
+        def subscriber(m):
+            event.set()
+
+        async with self.patch_broker(consume_broker) as br:
+            await br.start()
+
+            for sub in br._subscribers.values():
+                consumer_logger = sub.consumer.logger
+                assert consumer_logger == test_logger
+
+            producer_logger = br._producer._producer.logger
+            assert producer_logger == test_logger
+
+            await asyncio.wait(
+                (
+                    asyncio.create_task(br.publish("hello", queue)),
+                    asyncio.create_task(event.wait()),
+                ),
+                timeout=self.timeout,
+            )
+
+        assert event.is_set()
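
For reference, a minimal sketch of the behavior this patch enables, assuming the public `KafkaBroker` API used in the test above; the bootstrap server address and topic name are illustrative, not part of the patch:

```python
import logging

from faststream import FastStream
from faststream.confluent import KafkaBroker

# A user-provided logger; with this change it is forwarded to the underlying
# confluent_kafka.Producer and confluent_kafka.Consumer instances, so
# librdkafka service messages are emitted through it as well.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("my_service")

broker = KafkaBroker("localhost:9092", logger=logger)  # assumed local broker
app = FastStream(broker)


@broker.subscriber("test-topic")  # hypothetical topic name
async def handle(msg: str) -> None:
    logger.info("received: %s", msg)
```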