Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: support only uvicorn ASGI Runner #1965

Merged
merged 3 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ search:
- [make_ping_asgi](api/faststream/asgi/make_ping_asgi.md)
- app
- [AsgiFastStream](api/faststream/asgi/app/AsgiFastStream.md)
- [cast_uvicorn_params](api/faststream/asgi/app/cast_uvicorn_params.md)
- factories
- [make_asyncapi_asgi](api/faststream/asgi/factories/make_asyncapi_asgi.md)
- [make_ping_asgi](api/faststream/asgi/factories/make_ping_asgi.md)
Expand Down Expand Up @@ -448,6 +449,8 @@ search:
- [run](api/faststream/cli/main/run.md)
- [version_callback](api/faststream/cli/main/version_callback.md)
- supervisors
- asgi_multiprocess
- [ASGIMultiprocess](api/faststream/cli/supervisors/asgi_multiprocess/ASGIMultiprocess.md)
- basereload
- [BaseReload](api/faststream/cli/supervisors/basereload/BaseReload.md)
- multiprocess
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/asgi/app/cast_uvicorn_params.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.asgi.app.cast_uvicorn_params
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.cli.supervisors.asgi_multiprocess.ASGIMultiprocess
68 changes: 27 additions & 41 deletions faststream/asgi/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import inspect
import logging
import traceback
from contextlib import asynccontextmanager
Expand All @@ -6,7 +7,6 @@
Any,
AsyncIterator,
Dict,
List,
Optional,
Sequence,
Tuple,
Expand Down Expand Up @@ -44,6 +44,14 @@
)


def cast_uvicorn_params(params: Dict[str, Any]) -> Dict[str, Any]:
if port := params.get("port"):
params["port"] = int(port)
if fd := params.get("fd"):
params["fd"] = int(fd)
return params


class AsgiFastStream(Application):
def __init__(
self,
Expand Down Expand Up @@ -148,50 +156,28 @@ async def run(
sleep_time: float = 0.1,
) -> None:
try:
import uvicorn # noqa: F401
from gunicorn.app.base import BaseApplication
import uvicorn
except ImportError as e:
raise RuntimeError(
"You need uvicorn and gunicorn to run FastStream ASGI App via CLI. pip install uvicorn gunicorn"
"You need uvicorn to run FastStream ASGI App via CLI. pip install uvicorn"
) from e

class ASGIRunner(BaseApplication): # type: ignore[misc]
def __init__(self, options: Dict[str, Any], asgi_app: "ASGIApp") -> None:
self.options = options
self.asgi_app = asgi_app
super().__init__()

def load_config(self) -> None:
for k, v in self.options.items():
if k in self.cfg.settings and v is not None:
self.cfg.set(k.lower(), v)

def load(self) -> "ASGIApp":
return self.asgi_app

run_extra_options = run_extra_options or {}

bindings: List[str] = []
host = run_extra_options.pop("host", None)
port = run_extra_options.pop("port", None)
if host is not None and port is not None:
bindings.append(f"{host}:{port}")
elif host is not None:
bindings.append(f"{host}:8000")
elif port is not None:
bindings.append(f"127.0.0.1:{port}")

bind = run_extra_options.get("bind")
if isinstance(bind, list):
bindings.extend(bind) # type: ignore
elif isinstance(bind, str):
bindings.append(bind)

run_extra_options["bind"] = bindings or "127.0.0.1:8000"
# We use gunicorn with uvicorn workers because uvicorn don't support multiple workers
run_extra_options["worker_class"] = "uvicorn.workers.UvicornWorker"

ASGIRunner(run_extra_options, self).run()
run_extra_options = cast_uvicorn_params(run_extra_options or {})

uvicorn_config_params = set(inspect.signature(uvicorn.Config).parameters.keys())

config = uvicorn.Config(
app=self,
log_level=log_level,
**{
key: v
for key, v in run_extra_options.items()
if key in uvicorn_config_params
},
)

server = uvicorn.Server(config)
await server.serve()

@asynccontextmanager
async def start_lifespan_context(self) -> AsyncIterator[None]:
Expand Down
18 changes: 14 additions & 4 deletions faststream/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from faststream import FastStream
from faststream.__about__ import __version__
from faststream._internal.application import Application
from faststream.asgi.app import AsgiFastStream
from faststream.cli.docs.app import docs_app
from faststream.cli.utils.imports import import_from_string
from faststream.cli.utils.logs import LogLevels, get_log_level, set_log_level
Expand Down Expand Up @@ -146,17 +147,26 @@ def run(
).run()

elif workers > 1:
from faststream.cli.supervisors.multiprocess import Multiprocess

if isinstance(app_obj, FastStream):
from faststream.cli.supervisors.multiprocess import Multiprocess

Multiprocess(
target=_run,
args=(*args, logging.DEBUG),
workers=workers,
).run()
elif isinstance(app_obj, AsgiFastStream):
from faststream.cli.supervisors.asgi_multiprocess import ASGIMultiprocess

ASGIMultiprocess(
target=app,
args=args, # type: ignore[arg-type]
workers=workers,
).run()
else:
args[1]["workers"] = workers # type: ignore[assignment]
_run(*args)
raise typer.BadParameter(
f"Unexpected app type, expected FastStream or AsgiFastStream, got: {type(app_obj)}."
)

else:
_run_imported_app(
Expand Down
38 changes: 38 additions & 0 deletions faststream/cli/supervisors/asgi_multiprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import inspect
from typing import Dict, Tuple

from faststream.asgi.app import cast_uvicorn_params


class ASGIMultiprocess:
def __init__(
self, target: str, args: Tuple[str, Dict[str, str], bool, int], workers: int
) -> None:
_, uvicorn_kwargs, is_factory, log_level = args
self._target = target
self._uvicorn_kwargs = cast_uvicorn_params(uvicorn_kwargs or {})
self._workers = workers
self._is_factory = is_factory
self._log_level = log_level

def run(self) -> None:
try:
import uvicorn
except ImportError as e:
raise RuntimeError(
"You need uvicorn to run FastStream ASGI App via CLI. pip install uvicorn"
) from e

uvicorn_params = set(inspect.signature(uvicorn.run).parameters.keys())

uvicorn.run(
self._target,
factory=self._is_factory,
workers=self._workers,
log_level=self._log_level,
**{
key: v
for key, v in self._uvicorn_kwargs.items()
if key in uvicorn_params
},
)
88 changes: 83 additions & 5 deletions tests/cli/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from faststream.app import FastStream
from faststream.asgi import AsgiFastStream
from faststream.cli.main import cli as faststream_app
from faststream.cli.utils.logs import get_log_level


@pytest.mark.parametrize(
Expand Down Expand Up @@ -36,13 +37,42 @@ def test_run(runner: CliRunner, app: Application):
assert result.exit_code == 0


@pytest.mark.parametrize("workers", [1, 2, 5])
@pytest.mark.parametrize("app", [pytest.param(AsgiFastStream())])
def test_run_as_asgi_with_workers(runner: CliRunner, workers: int, app: Application):
def test_run_as_asgi_with_single_worker(runner: CliRunner, app: Application):
app.run = AsyncMock()

with patch(
"faststream.cli.utils.imports._import_obj_or_factory", return_value=(None, app)
):
result = runner.invoke(
faststream_app,
[
"run",
"faststream:app",
"--host",
"0.0.0.0",
"--port",
"8000",
"--workers",
"1",
],
)
app.run.assert_awaited_once_with(
logging.INFO, {"host": "0.0.0.0", "port": "8000"}
)
assert result.exit_code == 0


@pytest.mark.parametrize("workers", [3, 5, 7])
@pytest.mark.parametrize("app", [pytest.param(AsgiFastStream())])
def test_run_as_asgi_with_many_workers(
runner: CliRunner, workers: int, app: Application
):
asgi_multiprocess = "faststream.cli.supervisors.asgi_multiprocess.ASGIMultiprocess"
_import_obj_or_factory = "faststream.cli.utils.imports._import_obj_or_factory"

with patch(asgi_multiprocess) as asgi_runner, patch(
_import_obj_or_factory, return_value=(None, app)
):
result = runner.invoke(
faststream_app,
Expand All @@ -57,13 +87,61 @@ def test_run_as_asgi_with_workers(runner: CliRunner, workers: int, app: Applicat
str(workers),
],
)
extra = {"workers": workers} if workers > 1 else {}
assert result.exit_code == 0

app.run.assert_awaited_once_with(
logging.INFO, {"host": "0.0.0.0", "port": "8000", **extra}
asgi_runner.assert_called_once()
asgi_runner.assert_called_once_with(
target="faststream:app",
args=("faststream:app", {"host": "0.0.0.0", "port": "8000"}, False, 0),
workers=workers,
)
asgi_runner().run.assert_called_once()


@pytest.mark.parametrize(
"log_level",
["critical", "fatal", "error", "warning", "warn", "info", "debug", "notset"],
)
@pytest.mark.parametrize("app", [pytest.param(AsgiFastStream())])
def test_run_as_asgi_mp_with_log_level(
runner: CliRunner, app: Application, log_level: str
):
asgi_multiprocess = "faststream.cli.supervisors.asgi_multiprocess.ASGIMultiprocess"
_import_obj_or_factory = "faststream.cli.utils.imports._import_obj_or_factory"

with patch(asgi_multiprocess) as asgi_runner, patch(
_import_obj_or_factory, return_value=(None, app)
):
result = runner.invoke(
faststream_app,
[
"run",
"faststream:app",
"--host",
"0.0.0.0",
"--port",
"8000",
"--workers",
"3",
"--log-level",
log_level,
],
)
assert result.exit_code == 0

asgi_runner.assert_called_once()
asgi_runner.assert_called_once_with(
target="faststream:app",
args=(
"faststream:app",
{"host": "0.0.0.0", "port": "8000"},
False,
get_log_level(log_level),
),
workers=3,
)
asgi_runner().run.assert_called_once()


@pytest.mark.parametrize(
"app", [pytest.param(FastStream()), pytest.param(AsgiFastStream())]
Expand Down
Loading