Skip to content

Commit

Permalink
lint: fix mypy a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Dec 4, 2024
1 parent 1fd1039 commit af6f92f
Show file tree
Hide file tree
Showing 28 changed files with 217 additions and 236 deletions.
1 change: 1 addition & 0 deletions faststream/_internal/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
__all__ = (
"HAS_TYPER",
"PYDANTIC_V2",
"BaseModel",
"CoreSchema",
"EmailStr",
"GetJsonSchemaHandler",
Expand Down
28 changes: 18 additions & 10 deletions faststream/_internal/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
import sys
import warnings
from contextlib import suppress
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Any, Optional, cast

import anyio
import typer

from faststream import FastStream
from faststream.__about__ import __version__
from faststream._internal._compat import json_loads
from faststream._internal.application import Application
from faststream._internal.cli.docs import asyncapi_app
from faststream._internal.cli.utils.imports import import_from_string
Expand Down Expand Up @@ -122,6 +123,7 @@ def run(

# Should be imported after sys.path changes
module_path, app_obj = import_from_string(app, is_factory=is_factory)
app_obj = cast(Application, app_obj)

args = (app, extra, is_factory, casted_log_level)

Expand Down Expand Up @@ -160,7 +162,7 @@ def run(
workers=workers,
).run()
else:
args[1]["workers"] = workers
args[1]["workers"] = str(workers)
_run(*args)

else:
Expand All @@ -181,6 +183,7 @@ def _run(
) -> None:
"""Runs the specified application."""
_, app_obj = import_from_string(app, is_factory=is_factory)
app_obj = cast(Application, app_obj)
_run_imported_app(
app_obj,
extra_options=extra_options,
Expand Down Expand Up @@ -235,7 +238,7 @@ def publish(
),
message: str = typer.Argument(
...,
help="Message to be published.",
help="JSON Message string to publish.",
),
rpc: bool = typer.Option(
False,
Expand All @@ -255,9 +258,9 @@ def publish(
"""
app, extra = parse_cli_args(app, *ctx.args)

extra["message"] = message
if "timeout" in extra:
extra["timeout"] = float(extra["timeout"])
publish_extra: AnyDict = extra.copy()
if "timeout" in publish_extra:
publish_extra["timeout"] = float(publish_extra["timeout"])

try:
_, app_obj = import_from_string(app, is_factory=is_factory)
Expand All @@ -269,7 +272,7 @@ def publish(
raise ValueError(msg)

app_obj._setup()
result = anyio.run(publish_message, app_obj.broker, rpc, extra)
result = anyio.run(publish_message, app_obj.broker, rpc, message, publish_extra)

if rpc:
typer.echo(result)
Expand All @@ -282,13 +285,18 @@ def publish(
async def publish_message(
broker: "BrokerUsecase[Any, Any]",
rpc: bool,
message: str,
extra: "AnyDict",
) -> Any:
with suppress(Exception):
message = json_loads(message)

try:
async with broker:
if rpc:
return await broker.request(**extra)
return await broker.publish(**extra)
return await broker.request(message, **extra) # type: ignore[call-arg]
return await broker.publish(message, **extra) # type: ignore[call-arg]

except Exception as e:
typer.echo(f"Error when broker was publishing: {e}")
typer.echo(f"Error when broker was publishing: {e!r}")
sys.exit(1)
2 changes: 1 addition & 1 deletion faststream/_internal/fastapi/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from faststream._internal.broker.broker import BrokerUsecase
from faststream._internal.proto import NameRequired
from faststream._internal.publisher.proto import PublisherProto
from faststream._internal.subscriber.call_wrapper.call import HandlerCallWrapper
from faststream._internal.subscriber.call_wrapper import HandlerCallWrapper
from faststream._internal.types import BrokerMiddleware
from faststream.message import StreamMessage
from faststream.specification.base.specification import Specification
Expand Down
28 changes: 26 additions & 2 deletions faststream/_internal/proto.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,32 @@
from abc import abstractmethod
from typing import Any, Optional, Protocol, TypeVar, Union, overload
from typing import Any, Callable, Optional, Protocol, TypeVar, Union, overload

from faststream._internal.subscriber.call_wrapper import (
HandlerCallWrapper,
ensure_call_wrapper,
)
from faststream._internal.types import (
MsgType,
P_HandlerParams,
T_HandlerReturn,
)

class Endpoint(Protocol):

class EndpointWrapper(Protocol[MsgType]):
def __call__(
self,
func: Union[
Callable[P_HandlerParams, T_HandlerReturn],
HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
],
) -> HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]:
handler: HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn] = (
ensure_call_wrapper(func)
)
return handler


class Endpoint(EndpointWrapper[MsgType]):
@abstractmethod
def add_prefix(self, prefix: str) -> None: ...

Expand Down
34 changes: 16 additions & 18 deletions faststream/_internal/publisher/proto.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from abc import abstractmethod
from collections.abc import Iterable, Sequence
from typing import TYPE_CHECKING, Any, Callable, Generic, Optional, Protocol

from typing_extensions import override
from typing import (
TYPE_CHECKING,
Any,
Optional,
Protocol,
)

from faststream._internal.proto import Endpoint
from faststream._internal.types import MsgType
from faststream._internal.types import (
MsgType,
)
from faststream.response.response import PublishCommand

if TYPE_CHECKING:
Expand All @@ -14,9 +19,7 @@
from faststream._internal.types import (
AsyncCallable,
BrokerMiddleware,
P_HandlerParams,
PublisherMiddleware,
T_HandlerReturn,
)
from faststream.response.response import PublishCommand

Expand Down Expand Up @@ -88,28 +91,23 @@ async def request(


class PublisherProto(
Endpoint,
Endpoint[MsgType],
BasePublisherProto,
Generic[MsgType],
):
_broker_middlewares: Sequence["BrokerMiddleware[MsgType]"]
_middlewares: Sequence["PublisherMiddleware"]
_producer: Optional["ProducerProto"]

@property
@abstractmethod
def _producer(self) -> "ProducerProto": ...

@abstractmethod
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None: ...

@override
@abstractmethod
def _setup( # type: ignore[override]
def _setup(
self,
*,
producer: Optional["ProducerProto"],
state: "Pointer[BrokerState]",
producer: "ProducerProto",
) -> None: ...

@abstractmethod
def __call__(
self,
func: "Callable[P_HandlerParams, T_HandlerReturn]",
) -> "Callable[P_HandlerParams, T_HandlerReturn]": ...
10 changes: 5 additions & 5 deletions faststream/_internal/publisher/specified.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
if TYPE_CHECKING:
from faststream._internal.basic_types import AnyCallable, AnyDict
from faststream._internal.state import BrokerState, Pointer
from faststream._internal.subscriber.call_wrapper.call import HandlerCallWrapper
from faststream._internal.subscriber.call_wrapper import HandlerCallWrapper


class SpecificationPublisher(EndpointSpecification[PublisherSpec]):
class SpecificationPublisher(EndpointSpecification[MsgType, PublisherSpec]):
"""A base class for publishers in an asynchronous API."""

_state: "Pointer[BrokerState]" # should be set in next parent
Expand All @@ -44,9 +44,9 @@ def __call__(
"HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]",
],
) -> "HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]":
func = super().__call__(func)
self.calls.append(func._original_call)
return func
handler = super().__call__(func)
self.calls.append(handler._original_call)
return handler

def get_payloads(self) -> list[tuple["AnyDict", str]]:
payloads: list[tuple[AnyDict, str]] = []
Expand Down
21 changes: 9 additions & 12 deletions faststream/_internal/publisher/usecase.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections.abc import Awaitable, Iterable
from collections.abc import Awaitable, Iterable, Sequence
from functools import partial
from itertools import chain
from typing import (
Expand All @@ -15,9 +15,8 @@
from faststream._internal.publisher.proto import PublisherProto
from faststream._internal.state import BrokerState, EmptyBrokerState, Pointer
from faststream._internal.state.producer import ProducerUnset
from faststream._internal.subscriber.call_wrapper.call import (
from faststream._internal.subscriber.call_wrapper import (
HandlerCallWrapper,
ensure_call_wrapper,
)
from faststream._internal.subscriber.utils import process_msg
from faststream._internal.types import (
Expand All @@ -42,8 +41,8 @@ class PublisherUsecase(PublisherProto[MsgType]):
def __init__(
self,
*,
broker_middlewares: Iterable["BrokerMiddleware[MsgType]"],
middlewares: Iterable["PublisherMiddleware"],
broker_middlewares: Sequence["BrokerMiddleware[MsgType]"],
middlewares: Sequence["PublisherMiddleware"],
) -> None:
self.middlewares = middlewares
self._broker_middlewares = broker_middlewares
Expand All @@ -65,7 +64,7 @@ def _producer(self) -> "ProducerProto":
return self.__producer or self._state.get().producer

@override
def _setup( # type: ignore[override]
def _setup(
self,
*,
state: "Pointer[BrokerState]",
Expand Down Expand Up @@ -97,9 +96,7 @@ def __call__(
],
) -> HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]:
"""Decorate user's function by current publisher."""
handler: HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn] = (
ensure_call_wrapper(func)
)
handler = super().__call__(func)
handler._publishers.append(self)
return handler

Expand All @@ -125,7 +122,7 @@ async def _basic_publish(
):
pub = partial(pub_m, pub)

await pub(cmd)
return await pub(cmd)

async def _basic_request(
self,
Expand Down Expand Up @@ -163,7 +160,7 @@ async def _basic_publish_batch(
cmd: "PublishCommand",
*,
_extra_middlewares: Iterable["PublisherMiddleware"],
) -> Optional[Any]:
) -> Any:
pub = self._producer.publish_batch

context = self._state.get().di_state.context
Expand All @@ -180,4 +177,4 @@ async def _basic_publish_batch(
):
pub = partial(pub_m, pub)

await pub(cmd)
return await pub(cmd)
4 changes: 4 additions & 0 deletions faststream/_internal/state/logger/logger_proxy.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from abc import abstractmethod
from collections.abc import Mapping
from typing import Any, Optional

Expand All @@ -8,6 +9,7 @@
class LoggerObject(LoggerProto):
logger: Optional["LoggerProto"]

@abstractmethod
def __bool__(self) -> bool: ...


Expand Down Expand Up @@ -73,6 +75,8 @@ class RealLoggerObject(LoggerObject):
or in default logger case (.params_storage.DefaultLoggerStorage).
"""

logger: "LoggerProto"

def __init__(self, logger: "LoggerProto") -> None:
self.logger = logger

Expand Down
6 changes: 3 additions & 3 deletions faststream/_internal/subscriber/call_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from faststream._internal.basic_types import AsyncFuncAny, Decorator
from faststream._internal.state import BrokerState, Pointer
from faststream._internal.subscriber.call_wrapper.call import HandlerCallWrapper
from faststream._internal.subscriber.call_wrapper import HandlerCallWrapper
from faststream._internal.types import (
AsyncCallable,
AsyncFilter,
Expand Down Expand Up @@ -128,8 +128,8 @@ async def is_suitable(
if not (parser := cast(Optional["AsyncCallable"], self.item_parser)) or not (
decoder := cast(Optional["AsyncCallable"], self.item_decoder)
):
msg = "You should setup `HandlerItem` at first."
raise SetupError(msg)
error_msg = "You should setup `HandlerItem` at first."
raise SetupError(error_msg)

message = cache[parser] = cast(
"StreamMessage[MsgType]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

if TYPE_CHECKING:
from fast_depends.dependencies import Dependant
from fast_depends.use import InjectWrapper

from faststream._internal.basic_types import Decorator
from faststream._internal.publisher.proto import PublisherProto
Expand Down Expand Up @@ -88,7 +87,7 @@ def __call__(
async def call_wrapped(
self,
message: "StreamMessage[MsgType]",
) -> Awaitable[Any]:
) -> Any:
"""Calls the wrapped function with the given message."""
assert self._wrapped_call, "You should use `set_wrapped` first" # nosec B101
if self.is_test:
Expand Down Expand Up @@ -145,7 +144,7 @@ def refresh(self, with_mock: bool = False) -> None:
def set_wrapped(
self,
*,
dependencies: Iterable["Dependant"],
dependencies: Sequence["Dependant"],
_call_decorators: Iterable["Decorator"],
state: "DIState",
) -> Optional["CallModel"]:
Expand All @@ -166,7 +165,7 @@ def set_wrapped(
)

if state.use_fastdepends:
wrapper: InjectWrapper[Any, Any] = inject(
wrapper = inject(
func=None,
context__=state.context,
)
Expand Down
Empty file.
Loading

0 comments on commit af6f92f

Please sign in to comment.