Skip to content

Commit

Permalink
lint: fix rabbit otel & prometheus mypy
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Dec 15, 2024
1 parent 2bb82c2 commit c7f4bc3
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 47 deletions.
14 changes: 12 additions & 2 deletions faststream/_internal/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
Union,
)

from typing_extensions import ParamSpec, TypeAlias
from typing_extensions import (
ParamSpec,
TypeAlias,
TypeVar as TypeVar313,
)

from faststream._internal.basic_types import AsyncFuncAny
from faststream._internal.context.repository import ContextRepo
Expand All @@ -21,7 +25,6 @@
StreamMsg = TypeVar("StreamMsg", bound=StreamMessage[Any])
ConnectionType = TypeVar("ConnectionType")


SyncFilter: TypeAlias = Callable[[StreamMsg], bool]
AsyncFilter: TypeAlias = Callable[[StreamMsg], Awaitable[bool]]
Filter: TypeAlias = Union[
Expand Down Expand Up @@ -82,6 +85,13 @@ def __call__(
]


PublishCommandType = TypeVar313(
"PublishCommandType",
bound=PublishCommand,
default=PublishCommand,
)


class PublisherMiddleware(Protocol):
"""Publisher middleware interface."""

Expand Down
22 changes: 7 additions & 15 deletions faststream/middlewares/base.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from collections.abc import Awaitable
from typing import TYPE_CHECKING, Any, Callable, Generic, Optional

# We should use typing_extensions.TypeVar until python3.13 due default
from typing_extensions import Self, TypeVar
from typing_extensions import Self

from faststream.response.response import PublishCommand
from faststream._internal.types import PublishCommandType

if TYPE_CHECKING:
from types import TracebackType
Expand All @@ -14,14 +13,7 @@
from faststream.message import StreamMessage


PublishCommand_T = TypeVar(
"PublishCommand_T",
bound=PublishCommand,
default=PublishCommand,
)


class BaseMiddleware(Generic[PublishCommand_T]):
class BaseMiddleware(Generic[PublishCommandType]):
"""A base middleware class."""

def __init__(
Expand Down Expand Up @@ -92,8 +84,8 @@ async def consume_scope(

async def on_publish(
self,
msg: PublishCommand_T,
) -> PublishCommand_T:
msg: PublishCommandType,
) -> PublishCommandType:
"""This option was deprecated and will be removed in 0.6.10. Please, use `publish_scope` instead."""
return msg

Expand All @@ -107,8 +99,8 @@ async def after_publish(

async def publish_scope(
self,
call_next: Callable[[PublishCommand_T], Awaitable[Any]],
cmd: PublishCommand_T,
call_next: Callable[[PublishCommandType], Awaitable[Any]],
cmd: PublishCommandType,
) -> Any:
"""Publish a message and return an async iterator."""
err: Optional[Exception] = None
Expand Down
18 changes: 9 additions & 9 deletions faststream/opentelemetry/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from opentelemetry.trace import Link, Span
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from faststream.middlewares.base import BaseMiddleware, PublishCommand_T
from faststream._internal.types import PublishCommandType
from faststream.middlewares.base import BaseMiddleware
from faststream.opentelemetry.baggage import Baggage
from faststream.opentelemetry.consts import (
ERROR_TYPE,
Expand All @@ -32,14 +33,13 @@
from faststream._internal.context.repository import ContextRepo
from faststream.message import StreamMessage
from faststream.opentelemetry.provider import TelemetrySettingsProvider
from faststream.response.response import PublishCommand


_BAGGAGE_PROPAGATOR = W3CBaggagePropagator()
_TRACE_PROPAGATOR = TraceContextTextMapPropagator()


class TelemetryMiddleware(Generic[PublishCommand_T]):
class TelemetryMiddleware(Generic[PublishCommandType]):
__slots__ = (
"_meter",
"_metrics",
Expand All @@ -52,7 +52,7 @@ def __init__(
*,
settings_provider_factory: Callable[
[Any],
Optional["TelemetrySettingsProvider[Any]"],
Optional["TelemetrySettingsProvider[Any, PublishCommandType]"],
],
tracer_provider: Optional["TracerProvider"] = None,
meter_provider: Optional["MeterProvider"] = None,
Expand All @@ -70,8 +70,8 @@ def __call__(
/,
*,
context: "ContextRepo",
) -> "BaseTelemetryMiddleware[PublishCommand_T]":
return BaseTelemetryMiddleware[PublishCommand_T](
) -> "BaseTelemetryMiddleware[PublishCommandType]":
return BaseTelemetryMiddleware[PublishCommandType](
msg,
tracer=self._tracer,
metrics_container=self._metrics,
Expand Down Expand Up @@ -152,7 +152,7 @@ def observe_consume(
)


class BaseTelemetryMiddleware(BaseMiddleware[PublishCommand_T]):
class BaseTelemetryMiddleware(BaseMiddleware[PublishCommandType]):
def __init__(
self,
msg: Optional[Any],
Expand All @@ -161,7 +161,7 @@ def __init__(
tracer: "Tracer",
settings_provider_factory: Callable[
[Any],
Optional["TelemetrySettingsProvider[Any]"],
Optional["TelemetrySettingsProvider[Any, PublishCommandType]"],
],
metrics_container: _MetricsContainer,
context: "ContextRepo",
Expand All @@ -178,7 +178,7 @@ def __init__(
async def publish_scope(
self,
call_next: "AsyncFunc",
msg: "PublishCommand",
msg: "PublishCommandType",
) -> Any:
if (provider := self.__settings_provider) is None:
return await call_next(msg)
Expand Down
18 changes: 14 additions & 4 deletions faststream/opentelemetry/provider.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
from typing import TYPE_CHECKING, Protocol

from typing_extensions import TypeVar as TypeVar313

from faststream._internal.types import MsgType
from faststream.response import PublishCommand

if TYPE_CHECKING:
from faststream._internal.basic_types import AnyDict
from faststream.message import StreamMessage
from faststream.response.response import PublishCommand


class TelemetrySettingsProvider(Protocol[MsgType]):
PublishCommandType_contra = TypeVar313(
"PublishCommandType_contra",
bound=PublishCommand,
default=PublishCommand,
contravariant=True,
)


class TelemetrySettingsProvider(Protocol[MsgType, PublishCommandType_contra]):
messaging_system: str

def get_consume_attrs_from_message(
Expand All @@ -23,10 +33,10 @@ def get_consume_destination_name(

def get_publish_attrs_from_cmd(
self,
cmd: "PublishCommand",
cmd: PublishCommandType_contra,
) -> "AnyDict": ...

def get_publish_destination_name(
self,
cmd: "PublishCommand",
cmd: PublishCommandType_contra,
) -> str: ...
21 changes: 12 additions & 9 deletions faststream/prometheus/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from typing import TYPE_CHECKING, Any, Callable, Generic, Optional

from faststream._internal.constants import EMPTY
from faststream._internal.types import PublishCommandType
from faststream.exceptions import IgnoredException
from faststream.message import SourceType
from faststream.middlewares.base import BaseMiddleware, PublishCommand_T
from faststream.middlewares.base import BaseMiddleware
from faststream.prometheus.consts import (
PROCESSING_STATUS_BY_ACK_STATUS,
PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP,
Expand All @@ -24,14 +25,15 @@
from faststream.message.message import StreamMessage


class PrometheusMiddleware(Generic[PublishCommand_T]):
class PrometheusMiddleware(Generic[PublishCommandType]):
__slots__ = ("_metrics_container", "_metrics_manager", "_settings_provider_factory")

def __init__(
self,
*,
settings_provider_factory: Callable[
[Any], Optional[MetricsSettingsProvider[Any]]
[Any],
Optional[MetricsSettingsProvider[Any, PublishCommandType]],
],
registry: "CollectorRegistry",
app_name: str = EMPTY,
Expand All @@ -58,23 +60,24 @@ def __call__(
/,
*,
context: "ContextRepo",
) -> "BasePrometheusMiddleware[PublishCommand_T]":
return BasePrometheusMiddleware[PublishCommand_T](
) -> "BasePrometheusMiddleware[PublishCommandType]":
return BasePrometheusMiddleware[PublishCommandType](
msg,
metrics_manager=self._metrics_manager,
settings_provider_factory=self._settings_provider_factory,
context=context,
)


class BasePrometheusMiddleware(BaseMiddleware[PublishCommand_T]):
class BasePrometheusMiddleware(BaseMiddleware[PublishCommandType]):
def __init__(
self,
msg: Optional[Any],
/,
*,
settings_provider_factory: Callable[
[Any], Optional[MetricsSettingsProvider[Any]]
[Any],
Optional[MetricsSettingsProvider[Any, PublishCommandType]],
],
metrics_manager: MetricsManager,
context: "ContextRepo",
Expand Down Expand Up @@ -164,8 +167,8 @@ async def consume_scope(

async def publish_scope(
self,
call_next: Callable[[PublishCommand_T], Awaitable[Any]],
cmd: PublishCommand_T,
call_next: Callable[[PublishCommandType], Awaitable[Any]],
cmd: PublishCommandType,
) -> Any:
if self._settings_provider is None or cmd.publish_type is PublishType.REPLY:
return await call_next(cmd)
Expand Down
16 changes: 13 additions & 3 deletions faststream/prometheus/provider.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
from typing import TYPE_CHECKING, Protocol

from typing_extensions import TypeVar as TypeVar313

from faststream.message.message import MsgType, StreamMessage
from faststream.response.response import PublishCommand

if TYPE_CHECKING:
from faststream.prometheus import ConsumeAttrs
from faststream.response.response import PublishCommand


class MetricsSettingsProvider(Protocol[MsgType]):
PublishCommandType_contra = TypeVar313(
"PublishCommandType_contra",
bound=PublishCommand,
default=PublishCommand,
contravariant=True,
)


class MetricsSettingsProvider(Protocol[MsgType, PublishCommandType_contra]):
messaging_system: str

def get_consume_attrs_from_message(
Expand All @@ -17,5 +27,5 @@ def get_consume_attrs_from_message(

def get_publish_destination_name_from_cmd(
self,
cmd: "PublishCommand",
cmd: PublishCommandType_contra,
) -> str: ...
6 changes: 4 additions & 2 deletions faststream/rabbit/opentelemetry/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@

from faststream.opentelemetry import TelemetrySettingsProvider
from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME
from faststream.rabbit.response import RabbitPublishCommand

if TYPE_CHECKING:
from aio_pika import IncomingMessage

from faststream._internal.basic_types import AnyDict
from faststream.message import StreamMessage
from faststream.rabbit.response import RabbitPublishCommand


class RabbitTelemetrySettingsProvider(TelemetrySettingsProvider["IncomingMessage"]):
class RabbitTelemetrySettingsProvider(
TelemetrySettingsProvider["IncomingMessage", RabbitPublishCommand],
):
__slots__ = ("messaging_system",)

def __init__(self) -> None:
Expand Down
8 changes: 5 additions & 3 deletions faststream/rabbit/prometheus/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
ConsumeAttrs,
MetricsSettingsProvider,
)
from faststream.rabbit.response import RabbitPublishCommand

if TYPE_CHECKING:
from aio_pika import IncomingMessage

from faststream.message.message import StreamMessage
from faststream.rabbit.response import RabbitPublishCommand


class RabbitMetricsSettingsProvider(MetricsSettingsProvider["IncomingMessage"]):
class RabbitMetricsSettingsProvider(
MetricsSettingsProvider["IncomingMessage", RabbitPublishCommand],
):
__slots__ = ("messaging_system",)

def __init__(self) -> None:
Expand All @@ -33,6 +35,6 @@ def get_consume_attrs_from_message(

def get_publish_destination_name_from_cmd(
self,
cmd: "RabbitPublishCommand",
cmd: RabbitPublishCommand,
) -> str:
return f"{cmd.exchange.name or 'default'}.{cmd.destination}"

0 comments on commit c7f4bc3

Please sign in to comment.