Skip to content

Commit

Permalink
refactor: new Specification Schema (#1843)
Browse files Browse the repository at this point in the history
* refactor: new Specification Schema

* fix: add missing pre-commit changes

* refactor: delete defaults

* refactor: polish AsyncAPI all brokers

* fix: add missing pre-commit changes

---------

Co-authored-by: Lancetnik <[email protected]>
  • Loading branch information
Lancetnik and Lancetnik authored Nov 20, 2024
1 parent 6c2d155 commit f22c2d5
Show file tree
Hide file tree
Showing 178 changed files with 2,141 additions and 2,326 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from faststream import FastStream
from faststream.specification.asyncapi import AsyncAPI
from faststream.specification.schema.license import License
from faststream.specification.schema.contact import Contact
from faststream.specification import License, Contact
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
Expand Down
5 changes: 2 additions & 3 deletions faststream/_internal/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def model_to_jsonable(
def dump_json(data: Any) -> bytes:
return json_dumps(model_to_jsonable(data))

def get_model_fields(model: type[BaseModel]) -> dict[str, Any]:
def get_model_fields(model: type[BaseModel]) -> AnyDict:
return model.model_fields

def model_to_json(model: BaseModel, **kwargs: Any) -> str:
Expand Down Expand Up @@ -140,7 +140,7 @@ def model_schema(model: type[BaseModel], **kwargs: Any) -> AnyDict:
def dump_json(data: Any) -> bytes:
return json_dumps(data, default=pydantic_encoder)

def get_model_fields(model: type[BaseModel]) -> dict[str, Any]:
def get_model_fields(model: type[BaseModel]) -> AnyDict:
return model.__fields__ # type: ignore[return-value]

def model_to_json(model: BaseModel, **kwargs: Any) -> str:
Expand Down Expand Up @@ -187,7 +187,6 @@ def with_info_plain_validator_function( # type: ignore[misc]
ExceptionGroup,
)


try:
import email_validator

Expand Down
2 changes: 1 addition & 1 deletion faststream/_internal/basic_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
class StandardDataclass(Protocol):
"""Protocol to check type is dataclass."""

__dataclass_fields__: ClassVar[dict[str, Any]]
__dataclass_fields__: ClassVar[AnyDict]


BaseSendableMessage: TypeAlias = Union[
Expand Down
6 changes: 4 additions & 2 deletions faststream/_internal/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
MsgType,
)
from faststream._internal.utils.functions import to_async
from faststream.specification.proto import ServerSpecification

from .abc_broker import ABCBroker
from .pub_base import BrokerPublishMixin
Expand All @@ -51,12 +52,13 @@
PublisherProto,
)
from faststream.security import BaseSecurity
from faststream.specification.schema.tag import Tag, TagDict
from faststream.specification.schema.extra import Tag, TagDict


class BrokerUsecase(
ABCBroker[MsgType],
SetupAble,
ServerSpecification,
BrokerPublishMixin[MsgType],
Generic[MsgType, ConnectionType],
):
Expand Down Expand Up @@ -121,7 +123,7 @@ def __init__(
Doc("AsyncAPI server description."),
],
tags: Annotated[
Optional[Iterable[Union["Tag", "TagDict"]]],
Iterable[Union["Tag", "TagDict"]],
Doc("AsyncAPI server tags."),
],
specification_url: Annotated[
Expand Down
8 changes: 6 additions & 2 deletions faststream/_internal/cli/docs/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
from faststream._internal.cli.utils.imports import import_from_string
from faststream.exceptions import INSTALL_WATCHFILES, INSTALL_YAML, SCHEMA_NOT_SUPPORTED
from faststream.specification.asyncapi.site import serve_app
from faststream.specification.asyncapi.v2_6_0.schema import Schema as SchemaV2_6
from faststream.specification.asyncapi.v3_0_0.schema import Schema as SchemaV3
from faststream.specification.asyncapi.v2_6_0.schema import (
ApplicationSchema as SchemaV2_6,
)
from faststream.specification.asyncapi.v3_0_0.schema import (
ApplicationSchema as SchemaV3,
)
from faststream.specification.base.specification import Specification

if TYPE_CHECKING:
Expand Down
4 changes: 3 additions & 1 deletion faststream/_internal/cli/utils/imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@


def import_from_string(
import_str: str, *, is_factory: bool = False
import_str: str,
*,
is_factory: bool = False,
) -> tuple[Path, object]:
module_path, instance = _import_object_or_factory(import_str)

Expand Down
4 changes: 2 additions & 2 deletions faststream/_internal/fastapi/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
from faststream._internal.types import BrokerMiddleware
from faststream.message import StreamMessage
from faststream.specification.base.specification import Specification
from faststream.specification.schema.tag import Tag, TagDict
from faststream.specification.schema.extra import Tag, TagDict


class _BackgroundMiddleware(BaseMiddleware):
Expand Down Expand Up @@ -121,7 +121,7 @@ def __init__(
generate_unique_id,
),
# Specification information
specification_tags: Optional[Iterable[Union["Tag", "TagDict"]]] = None,
specification_tags: Iterable[Union["Tag", "TagDict"]] = (),
schema_url: Optional[str] = "/asyncapi",
**connection_kwars: Any,
) -> None:
Expand Down
36 changes: 20 additions & 16 deletions faststream/_internal/publisher/specified.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,50 @@
from inspect import Parameter, unwrap
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Any, Callable, Optional, Union

from fast_depends.core import build_call_model
from fast_depends.pydantic._compat import create_model, get_config_base

from faststream._internal.publisher.proto import PublisherProto
from faststream._internal.subscriber.call_wrapper.call import HandlerCallWrapper
from faststream._internal.types import (
MsgType,
P_HandlerParams,
T_HandlerReturn,
)
from faststream.specification.asyncapi.message import get_model_schema
from faststream.specification.asyncapi.utils import to_camelcase
from faststream.specification.base.proto import SpecificationEndpoint
from faststream.specification.proto import EndpointSpecification
from faststream.specification.schema import PublisherSpec

if TYPE_CHECKING:
from faststream._internal.basic_types import AnyDict
from faststream._internal.basic_types import AnyCallable, AnyDict
from faststream._internal.state import BrokerState, Pointer
from faststream._internal.subscriber.call_wrapper.call import HandlerCallWrapper


class BaseSpicificationPublisher(SpecificationEndpoint, PublisherProto[MsgType]):
class SpecificationPublisher(EndpointSpecification[PublisherSpec]):
"""A base class for publishers in an asynchronous API."""

_state: "Pointer[BrokerState]" # should be set in next parent

def __init__(
self,
*,
*args: Any,
schema_: Optional[Any],
title_: Optional[str],
description_: Optional[str],
include_in_schema: bool,
**kwargs: Any,
) -> None:
self.calls = []
self.calls: list[AnyCallable] = []

self.title_ = title_
self.description_ = description_
self.include_in_schema = include_in_schema
self.schema_ = schema_

super().__init__(*args, **kwargs)

def __call__(
self,
func: HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
) -> HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]:
func: Union[
Callable[P_HandlerParams, T_HandlerReturn],
"HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]",
],
) -> "HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]":
func = super().__call__(func)
self.calls.append(func._original_call)
return func

Expand Down
17 changes: 1 addition & 16 deletions faststream/_internal/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
)
from faststream.message.source_type import SourceType

from .specified import BaseSpicificationPublisher

if TYPE_CHECKING:
from faststream._internal.publisher.proto import ProducerProto
from faststream._internal.types import (
Expand All @@ -38,19 +36,14 @@
from faststream.response.response import PublishCommand


class PublisherUsecase(BaseSpicificationPublisher, PublisherProto[MsgType]):
class PublisherUsecase(PublisherProto[MsgType]):
"""A base class for publishers in an asynchronous API."""

def __init__(
self,
*,
broker_middlewares: Iterable["BrokerMiddleware[MsgType]"],
middlewares: Iterable["PublisherMiddleware"],
# AsyncAPI args
schema_: Optional[Any],
title_: Optional[str],
description_: Optional[str],
include_in_schema: bool,
) -> None:
self.middlewares = middlewares
self._broker_middlewares = broker_middlewares
Expand All @@ -60,13 +53,6 @@ def __init__(
self._fake_handler = False
self.mock: Optional[MagicMock] = None

super().__init__(
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
schema_=schema_,
)

self._state: Pointer[BrokerState] = Pointer(
EmptyBrokerState("You should include publisher to any broker.")
)
Expand Down Expand Up @@ -115,7 +101,6 @@ def __call__(
ensure_call_wrapper(func)
)
handler._publishers.append(self)
super().__call__(handler)
return handler

async def _basic_publish(
Expand Down
7 changes: 2 additions & 5 deletions faststream/_internal/subscriber/proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
ProducerProto,
)
from faststream._internal.state import BrokerState, Pointer
from faststream._internal.subscriber.call_item import HandlerItem
from faststream._internal.types import (
BrokerMiddleware,
CustomCallable,
Expand All @@ -27,6 +26,8 @@
from faststream.message import StreamMessage
from faststream.response import Response

from .call_item import HandlerItem


class SubscriberProto(
Endpoint,
Expand Down Expand Up @@ -68,10 +69,6 @@ def _make_response_publisher(
message: "StreamMessage[MsgType]",
) -> Iterable["BasePublisherProto"]: ...

@property
@abstractmethod
def call_name(self) -> str: ...

@abstractmethod
async def start(self) -> None: ...

Expand Down
34 changes: 21 additions & 13 deletions faststream/_internal/subscriber/specified.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,38 @@
from typing import (
TYPE_CHECKING,
Any,
Optional,
)

from faststream._internal.subscriber.proto import SubscriberProto
from faststream._internal.types import MsgType
from faststream.exceptions import SetupError
from faststream.specification.asyncapi.message import parse_handler_params
from faststream.specification.asyncapi.utils import to_camelcase
from faststream.specification.base.proto import SpecificationEndpoint
from faststream.specification.proto import EndpointSpecification
from faststream.specification.schema import SubscriberSpec

if TYPE_CHECKING:
from faststream._internal.basic_types import AnyDict
from faststream._internal.types import (
MsgType,
)

from .call_item import HandlerItem


class SpecificationSubscriber(
EndpointSpecification[SubscriberSpec],
):
calls: list["HandlerItem[MsgType]"]

class BaseSpicificationSubscriber(SpecificationEndpoint, SubscriberProto[MsgType]):
def __init__(
self,
*,
title_: Optional[str],
description_: Optional[str],
include_in_schema: bool,
*args: Any,
**kwargs: Any,
) -> None:
self.title_ = title_
self.description_ = description_
self.include_in_schema = include_in_schema
self.calls = []

# Call next base class parent init
super().__init__(*args, **kwargs)

@property
def call_name(self) -> str:
Expand All @@ -34,9 +42,9 @@ def call_name(self) -> str:

return to_camelcase(self.calls[0].call_name)

def get_description(self) -> Optional[str]:
def get_default_description(self) -> Optional[str]:
"""Returns the description of the handler."""
if not self.calls: # pragma: no cover
if not self.calls:
return None

return self.calls[0].description
Expand Down
14 changes: 1 addition & 13 deletions faststream/_internal/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
from faststream.middlewares.logging import CriticalLogMiddleware
from faststream.response import ensure_response

from .specified import BaseSpicificationSubscriber

if TYPE_CHECKING:
from fast_depends.dependencies import Dependant

Expand Down Expand Up @@ -79,7 +77,7 @@ def __init__(
self.dependencies = dependencies


class SubscriberUsecase(BaseSpicificationSubscriber, SubscriberProto[MsgType]):
class SubscriberUsecase(SubscriberProto[MsgType]):
"""A class representing an asynchronous handler."""

lock: "AbstractContextManager[Any]"
Expand All @@ -100,18 +98,8 @@ def __init__(
default_parser: "AsyncCallable",
default_decoder: "AsyncCallable",
ack_policy: AckPolicy,
# AsyncAPI information
title_: Optional[str],
description_: Optional[str],
include_in_schema: bool,
) -> None:
"""Initialize a new instance of the class."""
super().__init__(
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
)

self.calls = []

self._parser = default_parser
Expand Down
19 changes: 15 additions & 4 deletions faststream/_internal/utils/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,19 @@
TypedDictCls = TypeVar("TypedDictCls")


def filter_by_dict(typed_dict: type[TypedDictCls], data: AnyDict) -> TypedDictCls:
def filter_by_dict(
typed_dict: type[TypedDictCls],
data: AnyDict,
) -> tuple[TypedDictCls, AnyDict]:
annotations = typed_dict.__annotations__
return typed_dict( # type: ignore[call-arg]
{k: v for k, v in data.items() if k in annotations},
)

out_data = {}
extra_data = {}

for k, v in data.items():
if k in annotations:
out_data[k] = v
else:
extra_data[k] = v

return typed_dict(out_data), extra_data
Loading

0 comments on commit f22c2d5

Please sign in to comment.