Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add CLI support for AsgiFastStream #1782

Merged
merged 4 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions docs/docs/en/getting-started/asgi.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,37 @@ app = AsgiFastStream(

Now, your **AsyncAPI HTML** representation can be found by the `/docs` url.

### FastStream object reusage

You may also use regular `FastStream` application object for similar result

```python linenums="1" hl_lines="2 9"
from faststream import FastStream
from faststream.nats import NatsBroker
from faststream.asgi import make_ping_asgi, AsgiResponse

broker = NatsBroker()

async def liveness_ping(scope, receive, send):
return AsgiResponse(b"", status_code=200)


app = FastStream(broker).as_asgi(
asgi_routes=[
("/liveness", liveness_ping),
("/readiness", make_ping_asgi(broker, timeout=5.0)),
],
asyncapi_path="/docs",
)
```

``` tip
For app which use ASGI you may use cli command like for default FastStream app

```shell
faststream run main:app --host 0.0.0.0 --port 8000 --workers 4
```

## Other ASGI Compatibility

Moreover, our wrappers can be used as ready-to-use endpoins for other **ASGI** frameworks. This can be very helpful When you are running **FastStream** in the same runtime as any other **ASGI** frameworks.
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/en/getting-started/middlewares/exception.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Sometimes, you need to register exception processors at the top level of your ap
For this purpose, **FastStream** provides a special `ExceptionMiddleware`. You just need to create it, register handlers, and add it to the broker, router, or subscribers you want (as a [regular middleware](index.md){.internal-link}).

```python linenums="1"
from faststream. import ExceptionMiddleware
from faststream import ExceptionMiddleware

exception_middleware = ExceptionMiddleware()

Expand Down
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Simple and fast framework to create message brokers based microservices."""

__version__ = "0.5.23"
__version__ = "0.5.24"

SERVICE_NAME = f"faststream-{__version__}"
Empty file.
207 changes: 207 additions & 0 deletions faststream/_internal/application.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import logging
from abc import ABC, abstractmethod
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Optional,
Sequence,
TypeVar,
Union,
)

import anyio
from typing_extensions import ParamSpec

from faststream.asyncapi.proto import AsyncAPIApplication
from faststream.log.logging import logger
from faststream.utils import apply_types, context
from faststream.utils.functions import drop_response_type, fake_context, to_async

P_HookParams = ParamSpec("P_HookParams")
T_HookReturn = TypeVar("T_HookReturn")


if TYPE_CHECKING:
from faststream.asyncapi.schema import (
Contact,
ContactDict,
ExternalDocs,
ExternalDocsDict,
License,
LicenseDict,
Tag,
TagDict,
)
from faststream.broker.core.usecase import BrokerUsecase
from faststream.types import (
AnyDict,
AnyHttpUrl,
AsyncFunc,
Lifespan,
LoggerProto,
SettingField,
)


class Application(ABC, AsyncAPIApplication):
def __init__(
self,
broker: Optional["BrokerUsecase[Any, Any]"] = None,
logger: Optional["LoggerProto"] = logger,
lifespan: Optional["Lifespan"] = None,
# AsyncAPI args,
title: str = "FastStream",
version: str = "0.1.0",
description: str = "",
terms_of_service: Optional["AnyHttpUrl"] = None,
license: Optional[Union["License", "LicenseDict", "AnyDict"]] = None,
contact: Optional[Union["Contact", "ContactDict", "AnyDict"]] = None,
tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
external_docs: Optional[
Union["ExternalDocs", "ExternalDocsDict", "AnyDict"]
] = None,
identifier: Optional[str] = None,
on_startup: Sequence[Callable[P_HookParams, T_HookReturn]] = (),
after_startup: Sequence[Callable[P_HookParams, T_HookReturn]] = (),
on_shutdown: Sequence[Callable[P_HookParams, T_HookReturn]] = (),
after_shutdown: Sequence[Callable[P_HookParams, T_HookReturn]] = (),
) -> None:
context.set_global("app", self)

self._should_exit = anyio.Event()
self.broker = broker
self.logger = logger
self.context = context

self._on_startup_calling: List[AsyncFunc] = [
apply_types(to_async(x)) for x in on_startup
]
self._after_startup_calling: List[AsyncFunc] = [
apply_types(to_async(x)) for x in after_startup
]
self._on_shutdown_calling: List[AsyncFunc] = [
apply_types(to_async(x)) for x in on_shutdown
]
self._after_shutdown_calling: List[AsyncFunc] = [
apply_types(to_async(x)) for x in after_shutdown
]

if lifespan is not None:
self.lifespan_context = apply_types(
func=lifespan, wrap_model=drop_response_type
)
else:
self.lifespan_context = fake_context

# AsyncAPI information
self.title = title
self.version = version
self.description = description
self.terms_of_service = terms_of_service
self.license = license
self.contact = contact
self.identifier = identifier
self.asyncapi_tags = tags
self.external_docs = external_docs

@abstractmethod
async def run(
self,
log_level: int,
run_extra_options: Optional[Dict[str, "SettingField"]] = None,
sleep_time: float = 0.1,
) -> None: ...

def set_broker(self, broker: "BrokerUsecase[Any, Any]") -> None:
"""Set already existed App object broker.
Useful then you create/init broker in `on_startup` hook.
"""
self.broker = broker

def on_startup(
self,
func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
"""Add hook running BEFORE broker connected.
This hook also takes an extra CLI options as a kwargs.
"""
self._on_startup_calling.append(apply_types(to_async(func)))
return func

def on_shutdown(
self,
func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
"""Add hook running BEFORE broker disconnected."""
self._on_shutdown_calling.append(apply_types(to_async(func)))
return func

def after_startup(
self,
func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
"""Add hook running AFTER broker connected."""
self._after_startup_calling.append(apply_types(to_async(func)))
return func

def after_shutdown(
self,
func: Callable[P_HookParams, T_HookReturn],
) -> Callable[P_HookParams, T_HookReturn]:
"""Add hook running AFTER broker disconnected."""
self._after_shutdown_calling.append(apply_types(to_async(func)))
return func

def exit(self) -> None:
"""Stop application manually."""
self._should_exit.set()

async def start(
self,
**run_extra_options: "SettingField",
) -> None:
"""Executes startup hooks and start broker."""
for func in self._on_startup_calling:
await func(**run_extra_options)

if self.broker is not None:
await self.broker.start()

for func in self._after_startup_calling:
await func()

async def stop(self) -> None:
"""Executes shutdown hooks and stop broker."""
for func in self._on_shutdown_calling:
await func()

if self.broker is not None:
await self.broker.close()

for func in self._after_shutdown_calling:
await func()

async def _startup(
self,
log_level: int = logging.INFO,
run_extra_options: Optional[Dict[str, "SettingField"]] = None,
) -> None:
self._log(log_level, "FastStream app starting...")
await self.start(**(run_extra_options or {}))
self._log(
log_level, "FastStream app started successfully! To exit, press CTRL+C"
)

async def _shutdown(self, log_level: int = logging.INFO) -> None:
self._log(log_level, "FastStream app shutting down...")
await self.stop()
self._log(log_level, "FastStream app shut down gracefully.")

def _log(self, level: int, message: str) -> None:
if self.logger is not None:
self.logger.log(level, message)
Loading
Loading