diff --git a/faststream/cli/docs/app.py b/faststream/cli/docs/app.py index 4751222a17..450abdb061 100644 --- a/faststream/cli/docs/app.py +++ b/faststream/cli/docs/app.py @@ -44,6 +44,10 @@ def serve( " Defaults to the current working directory." ), ), + is_factory: bool = typer.Option( + False, + "--factory", help="Treat APP as an application factory" + ), ) -> None: """Serve project AsyncAPI schema.""" if ":" in app: @@ -66,18 +70,18 @@ def serve( except ImportError: warnings.warn(INSTALL_WATCHFILES, category=ImportWarning, stacklevel=1) - _parse_and_serve(app, host, port) + _parse_and_serve(app, host, port, is_factory) else: WatchReloader( target=_parse_and_serve, - args=(app, host, port), + args=(app, host, port, is_factory), reload_dirs=(str(module_parent),), extra_extensions=extra_extensions, ).run() else: - _parse_and_serve(app, host, port) + _parse_and_serve(app, host, port, is_factory) @docs_app.command(name="gen") @@ -104,12 +108,18 @@ def gen( " Defaults to the current working directory." ), ), + is_factory: bool = typer.Option( + False, + "--factory", help="Treat APP as an application factory" + ), ) -> None: """Generate project AsyncAPI schema.""" if app_dir: # pragma: no branch sys.path.insert(0, app_dir) _, app_obj = import_from_string(app) + if callable(app_obj) and is_factory: + app_obj = app_obj() raw_schema = get_app_schema(app_obj) if yaml: @@ -138,9 +148,12 @@ def _parse_and_serve( app: str, host: str = "localhost", port: int = 8000, + is_factory: bool = False, ) -> None: if ":" in app: _, app_obj = import_from_string(app) + if callable(app_obj) and is_factory: + app_obj = app_obj() raw_schema = get_app_schema(app_obj) else: diff --git a/faststream/cli/main.py b/faststream/cli/main.py index 7c0ec0391f..3ed7afa3e6 100644 --- a/faststream/cli/main.py +++ b/faststream/cli/main.py @@ -94,6 +94,12 @@ def run( " Defaults to the current working directory." ), ), + is_factory: bool = typer.Option( + False, + "--factory", + is_flag=True, + help="Treat APP as an application factory", + ), ) -> None: """Run [MODULE:APP] FastStream application.""" if watch_extensions and not reload: @@ -108,7 +114,7 @@ def run( if app_dir: # pragma: no branch sys.path.insert(0, app_dir) - args = (app, extra, casted_log_level) + args = (app, extra, is_factory, casted_log_level) if reload and workers > 1: raise SetupError("You can't use reload option with multiprocessing") @@ -151,11 +157,14 @@ def _run( # NOTE: we should pass `str` due FastStream is not picklable app: str, extra_options: Dict[str, "SettingField"], + is_factory: bool, log_level: int = logging.INFO, app_level: int = logging.INFO, ) -> None: """Runs the specified application.""" _, app_obj = import_from_string(app) + if is_factory and callable(app_obj): + app_obj = app_obj() if not isinstance(app_obj, FastStream): raise typer.BadParameter( @@ -200,6 +209,10 @@ def publish( app: str = typer.Argument(..., help="FastStream app instance, e.g., main:app"), message: str = typer.Argument(..., help="Message to be published"), rpc: bool = typer.Option(False, help="Enable RPC mode and system output"), + is_factory: bool = typer.Option( + False, + "--factory", help="Treat APP as an application factory" + ), ) -> None: """Publish a message using the specified broker in a FastStream application. @@ -218,6 +231,9 @@ def publish( raise ValueError("Message parameter is required.") _, app_obj = import_from_string(app) + if callable(app_obj) and is_factory: + app_obj = app_obj() + if not app_obj.broker: raise ValueError("Broker instance not found in the app.")