Skip to content

Commit

Permalink
Fix: return logger (#1982)
Browse files Browse the repository at this point in the history
* Fix: return logger, typing

* Fix: lint

* chore: fix tests

---------

Co-authored-by: Daniil Dumchenko <[email protected]>
Co-authored-by: Nikita Pastukhov <[email protected]>
  • Loading branch information
3 people authored Dec 10, 2024
1 parent baf6178 commit 5f4c9f4
Show file tree
Hide file tree
Showing 14 changed files with 25 additions and 31 deletions.
2 changes: 1 addition & 1 deletion faststream/_internal/broker/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def __init__(
decoder=decoder,
include_in_schema=include_in_schema,
state=EmptyBrokerState("You should include router to any broker."),
routers=routers
routers=routers,
)

for h in handlers:
Expand Down
1 change: 1 addition & 0 deletions faststream/confluent/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ async def _connect( # type: ignore[override]
**kwargs,
client_id=client_id,
config=self.config,
logger=self._state.get().logger_state,
)

self._producer.connect(native_producer)
Expand Down
7 changes: 5 additions & 2 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class AsyncConfluentProducer:
def __init__(
self,
*,
logger: "LoggerState",
config: config_module.ConfluentFastConfig,
bootstrap_servers: Union[str, list[str]] = "localhost",
client_id: Optional[str] = None,
Expand All @@ -64,6 +65,8 @@ def __init__(
sasl_plain_password: Optional[str] = None,
sasl_plain_username: Optional[str] = None,
) -> None:
self.logger_state = logger

if isinstance(bootstrap_servers, Iterable) and not isinstance(
bootstrap_servers,
str,
Expand Down Expand Up @@ -107,7 +110,7 @@ def __init__(
},
)

self.producer = Producer(final_config, logger=self.logger) # type: ignore[call-arg]
self.producer = Producer(final_config, logger=self.logger_state.logger.logger) # type: ignore[call-arg]

self.__running = True
self._poll_task = asyncio.create_task(self._poll_loop())
Expand Down Expand Up @@ -309,7 +312,7 @@ def __init__(
)

self.config = final_config
self.consumer = Consumer(final_config, logger=self.logger) # type: ignore[call-arg]
self.consumer = Consumer(final_config, logger=self.logger_state.logger.logger) # type: ignore[call-arg]

# We shouldn't read messages and close consumer concurrently
# https://github.com/airtai/faststream/issues/1904#issuecomment-2506990895
Expand Down
2 changes: 1 addition & 1 deletion faststream/rabbit/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from typing_extensions import Doc, override

from faststream.__about__ import SERVICE_NAME
from faststream._internal.broker.broker import BrokerUsecase
from faststream._internal.broker.broker import ABCBroker, BrokerUsecase
from faststream._internal.constants import EMPTY
from faststream._internal.publisher.proto import PublisherProto
from faststream.message import gen_cor_id
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import Optional, Self
from typing import Optional

from typing_extensions import Self

from faststream.specification.asyncapi.v2_6_0.schema.bindings.amqp import (
ChannelBinding as V2Binding,
Expand Down
8 changes: 6 additions & 2 deletions tests/brokers/kafka/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,9 @@ async def handler(msg):

await asyncio.wait(
(
asyncio.create_task(br._producer._producer.producer.send(queue, key=b"")),
asyncio.create_task(
br._producer._producer.producer.send(queue, key=b"")
),
asyncio.create_task(event.wait()),
),
timeout=3,
Expand All @@ -426,7 +428,9 @@ async def handler(msg):

await asyncio.wait(
(
asyncio.create_task(br._producer._producer.producer.send(queue, key=b"")),
asyncio.create_task(
br._producer._producer.producer.send(queue, key=b"")
),
asyncio.create_task(event.wait()),
),
timeout=3,
Expand Down
4 changes: 0 additions & 4 deletions tests/prometheus/confluent/basic.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
from typing import Any

from faststream import AckPolicy
from faststream.confluent import KafkaBroker
from faststream.confluent.prometheus import KafkaPrometheusMiddleware
from tests.brokers.confluent.basic import ConfluentTestcaseConfig


class KafkaPrometheusSettings(ConfluentTestcaseConfig):
messaging_system = "kafka"

def get_broker(self, apply_types=False, **kwargs: Any) -> KafkaBroker:
return KafkaBroker(apply_types=apply_types, **kwargs)

def get_middleware(self, **kwargs: Any) -> KafkaPrometheusMiddleware:
return KafkaPrometheusMiddleware(**kwargs)

Expand Down
4 changes: 0 additions & 4 deletions tests/prometheus/kafka/basic.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
from typing import Any

from faststream import AckPolicy
from faststream.kafka import KafkaBroker
from faststream.kafka.prometheus import KafkaPrometheusMiddleware
from tests.brokers.kafka.basic import KafkaTestcaseConfig


class KafkaPrometheusSettings(KafkaTestcaseConfig):
messaging_system = "kafka"

def get_broker(self, apply_types=False, **kwargs: Any) -> KafkaBroker:
return KafkaBroker(apply_types=apply_types, **kwargs)

def get_middleware(self, **kwargs: Any) -> KafkaPrometheusMiddleware:
return KafkaPrometheusMiddleware(**kwargs)

Expand Down
3 changes: 2 additions & 1 deletion tests/prometheus/nats/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

from faststream.nats import NatsBroker
from faststream.nats.prometheus import NatsPrometheusMiddleware
from tests.brokers.nats.basic import NatsTestcaseConfig


class NatsPrometheusSettings:
class NatsPrometheusSettings(NatsTestcaseConfig):
messaging_system = "nats"

def get_broker(self, apply_types=False, **kwargs: Any) -> NatsBroker:
Expand Down
4 changes: 3 additions & 1 deletion tests/prometheus/nats/test_nats.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ def stream(queue):

@pytest.mark.nats()
class TestPrometheus(
NatsPrometheusSettings, LocalPrometheusTestcase, LocalRPCPrometheusTestcase
NatsPrometheusSettings,
LocalPrometheusTestcase,
LocalRPCPrometheusTestcase,
):
async def test_metrics_batch(
self,
Expand Down
3 changes: 2 additions & 1 deletion tests/prometheus/nats/test_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@


class LocalBaseNatsMetricsSettingsProviderTestcase(
NatsPrometheusSettings, LocalMetricsSettingsProviderTestcase
NatsPrometheusSettings,
LocalMetricsSettingsProviderTestcase,
):
def test_get_publish_destination_name_from_cmd(self, queue: str) -> None:
expected_destination_name = queue
Expand Down
4 changes: 0 additions & 4 deletions tests/prometheus/rabbit/basic.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
from typing import Any

from faststream.rabbit import RabbitBroker
from faststream.rabbit.prometheus import RabbitPrometheusMiddleware
from tests.brokers.rabbit.basic import RabbitTestcaseConfig


class RabbitPrometheusSettings(RabbitTestcaseConfig):
messaging_system = "rabbitmq"

def get_broker(self, apply_types=False, **kwargs: Any) -> RabbitBroker:
return RabbitBroker(apply_types=apply_types, **kwargs)

def get_middleware(self, **kwargs: Any) -> RabbitPrometheusMiddleware:
return RabbitPrometheusMiddleware(**kwargs)
6 changes: 1 addition & 5 deletions tests/prometheus/rabbit/test_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,14 @@

from faststream.prometheus import MetricsSettingsProvider
from faststream.rabbit.prometheus.provider import RabbitMetricsSettingsProvider
from tests.prometheus.basic import (
LocalMetricsSettingsProviderTestcase,
LocalRPCPrometheusTestcase,
)
from tests.prometheus.basic import LocalMetricsSettingsProviderTestcase

from .basic import RabbitPrometheusSettings


class TestRabbitMetricsSettingsProvider(
RabbitPrometheusSettings,
LocalMetricsSettingsProviderTestcase,
LocalRPCPrometheusTestcase,
):
@staticmethod
def get_provider() -> MetricsSettingsProvider:
Expand Down
4 changes: 0 additions & 4 deletions tests/prometheus/redis/basic.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
from typing import Any

from faststream.redis import RedisBroker
from faststream.redis.prometheus import RedisPrometheusMiddleware
from tests.brokers.redis.basic import RedisTestcaseConfig


class RedisPrometheusSettings(RedisTestcaseConfig):
messaging_system = "redis"

def get_broker(self, apply_types=False, **kwargs: Any) -> RedisBroker:
return RedisBroker(apply_types=apply_types, **kwargs)

def get_middleware(self, **kwargs: Any) -> RedisPrometheusMiddleware:
return RedisPrometheusMiddleware(**kwargs)

0 comments on commit 5f4c9f4

Please sign in to comment.