From 4ba19100f7daa5409876cfcc03df9feb0ea4317f Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Wed, 11 Dec 2024 01:09:59 +0700 Subject: [PATCH 1/3] Fix: return logger, typing --- faststream/confluent/broker/broker.py | 1 + faststream/confluent/client.py | 7 +++++-- faststream/rabbit/broker/broker.py | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index 1b96a46126..a940f1c2aa 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -454,6 +454,7 @@ async def _connect( # type: ignore[override] **kwargs, client_id=client_id, config=self.config, + logger=self._state.get().logger_state ) self._producer.connect(native_producer) diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py index 05a0210a6a..385a1b4389 100644 --- a/faststream/confluent/client.py +++ b/faststream/confluent/client.py @@ -43,6 +43,7 @@ class AsyncConfluentProducer: def __init__( self, *, + logger: "LoggerState", config: config_module.ConfluentFastConfig, bootstrap_servers: Union[str, list[str]] = "localhost", client_id: Optional[str] = None, @@ -64,6 +65,8 @@ def __init__( sasl_plain_password: Optional[str] = None, sasl_plain_username: Optional[str] = None, ) -> None: + self.logger_state = logger + if isinstance(bootstrap_servers, Iterable) and not isinstance( bootstrap_servers, str, @@ -107,7 +110,7 @@ def __init__( }, ) - self.producer = Producer(final_config, logger=self.logger) # type: ignore[call-arg] + self.producer = Producer(final_config, logger=self.logger_state.logger.logger) # type: ignore[call-arg] self.__running = True self._poll_task = asyncio.create_task(self._poll_loop()) @@ -309,7 +312,7 @@ def __init__( ) self.config = final_config - self.consumer = Consumer(final_config, logger=self.logger) # type: ignore[call-arg] + self.consumer = Consumer(final_config, logger=self.logger_state.logger.logger) # type: ignore[call-arg] # We shouldn't read messages and close consumer concurrently # https://github.com/airtai/faststream/issues/1904#issuecomment-2506990895 diff --git a/faststream/rabbit/broker/broker.py b/faststream/rabbit/broker/broker.py index a171755cf1..04d2c2cd4f 100644 --- a/faststream/rabbit/broker/broker.py +++ b/faststream/rabbit/broker/broker.py @@ -16,7 +16,7 @@ from typing_extensions import Doc, override from faststream.__about__ import SERVICE_NAME -from faststream._internal.broker.broker import BrokerUsecase +from faststream._internal.broker.broker import ABCBroker, BrokerUsecase from faststream._internal.constants import EMPTY from faststream._internal.publisher.proto import PublisherProto from faststream.message import gen_cor_id From 6c67fd08466f57e4dde944bdc4bc906837a26414 Mon Sep 17 00:00:00 2001 From: Daniil Dumchenko Date: Wed, 11 Dec 2024 01:10:33 +0700 Subject: [PATCH 2/3] Fix: lint --- faststream/_internal/broker/router.py | 2 +- faststream/confluent/broker/broker.py | 2 +- tests/brokers/kafka/test_consume.py | 8 ++++++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/faststream/_internal/broker/router.py b/faststream/_internal/broker/router.py index 81f4a2998a..8c02459095 100644 --- a/faststream/_internal/broker/router.py +++ b/faststream/_internal/broker/router.py @@ -84,7 +84,7 @@ def __init__( decoder=decoder, include_in_schema=include_in_schema, state=EmptyBrokerState("You should include router to any broker."), - routers=routers + routers=routers, ) for h in handlers: diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index a940f1c2aa..0262dbd9c3 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -454,7 +454,7 @@ async def _connect( # type: ignore[override] **kwargs, client_id=client_id, config=self.config, - logger=self._state.get().logger_state + logger=self._state.get().logger_state, ) self._producer.connect(native_producer) diff --git a/tests/brokers/kafka/test_consume.py b/tests/brokers/kafka/test_consume.py index 3c9bd4ca90..1c8ee55f99 100644 --- a/tests/brokers/kafka/test_consume.py +++ b/tests/brokers/kafka/test_consume.py @@ -399,7 +399,9 @@ async def handler(msg): await asyncio.wait( ( - asyncio.create_task(br._producer._producer.producer.send(queue, key=b"")), + asyncio.create_task( + br._producer._producer.producer.send(queue, key=b"") + ), asyncio.create_task(event.wait()), ), timeout=3, @@ -426,7 +428,9 @@ async def handler(msg): await asyncio.wait( ( - asyncio.create_task(br._producer._producer.producer.send(queue, key=b"")), + asyncio.create_task( + br._producer._producer.producer.send(queue, key=b"") + ), asyncio.create_task(event.wait()), ), timeout=3, From 59ac61dec992c37d5821f8f8f2829c956e106637 Mon Sep 17 00:00:00 2001 From: Nikita Pastukhov Date: Tue, 10 Dec 2024 22:18:25 +0300 Subject: [PATCH 3/3] chore: fix tests --- .../asyncapi/v3_0_0/schema/bindings/amqp/channel.py | 4 +++- tests/prometheus/confluent/basic.py | 4 ---- tests/prometheus/kafka/basic.py | 4 ---- tests/prometheus/nats/basic.py | 3 ++- tests/prometheus/nats/test_nats.py | 4 +++- tests/prometheus/nats/test_provider.py | 3 ++- tests/prometheus/rabbit/basic.py | 4 ---- tests/prometheus/rabbit/test_provider.py | 6 +----- tests/prometheus/redis/basic.py | 4 ---- 9 files changed, 11 insertions(+), 25 deletions(-) diff --git a/faststream/specification/asyncapi/v3_0_0/schema/bindings/amqp/channel.py b/faststream/specification/asyncapi/v3_0_0/schema/bindings/amqp/channel.py index 3d2bcc23ad..bfadb4c0f4 100644 --- a/faststream/specification/asyncapi/v3_0_0/schema/bindings/amqp/channel.py +++ b/faststream/specification/asyncapi/v3_0_0/schema/bindings/amqp/channel.py @@ -1,4 +1,6 @@ -from typing import Optional, Self +from typing import Optional + +from typing_extensions import Self from faststream.specification.asyncapi.v2_6_0.schema.bindings.amqp import ( ChannelBinding as V2Binding, diff --git a/tests/prometheus/confluent/basic.py b/tests/prometheus/confluent/basic.py index 92df8e38f4..facea2efec 100644 --- a/tests/prometheus/confluent/basic.py +++ b/tests/prometheus/confluent/basic.py @@ -1,7 +1,6 @@ from typing import Any from faststream import AckPolicy -from faststream.confluent import KafkaBroker from faststream.confluent.prometheus import KafkaPrometheusMiddleware from tests.brokers.confluent.basic import ConfluentTestcaseConfig @@ -9,9 +8,6 @@ class KafkaPrometheusSettings(ConfluentTestcaseConfig): messaging_system = "kafka" - def get_broker(self, apply_types=False, **kwargs: Any) -> KafkaBroker: - return KafkaBroker(apply_types=apply_types, **kwargs) - def get_middleware(self, **kwargs: Any) -> KafkaPrometheusMiddleware: return KafkaPrometheusMiddleware(**kwargs) diff --git a/tests/prometheus/kafka/basic.py b/tests/prometheus/kafka/basic.py index 0225f7053a..9fdd8795dc 100644 --- a/tests/prometheus/kafka/basic.py +++ b/tests/prometheus/kafka/basic.py @@ -1,7 +1,6 @@ from typing import Any from faststream import AckPolicy -from faststream.kafka import KafkaBroker from faststream.kafka.prometheus import KafkaPrometheusMiddleware from tests.brokers.kafka.basic import KafkaTestcaseConfig @@ -9,9 +8,6 @@ class KafkaPrometheusSettings(KafkaTestcaseConfig): messaging_system = "kafka" - def get_broker(self, apply_types=False, **kwargs: Any) -> KafkaBroker: - return KafkaBroker(apply_types=apply_types, **kwargs) - def get_middleware(self, **kwargs: Any) -> KafkaPrometheusMiddleware: return KafkaPrometheusMiddleware(**kwargs) diff --git a/tests/prometheus/nats/basic.py b/tests/prometheus/nats/basic.py index 2bb3abbad9..5199ee84ef 100644 --- a/tests/prometheus/nats/basic.py +++ b/tests/prometheus/nats/basic.py @@ -2,9 +2,10 @@ from faststream.nats import NatsBroker from faststream.nats.prometheus import NatsPrometheusMiddleware +from tests.brokers.nats.basic import NatsTestcaseConfig -class NatsPrometheusSettings: +class NatsPrometheusSettings(NatsTestcaseConfig): messaging_system = "nats" def get_broker(self, apply_types=False, **kwargs: Any) -> NatsBroker: diff --git a/tests/prometheus/nats/test_nats.py b/tests/prometheus/nats/test_nats.py index a3bdbed2e0..edb07ad20b 100644 --- a/tests/prometheus/nats/test_nats.py +++ b/tests/prometheus/nats/test_nats.py @@ -22,7 +22,9 @@ def stream(queue): @pytest.mark.nats() class TestPrometheus( - NatsPrometheusSettings, LocalPrometheusTestcase, LocalRPCPrometheusTestcase + NatsPrometheusSettings, + LocalPrometheusTestcase, + LocalRPCPrometheusTestcase, ): async def test_metrics_batch( self, diff --git a/tests/prometheus/nats/test_provider.py b/tests/prometheus/nats/test_provider.py index c8ce53dc72..10410b95f8 100644 --- a/tests/prometheus/nats/test_provider.py +++ b/tests/prometheus/nats/test_provider.py @@ -16,7 +16,8 @@ class LocalBaseNatsMetricsSettingsProviderTestcase( - NatsPrometheusSettings, LocalMetricsSettingsProviderTestcase + NatsPrometheusSettings, + LocalMetricsSettingsProviderTestcase, ): def test_get_publish_destination_name_from_cmd(self, queue: str) -> None: expected_destination_name = queue diff --git a/tests/prometheus/rabbit/basic.py b/tests/prometheus/rabbit/basic.py index d1d192e459..a489fcb2e8 100644 --- a/tests/prometheus/rabbit/basic.py +++ b/tests/prometheus/rabbit/basic.py @@ -1,6 +1,5 @@ from typing import Any -from faststream.rabbit import RabbitBroker from faststream.rabbit.prometheus import RabbitPrometheusMiddleware from tests.brokers.rabbit.basic import RabbitTestcaseConfig @@ -8,8 +7,5 @@ class RabbitPrometheusSettings(RabbitTestcaseConfig): messaging_system = "rabbitmq" - def get_broker(self, apply_types=False, **kwargs: Any) -> RabbitBroker: - return RabbitBroker(apply_types=apply_types, **kwargs) - def get_middleware(self, **kwargs: Any) -> RabbitPrometheusMiddleware: return RabbitPrometheusMiddleware(**kwargs) diff --git a/tests/prometheus/rabbit/test_provider.py b/tests/prometheus/rabbit/test_provider.py index 9b6eb0b7bf..88f4c506bb 100644 --- a/tests/prometheus/rabbit/test_provider.py +++ b/tests/prometheus/rabbit/test_provider.py @@ -5,10 +5,7 @@ from faststream.prometheus import MetricsSettingsProvider from faststream.rabbit.prometheus.provider import RabbitMetricsSettingsProvider -from tests.prometheus.basic import ( - LocalMetricsSettingsProviderTestcase, - LocalRPCPrometheusTestcase, -) +from tests.prometheus.basic import LocalMetricsSettingsProviderTestcase from .basic import RabbitPrometheusSettings @@ -16,7 +13,6 @@ class TestRabbitMetricsSettingsProvider( RabbitPrometheusSettings, LocalMetricsSettingsProviderTestcase, - LocalRPCPrometheusTestcase, ): @staticmethod def get_provider() -> MetricsSettingsProvider: diff --git a/tests/prometheus/redis/basic.py b/tests/prometheus/redis/basic.py index 1a6ed6cfeb..a1f89f3ead 100644 --- a/tests/prometheus/redis/basic.py +++ b/tests/prometheus/redis/basic.py @@ -1,6 +1,5 @@ from typing import Any -from faststream.redis import RedisBroker from faststream.redis.prometheus import RedisPrometheusMiddleware from tests.brokers.redis.basic import RedisTestcaseConfig @@ -8,8 +7,5 @@ class RedisPrometheusSettings(RedisTestcaseConfig): messaging_system = "redis" - def get_broker(self, apply_types=False, **kwargs: Any) -> RedisBroker: - return RedisBroker(apply_types=apply_types, **kwargs) - def get_middleware(self, **kwargs: Any) -> RedisPrometheusMiddleware: return RedisPrometheusMiddleware(**kwargs)