Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: return logger #1982

Merged
merged 3 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion faststream/_internal/broker/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def __init__(
decoder=decoder,
include_in_schema=include_in_schema,
state=EmptyBrokerState("You should include router to any broker."),
routers=routers
routers=routers,
)

for h in handlers:
Expand Down
1 change: 1 addition & 0 deletions faststream/confluent/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ async def _connect( # type: ignore[override]
**kwargs,
client_id=client_id,
config=self.config,
logger=self._state.get().logger_state,
)

self._producer.connect(native_producer)
Expand Down
7 changes: 5 additions & 2 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class AsyncConfluentProducer:
def __init__(
self,
*,
logger: "LoggerState",
config: config_module.ConfluentFastConfig,
bootstrap_servers: Union[str, list[str]] = "localhost",
client_id: Optional[str] = None,
Expand All @@ -64,6 +65,8 @@ def __init__(
sasl_plain_password: Optional[str] = None,
sasl_plain_username: Optional[str] = None,
) -> None:
self.logger_state = logger

if isinstance(bootstrap_servers, Iterable) and not isinstance(
bootstrap_servers,
str,
Expand Down Expand Up @@ -107,7 +110,7 @@ def __init__(
},
)

self.producer = Producer(final_config, logger=self.logger) # type: ignore[call-arg]
self.producer = Producer(final_config, logger=self.logger_state.logger.logger) # type: ignore[call-arg]

self.__running = True
self._poll_task = asyncio.create_task(self._poll_loop())
Expand Down Expand Up @@ -309,7 +312,7 @@ def __init__(
)

self.config = final_config
self.consumer = Consumer(final_config, logger=self.logger) # type: ignore[call-arg]
self.consumer = Consumer(final_config, logger=self.logger_state.logger.logger) # type: ignore[call-arg]

# We shouldn't read messages and close consumer concurrently
# https://github.com/airtai/faststream/issues/1904#issuecomment-2506990895
Expand Down
2 changes: 1 addition & 1 deletion faststream/rabbit/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from typing_extensions import Doc, override

from faststream.__about__ import SERVICE_NAME
from faststream._internal.broker.broker import BrokerUsecase
from faststream._internal.broker.broker import ABCBroker, BrokerUsecase
from faststream._internal.constants import EMPTY
from faststream._internal.publisher.proto import PublisherProto
from faststream.message import gen_cor_id
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import Optional, Self
from typing import Optional

from typing_extensions import Self

from faststream.specification.asyncapi.v2_6_0.schema.bindings.amqp import (
ChannelBinding as V2Binding,
Expand Down
8 changes: 6 additions & 2 deletions tests/brokers/kafka/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,9 @@ async def handler(msg):

await asyncio.wait(
(
asyncio.create_task(br._producer._producer.producer.send(queue, key=b"")),
asyncio.create_task(
br._producer._producer.producer.send(queue, key=b"")
),
asyncio.create_task(event.wait()),
),
timeout=3,
Expand All @@ -426,7 +428,9 @@ async def handler(msg):

await asyncio.wait(
(
asyncio.create_task(br._producer._producer.producer.send(queue, key=b"")),
asyncio.create_task(
br._producer._producer.producer.send(queue, key=b"")
),
asyncio.create_task(event.wait()),
),
timeout=3,
Expand Down
4 changes: 0 additions & 4 deletions tests/prometheus/confluent/basic.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
from typing import Any

from faststream import AckPolicy
from faststream.confluent import KafkaBroker
from faststream.confluent.prometheus import KafkaPrometheusMiddleware
from tests.brokers.confluent.basic import ConfluentTestcaseConfig


class KafkaPrometheusSettings(ConfluentTestcaseConfig):
messaging_system = "kafka"

def get_broker(self, apply_types=False, **kwargs: Any) -> KafkaBroker:
return KafkaBroker(apply_types=apply_types, **kwargs)

def get_middleware(self, **kwargs: Any) -> KafkaPrometheusMiddleware:
return KafkaPrometheusMiddleware(**kwargs)

Expand Down
4 changes: 0 additions & 4 deletions tests/prometheus/kafka/basic.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
from typing import Any

from faststream import AckPolicy
from faststream.kafka import KafkaBroker
from faststream.kafka.prometheus import KafkaPrometheusMiddleware
from tests.brokers.kafka.basic import KafkaTestcaseConfig


class KafkaPrometheusSettings(KafkaTestcaseConfig):
messaging_system = "kafka"

def get_broker(self, apply_types=False, **kwargs: Any) -> KafkaBroker:
return KafkaBroker(apply_types=apply_types, **kwargs)

def get_middleware(self, **kwargs: Any) -> KafkaPrometheusMiddleware:
return KafkaPrometheusMiddleware(**kwargs)

Expand Down
3 changes: 2 additions & 1 deletion tests/prometheus/nats/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

from faststream.nats import NatsBroker
from faststream.nats.prometheus import NatsPrometheusMiddleware
from tests.brokers.nats.basic import NatsTestcaseConfig


class NatsPrometheusSettings:
class NatsPrometheusSettings(NatsTestcaseConfig):
messaging_system = "nats"

def get_broker(self, apply_types=False, **kwargs: Any) -> NatsBroker:
Expand Down
4 changes: 3 additions & 1 deletion tests/prometheus/nats/test_nats.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ def stream(queue):

@pytest.mark.nats()
class TestPrometheus(
NatsPrometheusSettings, LocalPrometheusTestcase, LocalRPCPrometheusTestcase
NatsPrometheusSettings,
LocalPrometheusTestcase,
LocalRPCPrometheusTestcase,
):
async def test_metrics_batch(
self,
Expand Down
3 changes: 2 additions & 1 deletion tests/prometheus/nats/test_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@


class LocalBaseNatsMetricsSettingsProviderTestcase(
NatsPrometheusSettings, LocalMetricsSettingsProviderTestcase
NatsPrometheusSettings,
LocalMetricsSettingsProviderTestcase,
):
def test_get_publish_destination_name_from_cmd(self, queue: str) -> None:
expected_destination_name = queue
Expand Down
4 changes: 0 additions & 4 deletions tests/prometheus/rabbit/basic.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
from typing import Any

from faststream.rabbit import RabbitBroker
from faststream.rabbit.prometheus import RabbitPrometheusMiddleware
from tests.brokers.rabbit.basic import RabbitTestcaseConfig


class RabbitPrometheusSettings(RabbitTestcaseConfig):
messaging_system = "rabbitmq"

def get_broker(self, apply_types=False, **kwargs: Any) -> RabbitBroker:
return RabbitBroker(apply_types=apply_types, **kwargs)

def get_middleware(self, **kwargs: Any) -> RabbitPrometheusMiddleware:
return RabbitPrometheusMiddleware(**kwargs)
6 changes: 1 addition & 5 deletions tests/prometheus/rabbit/test_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,14 @@

from faststream.prometheus import MetricsSettingsProvider
from faststream.rabbit.prometheus.provider import RabbitMetricsSettingsProvider
from tests.prometheus.basic import (
LocalMetricsSettingsProviderTestcase,
LocalRPCPrometheusTestcase,
)
from tests.prometheus.basic import LocalMetricsSettingsProviderTestcase

from .basic import RabbitPrometheusSettings


class TestRabbitMetricsSettingsProvider(
RabbitPrometheusSettings,
LocalMetricsSettingsProviderTestcase,
LocalRPCPrometheusTestcase,
):
@staticmethod
def get_provider() -> MetricsSettingsProvider:
Expand Down
4 changes: 0 additions & 4 deletions tests/prometheus/redis/basic.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
from typing import Any

from faststream.redis import RedisBroker
from faststream.redis.prometheus import RedisPrometheusMiddleware
from tests.brokers.redis.basic import RedisTestcaseConfig


class RedisPrometheusSettings(RedisTestcaseConfig):
messaging_system = "redis"

def get_broker(self, apply_types=False, **kwargs: Any) -> RedisBroker:
return RedisBroker(apply_types=apply_types, **kwargs)

def get_middleware(self, **kwargs: Any) -> RedisPrometheusMiddleware:
return RedisPrometheusMiddleware(**kwargs)
Loading