This looks really cool - as someone who had to create a
- distributed application using rabbitmq and aiopika - this would have saved me alot of time - I
+ distributed application using rabbitmq and aiopika - this would have saved me a lot of time - I
love the decorations for pub sub.
diff --git a/examples/e10_middlewares.py b/examples/e10_middlewares.py
index 6b20f05502..6b115e514b 100644
--- a/examples/e10_middlewares.py
+++ b/examples/e10_middlewares.py
@@ -15,10 +15,10 @@ async def after_processed(
self,
exc_type: Optional[Type[BaseException]] = None,
exc_val: Optional[BaseException] = None,
- exec_tb: Optional[TracebackType] = None,
+ exc_tb: Optional[TracebackType] = None,
) -> bool:
print("highlevel middleware out")
- return await super().after_processed(exc_type, exc_val, exec_tb)
+ return await super().after_processed(exc_type, exc_val, exc_tb)
class HandlerMiddleware(BaseMiddleware):
diff --git a/examples/fastapi_integration/app.py b/examples/fastapi_integration/app.py
index a5633084de..e9f7ccc579 100644
--- a/examples/fastapi_integration/app.py
+++ b/examples/fastapi_integration/app.py
@@ -10,7 +10,7 @@
@publisher
@router.subscriber("test-q")
-async def handler(user_id: int, logger: Logger):
+async def handler(user_id: int, logger: Logger) -> str:
logger.info(user_id)
return f"{user_id} created"
diff --git a/examples/howto/structlogs.py b/examples/howto/structlogs.py
index 2d55c2c390..467dbfc875 100644
--- a/examples/howto/structlogs.py
+++ b/examples/howto/structlogs.py
@@ -14,23 +14,27 @@ def merge_contextvars(
) -> structlog.types.EventDict:
event_dict["extra"] = event_dict.get(
"extra",
- context.get("log_context", {}),
+ context.get_local("log_context") or {},
)
return event_dict
-shared_processors = [
+shared_processors = (
merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.dev.set_exc_info,
structlog.processors.TimeStamper(fmt="iso"),
-]
+)
if sys.stderr.isatty():
- processors = shared_processors + [structlog.dev.ConsoleRenderer()]
+ processors = [
+ *shared_processors,
+ structlog.dev.ConsoleRenderer(),
+ ]
else:
- processors = shared_processors + [
+ processors = [
+ *shared_processors,
structlog.processors.dict_tracebacks,
structlog.processors.JSONRenderer(),
]
diff --git a/faststream/__about__.py b/faststream/__about__.py
index b18f065828..f075947a34 100644
--- a/faststream/__about__.py
+++ b/faststream/__about__.py
@@ -1,7 +1,9 @@
"""Simple and fast framework to create message brokers based microservices."""
-__version__ = "0.4.3"
+__version__ = "0.5.0"
+SERVICE_NAME = f"faststream-{__version__}"
+
INSTALL_YAML = """
To generate YAML documentation, please install dependencies:\n
pip install PyYAML
diff --git a/faststream/_compat.py b/faststream/_compat.py
index c3274870e9..872b3325ee 100644
--- a/faststream/_compat.py
+++ b/faststream/_compat.py
@@ -74,6 +74,13 @@ def raise_fastapi_validation_error(errors: List[Any], body: AnyDict) -> Never:
JsonSchemaValue = Mapping[str, Any]
+PValidationError: Optional[Type[Exception]]
+try:
+ from pydantic import ValidationError
+ PValidationError = ValidationError
+except ImportError:
+ PValidationError = None
+
if PYDANTIC_V2:
if PYDANTIC_VERSION >= "2.4.0":
from pydantic.annotated_handlers import (
@@ -110,9 +117,6 @@ def get_model_fields(model: Type[BaseModel]) -> Dict[str, Any]:
def model_to_json(model: BaseModel, **kwargs: Any) -> str:
return model.model_dump_json(**kwargs)
- def model_to_dict(model: BaseModel, **kwargs: Any) -> AnyDict:
- return model.model_dump(**kwargs)
-
def model_parse(
model: Type[ModelVar], data: Union[str, bytes], **kwargs: Any
) -> ModelVar:
@@ -121,9 +125,6 @@ def model_parse(
def model_schema(model: Type[BaseModel], **kwargs: Any) -> AnyDict:
return model.model_json_schema(**kwargs)
- def model_copy(model: ModelVar, **kwargs: Any) -> ModelVar:
- return model.model_copy(**kwargs)
-
else:
from pydantic.json import pydantic_encoder
@@ -141,9 +142,6 @@ def get_model_fields(model: Type[BaseModel]) -> Dict[str, Any]:
def model_to_json(model: BaseModel, **kwargs: Any) -> str:
return model.json(**kwargs)
- def model_to_dict(model: BaseModel, **kwargs: Any) -> AnyDict:
- return model.dict(**kwargs)
-
def model_parse(
model: Type[ModelVar], data: Union[str, bytes], **kwargs: Any
) -> ModelVar:
@@ -158,9 +156,6 @@ def model_to_jsonable(
) -> Any:
return json_loads(model.json(**kwargs))
- def model_copy(model: ModelVar, **kwargs: Any) -> ModelVar:
- return model.copy(**kwargs)
-
# TODO: pydantic types misc
def with_info_plain_validator_function( # type: ignore[misc]
function: Callable[..., Any],
diff --git a/faststream/annotations.py b/faststream/annotations.py
index f16b3c5d1c..5532daf9f5 100644
--- a/faststream/annotations.py
+++ b/faststream/annotations.py
@@ -3,12 +3,14 @@
from typing_extensions import Annotated
-from faststream.utils.context import Context as ContextField
+from faststream.app import FastStream as FS
+from faststream.utils.context import Context
from faststream.utils.context import ContextRepo as CR
from faststream.utils.no_cast import NoCast as NC
_NoCastType = TypeVar("_NoCastType")
-Logger = Annotated[logging.Logger, ContextField("logger")]
-ContextRepo = Annotated[CR, ContextField("context")]
+Logger = Annotated[logging.Logger, Context("logger")]
+ContextRepo = Annotated[CR, Context("context")]
NoCast = Annotated[_NoCastType, NC()]
+FastStream = Annotated[FS, Context("app")]
diff --git a/faststream/app.py b/faststream/app.py
index 69cbd97b84..76b4a5cfad 100644
--- a/faststream/app.py
+++ b/faststream/app.py
@@ -1,26 +1,25 @@
import logging
-from abc import ABC
-from typing import Any, Callable, Dict, List, Optional, Sequence, TypeVar, Union
+import logging.config
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Callable,
+ Dict,
+ List,
+ Optional,
+ Sequence,
+ TypeVar,
+ Union,
+)
import anyio
-from pydantic import AnyHttpUrl
from typing_extensions import ParamSpec
-from faststream._compat import ExceptionGroup
-from faststream.asyncapi.schema import (
- Contact,
- ContactDict,
- ExternalDocs,
- ExternalDocsDict,
- License,
- LicenseDict,
- Tag,
- TagDict,
-)
-from faststream.broker.core.asynchronous import BrokerAsyncUsecase
-from faststream.cli.supervisors.utils import HANDLED_SIGNALS, set_exit
-from faststream.log import logger
-from faststream.types import AnyCallable, AnyDict, AsyncFunc, Lifespan, SettingField
+from faststream._compat import PValidationError
+from faststream.cli.supervisors.utils import set_exit
+from faststream.exceptions import ValidationError
+from faststream.log.logging import logger
+from faststream.types import AnyDict, AsyncFunc, Lifespan, LoggerProtocol, SettingField
from faststream.utils import apply_types, context
from faststream.utils.functions import drop_response_type, fake_context, to_async
@@ -28,149 +27,22 @@
T_HookReturn = TypeVar("T_HookReturn")
-class ABCApp(ABC):
- """A class representing an ABC App.
-
- Attributes:
- _on_startup_calling : List of callable functions to be called on startup
- _after_startup_calling : List of callable functions to be called after startup
- _on_shutdown_calling : List of callable functions to be called on shutdown
- _after_shutdown_calling : List of callable functions to be called after shutdown
- broker : Optional broker object
- logger : Optional logger object
- title : Title of the app
- version : Version of the app
- description : Description of the app
- terms_of_service : Optional terms of service URL
- license : Optional license information
- contact : Optional contact information
- identifier : Optional identifier
- asyncapi_tags : Optional list of tags
- external_docs : Optional external documentation
-
- Methods:
- set_broker : Set the broker object
- on_startup : Add a hook to be run before the broker is connected
- on_shutdown : Add a hook to be run before the broker is disconnected
- after_startup : Add a hook to be run after the broker is connected
- after_shutdown : Add a hook to be run after the broker is disconnected
- _log : Log a message at a specified
- """
-
- _on_startup_calling: List[AnyCallable]
- _after_startup_calling: List[AnyCallable]
- _on_shutdown_calling: List[AnyCallable]
- _after_shutdown_calling: List[AnyCallable]
-
- def __init__(
- self,
- broker: Optional[BrokerAsyncUsecase[Any, Any]] = None,
- logger: Optional[logging.Logger] = logger,
- # AsyncAPI information
- title: str = "FastStream",
- version: str = "0.1.0",
- description: str = "",
- terms_of_service: Optional[AnyHttpUrl] = None,
- license: Optional[Union[License, LicenseDict, AnyDict]] = None,
- contact: Optional[Union[Contact, ContactDict, AnyDict]] = None,
- identifier: Optional[str] = None,
- tags: Optional[Sequence[Union[Tag, TagDict, AnyDict]]] = None,
- external_docs: Optional[Union[ExternalDocs, ExternalDocsDict, AnyDict]] = None,
- ) -> None:
- """Initialize an instance of the class.
-
- Args:
- broker: An optional instance of the BrokerAsyncUsecase class.
- logger: An optional instance of the logging.Logger class.
- title: A string representing the title of the AsyncAPI.
- version: A string representing the version of the AsyncAPI.
- description: A string representing the description of the AsyncAPI.
- terms_of_service: An optional URL representing the terms of service of the AsyncAPI.
- license: An optional instance of the License class.
- contact: An optional instance of the Contact class.
- identifier: An optional string representing the identifier of the AsyncAPI.
- tags: An optional sequence of Tag instances.
- external_docs: An optional instance of the ExternalDocs class.
- """
- self.broker = broker
- self.logger = logger
- self.context = context
- context.set_global("app", self)
-
- self._on_startup_calling = []
- self._after_startup_calling = []
- self._on_shutdown_calling = []
- self._after_shutdown_calling = []
-
- # AsyncAPI information
- self.title = title
- self.version = version
- self.description = description
- self.terms_of_service = terms_of_service
- self.license = license
- self.contact = contact
- self.identifier = identifier
- self.asyncapi_tags = tags
- self.external_docs = external_docs
-
- def set_broker(self, broker: BrokerAsyncUsecase[Any, Any]) -> None:
- """Set already existed App object broker.
-
- Useful then you create/init broker in `on_startup` hook.
- """
- self.broker = broker
-
- def on_startup(
- self,
- func: Callable[P_HookParams, T_HookReturn],
- ) -> Callable[P_HookParams, T_HookReturn]:
- """Add hook running BEFORE broker connected.
-
- This hook also takes an extra CLI options as a kwargs.
- """
- self._on_startup_calling.append(apply_types(func))
- return func
+if TYPE_CHECKING:
+ from faststream.asyncapi.schema import (
+ Contact,
+ ContactDict,
+ ExternalDocs,
+ ExternalDocsDict,
+ License,
+ LicenseDict,
+ Tag,
+ TagDict,
+ )
+ from faststream.broker.core.broker import BrokerUsecase
+ from faststream.types import AnyHttpUrl
- def on_shutdown(
- self,
- func: Callable[P_HookParams, T_HookReturn],
- ) -> Callable[P_HookParams, T_HookReturn]:
- """Add hook running BEFORE broker disconnected."""
- self._on_shutdown_calling.append(apply_types(func))
- return func
- def after_startup(
- self,
- func: Callable[P_HookParams, T_HookReturn],
- ) -> Callable[P_HookParams, T_HookReturn]:
- """Add hook running AFTER broker connected."""
- self._after_startup_calling.append(apply_types(func))
- return func
-
- def after_shutdown(
- self,
- func: Callable[P_HookParams, T_HookReturn],
- ) -> Callable[P_HookParams, T_HookReturn]:
- """Add hook running AFTER broker disconnected."""
- self._after_shutdown_calling.append(apply_types(func))
- return func
-
- def _log(self, level: int, message: str) -> None:
- """Logs a message with the specified log level.
-
- Args:
- level (int): The log level.
- message (str): The message to be logged.
-
- Returns:
- None
-
- """
- if self.logger is not None:
- self.logger.log(level, message)
-
-
-class FastStream(ABCApp):
+class FastStream:
"""A class representing a FastStream application.
Attributes:
@@ -200,19 +72,21 @@ class FastStream(ABCApp):
def __init__(
self,
- broker: Optional[BrokerAsyncUsecase[Any, Any]] = None,
- logger: Optional[logging.Logger] = logger,
+ broker: Optional["BrokerUsecase[Any, Any]"] = None,
+ logger: Optional[LoggerProtocol] = logger,
lifespan: Optional[Lifespan] = None,
# AsyncAPI args,
title: str = "FastStream",
version: str = "0.1.0",
description: str = "",
- terms_of_service: Optional[AnyHttpUrl] = None,
- license: Optional[Union[License, LicenseDict, AnyDict]] = None,
- contact: Optional[Union[Contact, ContactDict, AnyDict]] = None,
+ terms_of_service: Optional["AnyHttpUrl"] = None,
+ license: Optional[Union["License", "LicenseDict", AnyDict]] = None,
+ contact: Optional[Union["Contact", "ContactDict", AnyDict]] = None,
identifier: Optional[str] = None,
- tags: Optional[Sequence[Union[Tag, TagDict, AnyDict]]] = None,
- external_docs: Optional[Union[ExternalDocs, ExternalDocsDict, AnyDict]] = None,
+ tags: Optional[Sequence[Union["Tag", "TagDict", AnyDict]]] = None,
+ external_docs: Optional[
+ Union["ExternalDocs", "ExternalDocsDict", AnyDict]
+ ] = None,
) -> None:
"""Asynchronous FastStream Application class.
@@ -232,19 +106,16 @@ def __init__(
tags: application tags - for AsyncAPI docs
external_docs: application external docs - for AsyncAPI docs
"""
- super().__init__(
- broker=broker,
- logger=logger,
- title=title,
- version=version,
- description=description,
- terms_of_service=terms_of_service,
- license=license,
- contact=contact,
- identifier=identifier,
- tags=tags,
- external_docs=external_docs,
- )
+ context.set_global("app", self)
+
+ self.broker = broker
+ self.logger = logger
+ self.context = context
+
+ self._on_startup_calling = []
+ self._after_startup_calling = []
+ self._on_shutdown_calling = []
+ self._after_shutdown_calling = []
self.lifespan_context = (
apply_types(
@@ -255,6 +126,26 @@ def __init__(
else fake_context
)
+ self.should_exit = False
+
+ # AsyncAPI information
+ self.title = title
+ self.version = version
+ self.description = description
+ self.terms_of_service = terms_of_service
+ self.license = license
+ self.contact = contact
+ self.identifier = identifier
+ self.asyncapi_tags = tags
+ self.external_docs = external_docs
+
+ def set_broker(self, broker: "BrokerUsecase[Any, Any]") -> None:
+ """Set already existed App object broker.
+
+ Useful then you create/init broker in `on_startup` hook.
+ """
+ self.broker = broker
+
def on_startup(
self,
func: Callable[P_HookParams, T_HookReturn],
@@ -269,7 +160,7 @@ def on_startup(
Returns:
Async version of the func argument
"""
- super().on_startup(to_async(func))
+ self._on_startup_calling.append(apply_types(to_async(func)))
return func
def on_shutdown(
@@ -284,7 +175,7 @@ def on_shutdown(
Returns:
Async version of the func argument
"""
- super().on_shutdown(to_async(func))
+ self._on_shutdown_calling.append(apply_types(to_async(func)))
return func
def after_startup(
@@ -299,7 +190,7 @@ def after_startup(
Returns:
Async version of the func argument
"""
- super().after_startup(to_async(func))
+ self._after_startup_calling.append(apply_types(to_async(func)))
return func
def after_shutdown(
@@ -314,13 +205,14 @@ def after_shutdown(
Returns:
Async version of the func argument
"""
- super().after_shutdown(to_async(func))
+ self._after_shutdown_calling.append(apply_types(to_async(func)))
return func
async def run(
self,
log_level: int = logging.INFO,
run_extra_options: Optional[Dict[str, SettingField]] = None,
+ sleep_time: float = 0.1,
) -> None:
"""Run FastStream Application.
@@ -333,17 +225,68 @@ async def run(
"""
assert self.broker, "You should setup a broker" # nosec B101
+ set_exit(lambda *_: self.exit(), sync=False)
+
async with self.lifespan_context(**(run_extra_options or {})):
- try:
- async with anyio.create_task_group() as tg:
- tg.start_soon(self._start, log_level, run_extra_options)
- await self._stop(log_level)
- tg.cancel_scope.cancel()
- except ExceptionGroup as e:
- for ex in e.exceptions:
- raise ex from None
-
- async def _start(
+ await self._startup(log_level, run_extra_options)
+
+ while not self.should_exit:
+ await anyio.sleep(sleep_time)
+
+ await self._shutdown(log_level)
+
+ def exit(self) -> None:
+ """Stop application manually."""
+ self.should_exit = True
+
+ async def start(
+ self,
+ **run_extra_options: SettingField,
+ ) -> None:
+ """Executes startup tasks.
+
+ Args:
+ run_extra_options: Additional options to be passed to the startup tasks.
+
+ Returns:
+ None
+ """
+ for func in self._on_startup_calling:
+ call = func(**run_extra_options)
+
+ if PValidationError is not None:
+ try:
+ await call
+ except PValidationError as e:
+ raise ValidationError(
+ fields=[x["loc"][0] for x in e.errors()]
+ )
+
+ else:
+ await call
+
+ if self.broker is not None:
+ await self.broker.start()
+
+ for func in self._after_startup_calling:
+ await func()
+
+ async def stop(self) -> None:
+ """Executes shutdown tasks.
+
+ Returns:
+ None
+ """
+ for func in self._on_shutdown_calling:
+ await func()
+
+ if self.broker is not None:
+ await self.broker.close()
+
+ for func in self._after_shutdown_calling:
+ await func()
+
+ async def _startup(
self,
log_level: int = logging.INFO,
run_extra_options: Optional[Dict[str, SettingField]] = None,
@@ -358,12 +301,12 @@ async def _start(
None
"""
self._log(log_level, "FastStream app starting...")
- await self._startup(**(run_extra_options or {}))
+ await self.start(**(run_extra_options or {}))
self._log(
log_level, "FastStream app started successfully! To exit, press CTRL+C"
)
- async def _stop(self, log_level: int = logging.INFO) -> None:
+ async def _shutdown(self, log_level: int = logging.INFO) -> None:
"""Stop the application gracefully.
Args:
@@ -372,45 +315,19 @@ async def _stop(self, log_level: int = logging.INFO) -> None:
Returns:
None
"""
- try:
- with anyio.open_signal_receiver(*HANDLED_SIGNALS) as signals:
- async for _ in signals:
- break
- except NotImplementedError:
- # Windows
- event = anyio.Event()
- set_exit(lambda *_: event.set())
- await event.wait()
-
self._log(log_level, "FastStream app shutting down...")
- await self._shutdown()
+ await self.stop()
self._log(log_level, "FastStream app shut down gracefully.")
- return
- async def _startup(self, **run_extra_options: SettingField) -> None:
- """Executes startup tasks.
+ def _log(self, level: int, message: str) -> None:
+ """Logs a message with the specified log level.
Args:
- run_extra_options: Additional options to be passed to the startup tasks.
+ level (int): The log level.
+ message (str): The message to be logged.
Returns:
None
"""
- for func in self._on_startup_calling:
- await func(**run_extra_options)
-
- if self.broker is not None:
- await self.broker.start()
-
- for func in self._after_startup_calling:
- await func()
-
- async def _shutdown(self) -> None:
- for func in self._on_shutdown_calling:
- await func()
-
- if self.broker is not None:
- await self.broker.close()
-
- for func in self._after_shutdown_calling:
- await func()
+ if self.logger is not None:
+ self.logger.log(level, message)
diff --git a/faststream/asyncapi/__init__.py b/faststream/asyncapi/__init__.py
index adee11dca5..07827b8101 100644
--- a/faststream/asyncapi/__init__.py
+++ b/faststream/asyncapi/__init__.py
@@ -1,5 +1,9 @@
"""AsyncAPI related functions."""
+from faststream.asyncapi.generate import get_app_schema
from faststream.asyncapi.site import get_asyncapi_html
-__all__ = ("get_asyncapi_html",)
+__all__ = (
+ "get_asyncapi_html",
+ "get_app_schema",
+)
diff --git a/faststream/asyncapi/base.py b/faststream/asyncapi/base.py
index 40a2ee129b..8bdde5e9fa 100644
--- a/faststream/asyncapi/base.py
+++ b/faststream/asyncapi/base.py
@@ -1,29 +1,54 @@
-from abc import abstractproperty
-from dataclasses import dataclass, field
-from typing import Dict
-
-from faststream.asyncapi.schema.channels import Channel
+from abc import abstractmethod
+from typing import Dict, Optional, Protocol
+from typing_extensions import Annotated, Doc
-@dataclass
-class AsyncAPIOperation:
- """A class representing an asynchronous API operation.
-
- Attributes:
- name : name of the API operation
+from faststream.asyncapi.schema.channels import Channel
- Methods:
- schema() : returns the schema of the API operation as a dictionary of channel names and channel objects
- """
+class AsyncAPIOperation(Protocol):
+ """A class representing an asynchronous API operation."""
- include_in_schema: bool = field(default=True)
+ title_: Annotated[
+ Optional[str],
+ Doc("AsyncAPI object title."),
+ ]
+ description_: Annotated[
+ Optional[str],
+ Doc("AsyncAPI object description."),
+ ]
+ include_in_schema: Annotated[
+ bool,
+ Doc("Whetever to include operation in AsyncAPI schema or not."),
+ ]
- @abstractproperty
+ @property
def name(self) -> str:
"""Returns the name of the API operation."""
+ return self.title_ or self.get_name()
+
+ @abstractmethod
+ def get_name(self) -> str:
+ """Name property fallback."""
raise NotImplementedError()
- def schema(self) -> Dict[str, Channel]: # pragma: no cover
+ @property
+ def description(self) -> Optional[str]:
+ """Returns the description of the API operation."""
+ return self.description_ or self.get_description()
+
+ def get_description(self) -> Optional[str]:
+ """Description property fallback."""
+ return None
+
+ def schema(self) -> Dict[str, Channel]:
"""Returns the schema of the API operation as a dictionary of channel names and channel objects."""
- return {}
+ if self.include_in_schema:
+ return self.get_schema()
+ else:
+ return {}
+
+ @abstractmethod
+ def get_schema(self) -> Dict[str, Channel]:
+ """Generate AsyncAPI schema."""
+ raise NotImplementedError()
diff --git a/faststream/asyncapi/generate.py b/faststream/asyncapi/generate.py
index dc4da6cc0a..1b827af953 100644
--- a/faststream/asyncapi/generate.py
+++ b/faststream/asyncapi/generate.py
@@ -159,7 +159,6 @@ def get_app_broker_channels(
Raises:
AssertionError: If the app does not have a broker.
-
"""
channels = {}
assert app.broker # nosec B101
diff --git a/faststream/asyncapi/site.py b/faststream/asyncapi/site.py
index 2286d2c26b..d1526699aa 100644
--- a/faststream/asyncapi/site.py
+++ b/faststream/asyncapi/site.py
@@ -1,4 +1,8 @@
-from typing import TYPE_CHECKING
+from functools import partial
+from http import server
+from logging import Logger
+from typing import TYPE_CHECKING, Any, Dict, Optional
+from urllib.parse import parse_qs, urlparse
from faststream._compat import json_dumps
@@ -17,6 +21,8 @@ def get_asyncapi_html(
errors: bool = True,
expand_message_examples: bool = True,
title: str = "FastStream",
+ asyncapi_js_url: str = "https://unpkg.com/@asyncapi/react-component@1.0.0-next.47/browser/standalone/index.js",
+ asyncapi_css_url: str = "https://unpkg.com/@asyncapi/react-component@1.0.0-next.46/styles/default.min.css",
) -> str:
"""Generate HTML for displaying an AsyncAPI document.
@@ -77,7 +83,11 @@ def get_asyncapi_html(
-
+ """
+ f"""
+
+ """
+ """