From 26aca1aa9a383e6048e47c1b16f82330f8b6b873 Mon Sep 17 00:00:00 2001 From: sehat1137 Date: Sun, 24 Nov 2024 19:53:00 +0300 Subject: [PATCH] fix: #1874 support workers for ASGI FastStream --- docs/docs/en/getting-started/asgi.md | 32 ++++++++++++ faststream/asgi/app.py | 76 +++++++++++++++++++++------- faststream/cli/utils/parser.py | 5 +- tests/cli/utils/test_parser.py | 18 ++++--- 4 files changed, 103 insertions(+), 28 deletions(-) diff --git a/docs/docs/en/getting-started/asgi.md b/docs/docs/en/getting-started/asgi.md index 008c56741b..90de5980df 100644 --- a/docs/docs/en/getting-started/asgi.md +++ b/docs/docs/en/getting-started/asgi.md @@ -38,6 +38,22 @@ uvicorn main:app It does nothing but launch the app itself as an **ASGI lifespan**. +!!! note + If you want to run your app using several workers, you need to use something else than `uvicorn`. + ```shell + faststream run main:app --workers 4 + ``` + ```shell + gunicorn -k uvicorn.workers.UvicornWorker main:app --workers=4 + ``` + ```shell + granian --interface asgi main:app --workers 4 + ``` + ```shell + hypercorn main:app --workers 4 + ``` + + ### ASGI Routes It doesn't look very helpful, so let's add some **HTTP** endpoints. @@ -137,6 +153,8 @@ app = FastStream(broker).as_asgi( ```shell faststream run main:app --host 0.0.0.0 --port 8000 --workers 4 ``` + This possibility builded on gunicorn + uvicorn, you need install them to run FastStream ASGI app via CLI. + We send all args directly to gunicorn, you can learn more about it [here](https://github.com/benoitc/gunicorn/blob/master/examples/example_config.py). ## Other ASGI Compatibility @@ -166,3 +184,17 @@ app = FastAPI(lifespan=start_broker) app.mount("/health", make_ping_asgi(broker, timeout=5.0)) app.mount("/asyncapi", make_asyncapi_asgi(FastStream(broker))) ``` + +!!! tip + You can also bind to unix domain or a file descriptor. FastStream will bind to “127.0.0.1:8000” by default + + ```shell + faststream run main:app --bind unix:/tmp/socket.sock + ``` + ```shell + faststream run main:app --bind fd://2 + ``` + You can use multiple binds if you want + ```shell + faststream run main:app --bind 0.0.0.0:8000 '[::]:8000' + ``` diff --git a/faststream/asgi/app.py b/faststream/asgi/app.py index a031021ad0..85d54cb25f 100644 --- a/faststream/asgi/app.py +++ b/faststream/asgi/app.py @@ -10,6 +10,7 @@ Sequence, Tuple, Union, + List, ) import anyio @@ -20,6 +21,17 @@ from faststream.asgi.websocket import WebSocketClose from faststream.log.logging import logger +try: + from gunicorn.app.base import BaseApplication +except ImportError: + BaseApplication = None + +try: + import uvicorn +except ImportError: + uvicorn = None # type: ignore + + if TYPE_CHECKING: from faststream.asgi.types import ASGIApp, Receive, Scope, Send from faststream.asyncapi.schema import ( @@ -43,6 +55,23 @@ ) +class ASGIRunner(BaseApplication): # type: ignore + def __init__(self, asgi_app: "ASGIApp", options: Dict[str, Any]): + self.options = options + self.asgi_app = asgi_app + super().__init__() + + def load_config(self) -> None: + for k, v in self.options.items(): + if k in self.cfg.settings and v is not None: + self.cfg.set(k.lower(), v) + else: + logger.warn(f"Unknown config variable: {k} with value {v}") + + def load(self) -> "ASGIApp": + return self.asgi_app + + class AsgiFastStream(Application): def __init__( self, @@ -146,25 +175,34 @@ async def run( run_extra_options: Optional[Dict[str, "SettingField"]] = None, sleep_time: float = 0.1, ) -> None: - import uvicorn - - if not run_extra_options: - run_extra_options = {} - port = int(run_extra_options.pop("port", 8000)) # type: ignore[arg-type] - workers = int(run_extra_options.pop("workers", 1)) # type: ignore[arg-type] - host = str(run_extra_options.pop("host", "localhost")) - fd = int(run_extra_options.pop("fd", -1)) # type: ignore[arg-type] - config = uvicorn.Config( - self, - host=host, - port=port, - log_level=log_level, - workers=workers, - fd=fd if fd != -1 else None, - **run_extra_options, - ) - server = uvicorn.Server(config) - await server.serve() + if not all([uvicorn, BaseApplication]): + raise RuntimeError( + "You need uvicorn and gunicorn to run FastStream ASGI App via CLI" + ) + + run_extra_options = run_extra_options or {} + + bindings: List[str] = [] + host = run_extra_options.pop("host", None) + port = run_extra_options.pop("port", None) + if host is not None and port is not None: + bindings.append(f"{host}:{port}") + elif host is not None: + bindings.append(f"{host}:8000") + elif port is not None: + bindings.append(f"127.0.0.1:{port}") + + bind = run_extra_options.get("bind") + if isinstance(bind, list): + bindings.extend(bind) # type: ignore + elif isinstance(bind, str): + bindings.append(bind) + + run_extra_options["bind"] = bindings or "127.0.0.1:8000" + # We use gunicorn with uvicorn workers because uvicorn don't support multiple workers + run_extra_options["worker_class"] = "uvicorn.workers.UvicornWorker" + + ASGIRunner(self, run_extra_options).run() @asynccontextmanager async def start_lifespan_context(self) -> AsyncIterator[None]: diff --git a/faststream/cli/utils/parser.py b/faststream/cli/utils/parser.py index 00c904d774..69a0674e12 100644 --- a/faststream/cli/utils/parser.py +++ b/faststream/cli/utils/parser.py @@ -1,9 +1,12 @@ +import re from functools import reduce from typing import TYPE_CHECKING, Dict, List, Tuple if TYPE_CHECKING: from faststream.types import SettingField +APP_REGEX = re.compile(r"[a-zA-Z]+:[a-zA-Z]+") + def parse_cli_args(*args: str) -> Tuple[str, Dict[str, "SettingField"]]: """Parses command line arguments.""" @@ -22,7 +25,7 @@ def parse_cli_args(*args: str) -> Tuple[str, Dict[str, "SettingField"]]: ), "-", ]: - if ":" in item: + if re.match(APP_REGEX, item): app = item else: diff --git a/tests/cli/utils/test_parser.py b/tests/cli/utils/test_parser.py index 91ace3770e..11c935468a 100644 --- a/tests/cli/utils/test_parser.py +++ b/tests/cli/utils/test_parser.py @@ -23,19 +23,20 @@ ) ARG6 = ("--some-key",) ARG7 = ("--k7", "1", "2", "--k7", "3") +ARG8 = ("--bind", "[::]:8000", "0.0.0.0:8000", "fd://2") @pytest.mark.parametrize( "args", ( # noqa: PT007 - (APPLICATION, *ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7), - (*ARG1, APPLICATION, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7), - (*ARG1, *ARG2, APPLICATION, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7), - (*ARG1, *ARG2, *ARG3, APPLICATION, *ARG4, *ARG5, *ARG6, *ARG7), - (*ARG1, *ARG2, *ARG3, *ARG4, APPLICATION, *ARG5, *ARG6, *ARG7), - (*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, APPLICATION, *ARG6, *ARG7), - (*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, APPLICATION, *ARG7), - (*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, APPLICATION), + (APPLICATION, *ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8), + (*ARG1, APPLICATION, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8), + (*ARG1, *ARG2, APPLICATION, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8), + (*ARG1, *ARG2, *ARG3, APPLICATION, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8), + (*ARG1, *ARG2, *ARG3, *ARG4, APPLICATION, *ARG5, *ARG6, *ARG7, *ARG8), + (*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, APPLICATION, *ARG6, *ARG7, *ARG8), + (*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, APPLICATION, *ARG7, *ARG8), + (*ARG1, *ARG2, *ARG3, *ARG4, *ARG5, *ARG6, *ARG7, *ARG8, APPLICATION), ), ) def test_custom_argument_parsing(args: Tuple[str]): @@ -49,4 +50,5 @@ def test_custom_argument_parsing(args: Tuple[str]): "k5": ["1", "1"], "some_key": True, "k7": ["1", "2", "3"], + "bind": ["[::]:8000", "0.0.0.0:8000", "fd://2"], }