From 5f4c9f462694ad5a4854946572e5f8cd44f99f70 Mon Sep 17 00:00:00 2001 From: Flosckow <66554425+Flosckow@users.noreply.github.com> Date: Wed, 11 Dec 2024 02:29:14 +0700 Subject: [PATCH] Fix: return logger (#1982) * Fix: return logger, typing * Fix: lint * chore: fix tests --------- Co-authored-by: Daniil Dumchenko Co-authored-by: Nikita Pastukhov --- faststream/_internal/broker/router.py | 2 +- faststream/confluent/broker/broker.py | 1 + faststream/confluent/client.py | 7 +++++-- faststream/rabbit/broker/broker.py | 2 +- .../asyncapi/v3_0_0/schema/bindings/amqp/channel.py | 4 +++- tests/brokers/kafka/test_consume.py | 8 ++++++-- tests/prometheus/confluent/basic.py | 4 ---- tests/prometheus/kafka/basic.py | 4 ---- tests/prometheus/nats/basic.py | 3 ++- tests/prometheus/nats/test_nats.py | 4 +++- tests/prometheus/nats/test_provider.py | 3 ++- tests/prometheus/rabbit/basic.py | 4 ---- tests/prometheus/rabbit/test_provider.py | 6 +----- tests/prometheus/redis/basic.py | 4 ---- 14 files changed, 25 insertions(+), 31 deletions(-) diff --git a/faststream/_internal/broker/router.py b/faststream/_internal/broker/router.py index 81f4a2998a..8c02459095 100644 --- a/faststream/_internal/broker/router.py +++ b/faststream/_internal/broker/router.py @@ -84,7 +84,7 @@ def __init__( decoder=decoder, include_in_schema=include_in_schema, state=EmptyBrokerState("You should include router to any broker."), - routers=routers + routers=routers, ) for h in handlers: diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index 1b96a46126..0262dbd9c3 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -454,6 +454,7 @@ async def _connect( # type: ignore[override] **kwargs, client_id=client_id, config=self.config, + logger=self._state.get().logger_state, ) self._producer.connect(native_producer) diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py index 05a0210a6a..385a1b4389 100644 --- a/faststream/confluent/client.py +++ b/faststream/confluent/client.py @@ -43,6 +43,7 @@ class AsyncConfluentProducer: def __init__( self, *, + logger: "LoggerState", config: config_module.ConfluentFastConfig, bootstrap_servers: Union[str, list[str]] = "localhost", client_id: Optional[str] = None, @@ -64,6 +65,8 @@ def __init__( sasl_plain_password: Optional[str] = None, sasl_plain_username: Optional[str] = None, ) -> None: + self.logger_state = logger + if isinstance(bootstrap_servers, Iterable) and not isinstance( bootstrap_servers, str, @@ -107,7 +110,7 @@ def __init__( }, ) - self.producer = Producer(final_config, logger=self.logger) # type: ignore[call-arg] + self.producer = Producer(final_config, logger=self.logger_state.logger.logger) # type: ignore[call-arg] self.__running = True self._poll_task = asyncio.create_task(self._poll_loop()) @@ -309,7 +312,7 @@ def __init__( ) self.config = final_config - self.consumer = Consumer(final_config, logger=self.logger) # type: ignore[call-arg] + self.consumer = Consumer(final_config, logger=self.logger_state.logger.logger) # type: ignore[call-arg] # We shouldn't read messages and close consumer concurrently # https://github.com/airtai/faststream/issues/1904#issuecomment-2506990895 diff --git a/faststream/rabbit/broker/broker.py b/faststream/rabbit/broker/broker.py index a171755cf1..04d2c2cd4f 100644 --- a/faststream/rabbit/broker/broker.py +++ b/faststream/rabbit/broker/broker.py @@ -16,7 +16,7 @@ from typing_extensions import Doc, override from faststream.__about__ import SERVICE_NAME -from faststream._internal.broker.broker import BrokerUsecase +from faststream._internal.broker.broker import ABCBroker, BrokerUsecase from faststream._internal.constants import EMPTY from faststream._internal.publisher.proto import PublisherProto from faststream.message import gen_cor_id diff --git a/faststream/specification/asyncapi/v3_0_0/schema/bindings/amqp/channel.py b/faststream/specification/asyncapi/v3_0_0/schema/bindings/amqp/channel.py index 3d2bcc23ad..bfadb4c0f4 100644 --- a/faststream/specification/asyncapi/v3_0_0/schema/bindings/amqp/channel.py +++ b/faststream/specification/asyncapi/v3_0_0/schema/bindings/amqp/channel.py @@ -1,4 +1,6 @@ -from typing import Optional, Self +from typing import Optional + +from typing_extensions import Self from faststream.specification.asyncapi.v2_6_0.schema.bindings.amqp import ( ChannelBinding as V2Binding, diff --git a/tests/brokers/kafka/test_consume.py b/tests/brokers/kafka/test_consume.py index 3c9bd4ca90..1c8ee55f99 100644 --- a/tests/brokers/kafka/test_consume.py +++ b/tests/brokers/kafka/test_consume.py @@ -399,7 +399,9 @@ async def handler(msg): await asyncio.wait( ( - asyncio.create_task(br._producer._producer.producer.send(queue, key=b"")), + asyncio.create_task( + br._producer._producer.producer.send(queue, key=b"") + ), asyncio.create_task(event.wait()), ), timeout=3, @@ -426,7 +428,9 @@ async def handler(msg): await asyncio.wait( ( - asyncio.create_task(br._producer._producer.producer.send(queue, key=b"")), + asyncio.create_task( + br._producer._producer.producer.send(queue, key=b"") + ), asyncio.create_task(event.wait()), ), timeout=3, diff --git a/tests/prometheus/confluent/basic.py b/tests/prometheus/confluent/basic.py index 92df8e38f4..facea2efec 100644 --- a/tests/prometheus/confluent/basic.py +++ b/tests/prometheus/confluent/basic.py @@ -1,7 +1,6 @@ from typing import Any from faststream import AckPolicy -from faststream.confluent import KafkaBroker from faststream.confluent.prometheus import KafkaPrometheusMiddleware from tests.brokers.confluent.basic import ConfluentTestcaseConfig @@ -9,9 +8,6 @@ class KafkaPrometheusSettings(ConfluentTestcaseConfig): messaging_system = "kafka" - def get_broker(self, apply_types=False, **kwargs: Any) -> KafkaBroker: - return KafkaBroker(apply_types=apply_types, **kwargs) - def get_middleware(self, **kwargs: Any) -> KafkaPrometheusMiddleware: return KafkaPrometheusMiddleware(**kwargs) diff --git a/tests/prometheus/kafka/basic.py b/tests/prometheus/kafka/basic.py index 0225f7053a..9fdd8795dc 100644 --- a/tests/prometheus/kafka/basic.py +++ b/tests/prometheus/kafka/basic.py @@ -1,7 +1,6 @@ from typing import Any from faststream import AckPolicy -from faststream.kafka import KafkaBroker from faststream.kafka.prometheus import KafkaPrometheusMiddleware from tests.brokers.kafka.basic import KafkaTestcaseConfig @@ -9,9 +8,6 @@ class KafkaPrometheusSettings(KafkaTestcaseConfig): messaging_system = "kafka" - def get_broker(self, apply_types=False, **kwargs: Any) -> KafkaBroker: - return KafkaBroker(apply_types=apply_types, **kwargs) - def get_middleware(self, **kwargs: Any) -> KafkaPrometheusMiddleware: return KafkaPrometheusMiddleware(**kwargs) diff --git a/tests/prometheus/nats/basic.py b/tests/prometheus/nats/basic.py index 2bb3abbad9..5199ee84ef 100644 --- a/tests/prometheus/nats/basic.py +++ b/tests/prometheus/nats/basic.py @@ -2,9 +2,10 @@ from faststream.nats import NatsBroker from faststream.nats.prometheus import NatsPrometheusMiddleware +from tests.brokers.nats.basic import NatsTestcaseConfig -class NatsPrometheusSettings: +class NatsPrometheusSettings(NatsTestcaseConfig): messaging_system = "nats" def get_broker(self, apply_types=False, **kwargs: Any) -> NatsBroker: diff --git a/tests/prometheus/nats/test_nats.py b/tests/prometheus/nats/test_nats.py index a3bdbed2e0..edb07ad20b 100644 --- a/tests/prometheus/nats/test_nats.py +++ b/tests/prometheus/nats/test_nats.py @@ -22,7 +22,9 @@ def stream(queue): @pytest.mark.nats() class TestPrometheus( - NatsPrometheusSettings, LocalPrometheusTestcase, LocalRPCPrometheusTestcase + NatsPrometheusSettings, + LocalPrometheusTestcase, + LocalRPCPrometheusTestcase, ): async def test_metrics_batch( self, diff --git a/tests/prometheus/nats/test_provider.py b/tests/prometheus/nats/test_provider.py index c8ce53dc72..10410b95f8 100644 --- a/tests/prometheus/nats/test_provider.py +++ b/tests/prometheus/nats/test_provider.py @@ -16,7 +16,8 @@ class LocalBaseNatsMetricsSettingsProviderTestcase( - NatsPrometheusSettings, LocalMetricsSettingsProviderTestcase + NatsPrometheusSettings, + LocalMetricsSettingsProviderTestcase, ): def test_get_publish_destination_name_from_cmd(self, queue: str) -> None: expected_destination_name = queue diff --git a/tests/prometheus/rabbit/basic.py b/tests/prometheus/rabbit/basic.py index d1d192e459..a489fcb2e8 100644 --- a/tests/prometheus/rabbit/basic.py +++ b/tests/prometheus/rabbit/basic.py @@ -1,6 +1,5 @@ from typing import Any -from faststream.rabbit import RabbitBroker from faststream.rabbit.prometheus import RabbitPrometheusMiddleware from tests.brokers.rabbit.basic import RabbitTestcaseConfig @@ -8,8 +7,5 @@ class RabbitPrometheusSettings(RabbitTestcaseConfig): messaging_system = "rabbitmq" - def get_broker(self, apply_types=False, **kwargs: Any) -> RabbitBroker: - return RabbitBroker(apply_types=apply_types, **kwargs) - def get_middleware(self, **kwargs: Any) -> RabbitPrometheusMiddleware: return RabbitPrometheusMiddleware(**kwargs) diff --git a/tests/prometheus/rabbit/test_provider.py b/tests/prometheus/rabbit/test_provider.py index 9b6eb0b7bf..88f4c506bb 100644 --- a/tests/prometheus/rabbit/test_provider.py +++ b/tests/prometheus/rabbit/test_provider.py @@ -5,10 +5,7 @@ from faststream.prometheus import MetricsSettingsProvider from faststream.rabbit.prometheus.provider import RabbitMetricsSettingsProvider -from tests.prometheus.basic import ( - LocalMetricsSettingsProviderTestcase, - LocalRPCPrometheusTestcase, -) +from tests.prometheus.basic import LocalMetricsSettingsProviderTestcase from .basic import RabbitPrometheusSettings @@ -16,7 +13,6 @@ class TestRabbitMetricsSettingsProvider( RabbitPrometheusSettings, LocalMetricsSettingsProviderTestcase, - LocalRPCPrometheusTestcase, ): @staticmethod def get_provider() -> MetricsSettingsProvider: diff --git a/tests/prometheus/redis/basic.py b/tests/prometheus/redis/basic.py index 1a6ed6cfeb..a1f89f3ead 100644 --- a/tests/prometheus/redis/basic.py +++ b/tests/prometheus/redis/basic.py @@ -1,6 +1,5 @@ from typing import Any -from faststream.redis import RedisBroker from faststream.redis.prometheus import RedisPrometheusMiddleware from tests.brokers.redis.basic import RedisTestcaseConfig @@ -8,8 +7,5 @@ class RedisPrometheusSettings(RedisTestcaseConfig): messaging_system = "redis" - def get_broker(self, apply_types=False, **kwargs: Any) -> RedisBroker: - return RedisBroker(apply_types=apply_types, **kwargs) - def get_middleware(self, **kwargs: Any) -> RedisPrometheusMiddleware: return RedisPrometheusMiddleware(**kwargs)