Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
sehat1137 committed Nov 30, 2024
1 parent 48b7304 commit b3086f5
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 45 deletions.
69 changes: 28 additions & 41 deletions faststream/asgi/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import inspect
import logging
import traceback
from contextlib import asynccontextmanager
Expand All @@ -6,7 +7,6 @@
Any,
AsyncIterator,
Dict,
List,
Optional,
Sequence,
Tuple,
Expand Down Expand Up @@ -44,6 +44,15 @@
)


def cast_uvicorn_params(params: Dict[str, Any]) -> Dict[str, Any]:
if port := params.get("port"):
params["port"] = int(port)
if fd := params.get("fd"):
params["fd"] = int(fd)
return params



class AsgiFastStream(Application):
def __init__(
self,
Expand Down Expand Up @@ -148,50 +157,28 @@ async def run(
sleep_time: float = 0.1,
) -> None:
try:
import uvicorn # noqa: F401
from gunicorn.app.base import BaseApplication
import uvicorn
except ImportError as e:
raise RuntimeError(
"You need uvicorn and gunicorn to run FastStream ASGI App via CLI. pip install uvicorn gunicorn"
"You need uvicorn to run FastStream ASGI App via CLI. pip install uvicorn"
) from e

class ASGIRunner(BaseApplication): # type: ignore[misc]
def __init__(self, options: Dict[str, Any], asgi_app: "ASGIApp") -> None:
self.options = options
self.asgi_app = asgi_app
super().__init__()

def load_config(self) -> None:
for k, v in self.options.items():
if k in self.cfg.settings and v is not None:
self.cfg.set(k.lower(), v)

def load(self) -> "ASGIApp":
return self.asgi_app

run_extra_options = run_extra_options or {}

bindings: List[str] = []
host = run_extra_options.pop("host", None)
port = run_extra_options.pop("port", None)
if host is not None and port is not None:
bindings.append(f"{host}:{port}")
elif host is not None:
bindings.append(f"{host}:8000")
elif port is not None:
bindings.append(f"127.0.0.1:{port}")

bind = run_extra_options.get("bind")
if isinstance(bind, list):
bindings.extend(bind) # type: ignore
elif isinstance(bind, str):
bindings.append(bind)

run_extra_options["bind"] = bindings or "127.0.0.1:8000"
# We use gunicorn with uvicorn workers because uvicorn don't support multiple workers
run_extra_options["worker_class"] = "uvicorn.workers.UvicornWorker"

ASGIRunner(run_extra_options, self).run()
run_extra_options = cast_uvicorn_params(run_extra_options or {})

uvicorn_config_params = set(inspect.signature(uvicorn.Config).parameters.keys())

config = uvicorn.Config(
app=self,
log_level=log_level,
**{
key: v
for key, v in run_extra_options.items()
if key in uvicorn_config_params
}, # type: ignore[arg-type]
)

server = uvicorn.Server(config)
await server.serve()

@asynccontextmanager
async def start_lifespan_context(self) -> AsyncIterator[None]:
Expand Down
16 changes: 12 additions & 4 deletions faststream/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from faststream import FastStream
from faststream.__about__ import __version__
from faststream._internal.application import Application
from faststream.asgi.app import AsgiFastStream
from faststream.cli.docs.app import docs_app
from faststream.cli.utils.imports import import_from_string
from faststream.cli.utils.logs import LogLevels, get_log_level, set_log_level
Expand Down Expand Up @@ -146,17 +147,24 @@ def run(
).run()

elif workers > 1:
from faststream.cli.supervisors.multiprocess import Multiprocess

if isinstance(app_obj, FastStream):
from faststream.cli.supervisors.multiprocess import Multiprocess
Multiprocess(
target=_run,
args=(*args, logging.DEBUG),
workers=workers,
).run()
elif isinstance(app_obj, AsgiFastStream):
from faststream.cli.supervisors.asgi_multiprocess import ASGIMultiprocess
ASGIMultiprocess(
target=app,
args=args,
workers=workers,
).run()
else:
args[1]["workers"] = workers
_run(*args)
raise typer.BadParameter(
f"Unexpected app type, expected FastStream or AsgiFastStream, got: {type(app_obj)}."
)

else:
_run_imported_app(
Expand Down
32 changes: 32 additions & 0 deletions faststream/cli/supervisors/asgi_multiprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import inspect
from typing import Dict, Tuple

from faststream.asgi.app import cast_uvicorn_params


class ASGIMultiprocess:
def __init__(self, target: str, args: Tuple[str, Dict[str, str], bool, int], workers: int) -> None:
_, uvicorn_kwargs, is_factory, log_level = args
self._target = target
self._uvicorn_kwargs = cast_uvicorn_params(uvicorn_kwargs or {})
self._workers = workers
self._is_factory = is_factory
self._log_level = log_level

def run(self) -> None:
try:
import uvicorn
except ImportError as e:
raise RuntimeError(
"You need uvicorn to run FastStream ASGI App via CLI. pip install uvicorn"
) from e

uvicorn_params = set(inspect.signature(uvicorn.run).parameters.keys())

uvicorn.run(
self._target,
factory=self._is_factory,
workers=self._workers,
log_level=self._log_level,
**{key: v for key, v in self._uvicorn_kwargs.items() if key in uvicorn_params}
)
3 changes: 3 additions & 0 deletions faststream/cli/utils/imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ def import_from_string(
else:
raise typer.BadParameter(f'"{instance}" is not a factory')

if callable(instance) and not is_factory:
raise typer.BadParameter("Please, use --factory option for callable object")

return module_path, instance


Expand Down

0 comments on commit b3086f5

Please sign in to comment.