From b9c36a084e8eebc663a411b5348cdb8856527540 Mon Sep 17 00:00:00 2001 From: Kumaran Rajendhiran Date: Fri, 24 May 2024 13:02:53 +0530 Subject: [PATCH 1/7] Pass logger to confluent producer and consumer --- .codespell-whitelist.txt | 2 +- .secrets.baseline | 6 +-- docs/docs/en/nats/jetstream/key-value.md | 2 +- docs/docs/en/nats/jetstream/object.md | 2 +- docs/docs/en/release.md | 24 +++++------ faststream/confluent/broker/broker.py | 2 + faststream/confluent/client.py | 19 +++++++- tests/brokers/confluent/test_logger.py | 55 ++++++++++++++++++++++++ 8 files changed, 92 insertions(+), 20 deletions(-) create mode 100644 tests/brokers/confluent/test_logger.py diff --git a/.codespell-whitelist.txt b/.codespell-whitelist.txt index 6b1a432b87..dcfed576bf 100644 --- a/.codespell-whitelist.txt +++ b/.codespell-whitelist.txt @@ -1 +1 @@ -dependant \ No newline at end of file +dependant diff --git a/.secrets.baseline b/.secrets.baseline index 4c3829ee62..5ceae71388 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -128,7 +128,7 @@ "filename": "docs/docs/en/release.md", "hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450", "is_verified": false, - "line_number": 1079, + "line_number": 1269, "is_secret": false } ], @@ -138,7 +138,7 @@ "filename": "examples/e10_middlewares.py", "hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450", "is_verified": false, - "line_number": 33, + "line_number": 35, "is_secret": false } ], @@ -163,5 +163,5 @@ } ] }, - "generated_at": "2024-04-23T11:41:19Z" + "generated_at": "2024-05-24T07:31:48Z" } diff --git a/docs/docs/en/nats/jetstream/key-value.md b/docs/docs/en/nats/jetstream/key-value.md index 2ca8d70add..0e579bce5d 100644 --- a/docs/docs/en/nats/jetstream/key-value.md +++ b/docs/docs/en/nats/jetstream/key-value.md @@ -54,4 +54,4 @@ from faststream.nats import NatsBroker, KvWatch ) async def handler(msg: str): ... -``` \ No newline at end of file +``` diff --git a/docs/docs/en/nats/jetstream/object.md b/docs/docs/en/nats/jetstream/object.md index 33aa1055df..9d21914cfc 100644 --- a/docs/docs/en/nats/jetstream/object.md +++ b/docs/docs/en/nats/jetstream/object.md @@ -65,4 +65,4 @@ from faststream.nats import NatsBroker, ObjWatch ) async def handler(filename: str): ... -``` \ No newline at end of file +``` diff --git a/docs/docs/en/release.md b/docs/docs/en/release.md index 1a40be40f1..5fc680cce0 100644 --- a/docs/docs/en/release.md +++ b/docs/docs/en/release.md @@ -19,37 +19,37 @@ hide: This is the time for a new **NATS** features! **FastStream** supports **NATS Key-Value** and **Object Storage** subscribption features in a native way now (big thx for @sheldygg)! 1. KeyValue creation and watching API added (you can read updated [documentation section](https://faststream.airt.ai/latest/nats/jetstream/key-value/) for changes): - + ```python from faststream import FastStream, Logger from faststream.nats import NatsBroker - + broker = NatsBroker() app = FastStream(broker) - + @broker.subscriber("some-key", kv_watch="bucket") async def handler(msg: int, logger: Logger): logger.info(msg) - + @app.after_startup async def test(): kv = await broker.key_value("bucket") await kv.put("some-key", b"1") ``` - + 2. ObjectStore API added as well (you can read updated [documentation section](https://faststream.airt.ai/latest/nats/jetstream/object/) for changes): ```python from faststream import FastStream, Logger from faststream.nats import NatsBroker - + broker = NatsBroker() app = FastStream(broker) - + @broker.subscriber("file-bucket", obj_watch=True) async def handler(filename: str, logger: Logger): logger.info(filename) - + @app.after_startup async def test(): object_store = await broker.object_storage("file-bucket") @@ -61,22 +61,22 @@ This is the time for a new **NATS** features! **FastStream** supports **NATS Key ```python from faststream import FastStream, Logger from faststream.nats import NatsBroker - + broker = NatsBroker() app = FastStream(broker) - + @broker.subscriber("test", stream="stream", pull_sub=True) async def handler(msg, logger: Logger): logger.info(msg) ``` - + Finally, we have a new feature, related to all brokers: special flag to suppress automatic RPC and reply_to responses: ```python @broker.subscriber("tests", no_reply=True) async def handler(): .... - + # will fail with timeout, because there is no automatic response msg = await broker.publish("msg", "test", rpc=True) ``` diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index 30c97ae298..960b2606ad 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -448,6 +448,7 @@ async def _connect( # type: ignore[override] producer = AsyncConfluentProducer( **kwargs, client_id=client_id, + logger=self.logger, ) self._producer = AsyncConfluentFastProducer( @@ -457,6 +458,7 @@ async def _connect( # type: ignore[override] return partial( AsyncConfluentConsumer, **filter_by_dict(ConsumerConnectionParams, kwargs), + logger=self.logger, ) async def start(self) -> None: diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py index 4744f9b8d8..f1703c3694 100644 --- a/faststream/confluent/client.py +++ b/faststream/confluent/client.py @@ -2,6 +2,7 @@ from ssl import SSLContext from time import time from typing import ( + TYPE_CHECKING, Any, Callable, Dict, @@ -16,10 +17,14 @@ from confluent_kafka import Consumer, KafkaError, KafkaException, Message, Producer from confluent_kafka.admin import AdminClient, NewTopic from pydantic import BaseModel +from typing_extensions import Annotated, Doc from faststream.log import logger from faststream.utils.functions import call_or_await +if TYPE_CHECKING: + from faststream.types import LoggerProto + _missing = object() @@ -105,7 +110,12 @@ def __init__( sasl_kerberos_service_name: str = "kafka", sasl_kerberos_domain_name: Optional[str] = None, sasl_oauth_token_provider: Optional[str] = None, + logger: Annotated[ + Union["LoggerProto", None, object], + Doc("User specified logger to pass into Context and log service messages."), + ] = logger, ) -> None: + self.logger = logger if isinstance(bootstrap_servers, Iterable) and not isinstance( bootstrap_servers, str ): @@ -145,7 +155,7 @@ def __init__( } ) - self.producer = Producer(self.config) + self.producer = Producer(self.config, logger=self.logger) # self.producer.init_transactions() self.producer.list_topics() self.loop = loop or asyncio.get_event_loop() @@ -295,7 +305,12 @@ def __init__( sasl_kerberos_service_name: str = "kafka", sasl_kerberos_domain_name: Optional[str] = None, sasl_oauth_token_provider: Optional[str] = None, + logger: Annotated[ + Union["LoggerProto", None, object], + Doc("User specified logger to pass into Context and log service messages."), + ] = logger, ) -> None: + self.logger = logger if group_id is None: group_id = "confluent-kafka-consumer-group" @@ -352,7 +367,7 @@ def __init__( self.loop = loop or asyncio.get_event_loop() create_topics(topics=self.topics, config=self.config) - self.consumer = Consumer(self.config) + self.consumer = Consumer(self.config, logger=self.logger) async def start(self) -> None: """Starts the Kafka consumer and subscribes to the specified topics.""" diff --git a/tests/brokers/confluent/test_logger.py b/tests/brokers/confluent/test_logger.py new file mode 100644 index 0000000000..add2b17e54 --- /dev/null +++ b/tests/brokers/confluent/test_logger.py @@ -0,0 +1,55 @@ +import asyncio +import logging +from typing import Any, ClassVar, Dict + +import pytest + +from faststream.broker.core.usecase import BrokerUsecase +from faststream.confluent import KafkaBroker + + +@pytest.mark.confluent() +class TestLogger: + """A class to represent a test Kafka broker.""" + + timeout: int = 10 + subscriber_kwargs: ClassVar[Dict[str, Any]] = {"auto_offset_reset": "earliest"} + + def get_broker(self, apply_types: bool = False): + return KafkaBroker(apply_types=apply_types) + + def patch_broker(self, broker: BrokerUsecase[Any, Any]) -> BrokerUsecase[Any, Any]: + return broker + + @pytest.mark.asyncio() + async def test_custom_logger( + self, + queue: str, + event: asyncio.Event, + ): + test_logger = logging.getLogger("test_logger") + consume_broker = KafkaBroker(logger=test_logger) + + @consume_broker.subscriber(queue, **self.subscriber_kwargs) + def subscriber(m): + event.set() + + async with self.patch_broker(consume_broker) as br: + await br.start() + + await br.publish_batch(1, "hi", topic=queue) + + # ToDo: Fetch consumer logger and assert it + + producer_logger = br._producer._producer.logger + assert producer_logger == test_logger + + await asyncio.wait( + ( + asyncio.create_task(br.publish("hello", queue)), + asyncio.create_task(event.wait()), + ), + timeout=self.timeout, + ) + + assert event.is_set() From 2bb6092cab12204ba4425a58f29815c66c9c01b1 Mon Sep 17 00:00:00 2001 From: Kumaran Rajendhiran Date: Fri, 24 May 2024 13:18:14 +0530 Subject: [PATCH 2/7] Add test to check consumer_logger --- tests/brokers/confluent/test_logger.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/brokers/confluent/test_logger.py b/tests/brokers/confluent/test_logger.py index add2b17e54..ab72676fc5 100644 --- a/tests/brokers/confluent/test_logger.py +++ b/tests/brokers/confluent/test_logger.py @@ -37,9 +37,9 @@ def subscriber(m): async with self.patch_broker(consume_broker) as br: await br.start() - await br.publish_batch(1, "hi", topic=queue) - - # ToDo: Fetch consumer logger and assert it + for sub in br._subscribers.values(): + consumer_logger = sub.consumer.logger + assert consumer_logger == test_logger producer_logger = br._producer._producer.logger assert producer_logger == test_logger From d76b1b2823b8586a49c779b6818c223dfe3e5212 Mon Sep 17 00:00:00 2001 From: Kumaran Rajendhiran Date: Fri, 24 May 2024 13:38:01 +0530 Subject: [PATCH 3/7] Update func signature --- faststream/confluent/client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py index f1703c3694..7aa3ef6f23 100644 --- a/faststream/confluent/client.py +++ b/faststream/confluent/client.py @@ -1,4 +1,5 @@ import asyncio +import logging from ssl import SSLContext from time import time from typing import ( @@ -111,7 +112,7 @@ def __init__( sasl_kerberos_domain_name: Optional[str] = None, sasl_oauth_token_provider: Optional[str] = None, logger: Annotated[ - Union["LoggerProto", None, object], + Union["LoggerProto", logging.Logger, None, object], Doc("User specified logger to pass into Context and log service messages."), ] = logger, ) -> None: @@ -306,7 +307,7 @@ def __init__( sasl_kerberos_domain_name: Optional[str] = None, sasl_oauth_token_provider: Optional[str] = None, logger: Annotated[ - Union["LoggerProto", None, object], + Union["LoggerProto", logging.Logger, None, object], Doc("User specified logger to pass into Context and log service messages."), ] = logger, ) -> None: From 64b0704b51483f5c00652b5c8053517f9aa0a959 Mon Sep 17 00:00:00 2001 From: Kumaran Rajendhiran Date: Fri, 24 May 2024 13:42:22 +0530 Subject: [PATCH 4/7] Update function signature --- faststream/confluent/client.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py index 7aa3ef6f23..9c22cc2e89 100644 --- a/faststream/confluent/client.py +++ b/faststream/confluent/client.py @@ -3,7 +3,6 @@ from ssl import SSLContext from time import time from typing import ( - TYPE_CHECKING, Any, Callable, Dict, @@ -23,9 +22,6 @@ from faststream.log import logger from faststream.utils.functions import call_or_await -if TYPE_CHECKING: - from faststream.types import LoggerProto - _missing = object() @@ -112,7 +108,7 @@ def __init__( sasl_kerberos_domain_name: Optional[str] = None, sasl_oauth_token_provider: Optional[str] = None, logger: Annotated[ - Union["LoggerProto", logging.Logger, None, object], + Union[logging.Logger, None, object], Doc("User specified logger to pass into Context and log service messages."), ] = logger, ) -> None: @@ -307,7 +303,7 @@ def __init__( sasl_kerberos_domain_name: Optional[str] = None, sasl_oauth_token_provider: Optional[str] = None, logger: Annotated[ - Union["LoggerProto", logging.Logger, None, object], + Union[logging.Logger, None, object], Doc("User specified logger to pass into Context and log service messages."), ] = logger, ) -> None: From 36c1469e1dead1763f1c869f3ba114ec75acbd72 Mon Sep 17 00:00:00 2001 From: Kumaran Rajendhiran Date: Fri, 24 May 2024 13:49:38 +0530 Subject: [PATCH 5/7] User LoggerProto and remove Annotated --- faststream/confluent/client.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py index 9c22cc2e89..1e38085b4b 100644 --- a/faststream/confluent/client.py +++ b/faststream/confluent/client.py @@ -3,6 +3,7 @@ from ssl import SSLContext from time import time from typing import ( + TYPE_CHECKING, Any, Callable, Dict, @@ -17,11 +18,13 @@ from confluent_kafka import Consumer, KafkaError, KafkaException, Message, Producer from confluent_kafka.admin import AdminClient, NewTopic from pydantic import BaseModel -from typing_extensions import Annotated, Doc from faststream.log import logger from faststream.utils.functions import call_or_await +if TYPE_CHECKING: + from faststream.types import LoggerProto + _missing = object() @@ -107,10 +110,7 @@ def __init__( sasl_kerberos_service_name: str = "kafka", sasl_kerberos_domain_name: Optional[str] = None, sasl_oauth_token_provider: Optional[str] = None, - logger: Annotated[ - Union[logging.Logger, None, object], - Doc("User specified logger to pass into Context and log service messages."), - ] = logger, + logger: Optional[Union["LoggerProto", logging.Logger]] = logger, ) -> None: self.logger = logger if isinstance(bootstrap_servers, Iterable) and not isinstance( @@ -302,10 +302,7 @@ def __init__( sasl_kerberos_service_name: str = "kafka", sasl_kerberos_domain_name: Optional[str] = None, sasl_oauth_token_provider: Optional[str] = None, - logger: Annotated[ - Union[logging.Logger, None, object], - Doc("User specified logger to pass into Context and log service messages."), - ] = logger, + logger: Optional[Union["LoggerProto", logging.Logger]] = logger, ) -> None: self.logger = logger if group_id is None: From 17d2303b8cf197ac97999dd7c51679eb59fe5430 Mon Sep 17 00:00:00 2001 From: Kumaran Rajendhiran Date: Fri, 24 May 2024 13:52:02 +0530 Subject: [PATCH 6/7] Use only logger in signature --- faststream/confluent/client.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py index 1e38085b4b..871a0a9fe7 100644 --- a/faststream/confluent/client.py +++ b/faststream/confluent/client.py @@ -3,7 +3,6 @@ from ssl import SSLContext from time import time from typing import ( - TYPE_CHECKING, Any, Callable, Dict, @@ -22,9 +21,6 @@ from faststream.log import logger from faststream.utils.functions import call_or_await -if TYPE_CHECKING: - from faststream.types import LoggerProto - _missing = object() @@ -110,7 +106,7 @@ def __init__( sasl_kerberos_service_name: str = "kafka", sasl_kerberos_domain_name: Optional[str] = None, sasl_oauth_token_provider: Optional[str] = None, - logger: Optional[Union["LoggerProto", logging.Logger]] = logger, + logger: Optional[logging.Logger] = logger, ) -> None: self.logger = logger if isinstance(bootstrap_servers, Iterable) and not isinstance( @@ -302,7 +298,7 @@ def __init__( sasl_kerberos_service_name: str = "kafka", sasl_kerberos_domain_name: Optional[str] = None, sasl_oauth_token_provider: Optional[str] = None, - logger: Optional[Union["LoggerProto", logging.Logger]] = logger, + logger: Optional[logging.Logger] = logger, ) -> None: self.logger = logger if group_id is None: From 93dacfbcae346ebe098b7cb1ba32ea5786f7f4f2 Mon Sep 17 00:00:00 2001 From: Kumaran Rajendhiran Date: Fri, 24 May 2024 15:11:36 +0530 Subject: [PATCH 7/7] Use LoggerProto as type --- faststream/confluent/client.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py index 871a0a9fe7..f1703c3694 100644 --- a/faststream/confluent/client.py +++ b/faststream/confluent/client.py @@ -1,8 +1,8 @@ import asyncio -import logging from ssl import SSLContext from time import time from typing import ( + TYPE_CHECKING, Any, Callable, Dict, @@ -17,10 +17,14 @@ from confluent_kafka import Consumer, KafkaError, KafkaException, Message, Producer from confluent_kafka.admin import AdminClient, NewTopic from pydantic import BaseModel +from typing_extensions import Annotated, Doc from faststream.log import logger from faststream.utils.functions import call_or_await +if TYPE_CHECKING: + from faststream.types import LoggerProto + _missing = object() @@ -106,7 +110,10 @@ def __init__( sasl_kerberos_service_name: str = "kafka", sasl_kerberos_domain_name: Optional[str] = None, sasl_oauth_token_provider: Optional[str] = None, - logger: Optional[logging.Logger] = logger, + logger: Annotated[ + Union["LoggerProto", None, object], + Doc("User specified logger to pass into Context and log service messages."), + ] = logger, ) -> None: self.logger = logger if isinstance(bootstrap_servers, Iterable) and not isinstance( @@ -298,7 +305,10 @@ def __init__( sasl_kerberos_service_name: str = "kafka", sasl_kerberos_domain_name: Optional[str] = None, sasl_oauth_token_provider: Optional[str] = None, - logger: Optional[logging.Logger] = logger, + logger: Annotated[ + Union["LoggerProto", None, object], + Doc("User specified logger to pass into Context and log service messages."), + ] = logger, ) -> None: self.logger = logger if group_id is None: