defadd_api_mq_route(
-self,
-path:Union[NameRequired,str],
-*extra:Union[NameRequired,str],
-endpoint:Callable[P_HandlerParams,T_HandlerReturn],
-dependencies:Sequence[params.Depends],
-**broker_kwargs:Any,
-)->HandlerCallWrapper[MsgType,P_HandlerParams,T_HandlerReturn]:
-"""Add an API message queue route.
-
- Args:
- path: The path of the route.
- *extra: Additional path segments.
- endpoint: The endpoint function to be called for this route.
- dependencies: The dependencies required by the endpoint function.
- **broker_kwargs: Additional keyword arguments to be passed to the broker.
-
- Returns:
- The handler call wrapper for the route.
+226
+227
+228
+229
defadd_api_mq_route(
+self,
+path:Union[NameRequired,str],
+*extra:Union[NameRequired,str],
+endpoint:Callable[P_HandlerParams,T_HandlerReturn],
+dependencies:Sequence[params.Depends],
+**broker_kwargs:Any,
+)->HandlerCallWrapper[MsgType,P_HandlerParams,T_HandlerReturn]:
+"""Add an API message queue route.
+
+ Args:
+ path: The path of the route.
+ *extra: Additional path segments.
+ endpoint: The endpoint function to be called for this route.
+ dependencies: The dependencies required by the endpoint function.
+ **broker_kwargs: Additional keyword arguments to be passed to the broker.
- """
-route:StreamRoute[MsgType,P_HandlerParams,T_HandlerReturn]=StreamRoute(
-path,
-*extra,
-endpoint=endpoint,
-dependencies=dependencies,
-dependency_overrides_provider=self.dependency_overrides_provider,
-broker=self.broker,
-**broker_kwargs,
-)
-self.routes.append(route)
-returnroute.handler
+ Returns:
+ The handler call wrapper for the route.
+
+ """
+route:StreamRoute[MsgType,P_HandlerParams,T_HandlerReturn]=StreamRoute(
+path,
+*extra,
+endpoint=endpoint,
+dependencies=dependencies,
+dependency_overrides_provider=self.dependency_overrides_provider,
+broker=self.broker,
+**broker_kwargs,
+)
+self.routes.append(route)
+returnroute.handler
A function to be executed after startup. It can take an AppType argument and return a mapping of strings to any values, or it can take an AppType argument and return an awaitable that resolves to a mapping of strings to any values, or it can take an AppType argument and return nothing, or it can take an AppType argument and return an awaitable that resolves to nothing.
A function to be executed after startup. It can take an AppType argument and return a mapping of strings to any values, or it can take an AppType argument and return an awaitable that resolves to a mapping of strings to any values, or it can take an AppType argument and return nothing, or it can take an AppType argument and return an awaitable that resolves to nothing.
defafter_startup(
-self,
-func:Union[
-Callable[[AppType],Mapping[str,Any]],
-Callable[[AppType],Awaitable[Mapping[str,Any]]],
-Callable[[AppType],None],
-Callable[[AppType],Awaitable[None]],
-],
-)->Union[
-Callable[[AppType],Mapping[str,Any]],
-Callable[[AppType],Awaitable[Mapping[str,Any]]],
-Callable[[AppType],None],
-Callable[[AppType],Awaitable[None]],
-]:
-"""Register a function to be executed after startup.
-
- Args:
- func: A function to be executed after startup. It can take an `AppType` argument and return a mapping of strings to any values, or it can take an `AppType` argument and return an awaitable that resolves to a mapping of strings to any values, or it can take an `AppType` argument and return nothing, or it can take an `AppType` argument and return an awaitable that resolves to nothing.
+443
+444
+445
+446
defafter_startup(
+self,
+func:Union[
+Callable[[AppType],Mapping[str,Any]],
+Callable[[AppType],Awaitable[Mapping[str,Any]]],
+Callable[[AppType],None],
+Callable[[AppType],Awaitable[None]],
+],
+)->Union[
+Callable[[AppType],Mapping[str,Any]],
+Callable[[AppType],Awaitable[Mapping[str,Any]]],
+Callable[[AppType],None],
+Callable[[AppType],Awaitable[None]],
+]:
+"""Register a function to be executed after startup.
- Returns:
- The registered function.
+ Args:
+ func: A function to be executed after startup. It can take an `AppType` argument and return a mapping of strings to any values, or it can take an `AppType` argument and return an awaitable that resolves to a mapping of strings to any values, or it can take an `AppType` argument and return nothing, or it can take an `AppType` argument and return an awaitable that resolves to nothing.
- """
-self._after_startup_hooks.append(to_async(func))# type: ignore
-returnfunc
+ Returns:
+ The registered function.
+
+ """
+self._after_startup_hooks.append(to_async(func))# type: ignore
+returnfunc
This function defines three nested functions: download_app_json_schema, download_app_yaml_schema, and serve_asyncapi_schema. These functions are used to handle different routes for serving the AsyncAPI schema and documentation.
Source code in faststream/broker/fastapi/router.py
This function defines three nested functions: download_app_json_schema, download_app_yaml_schema, and serve_asyncapi_schema. These functions are used to handle different routes for serving the AsyncAPI schema and documentation.
Source code in faststream/broker/fastapi/router.py
defasyncapi_router(self,schema_url:Optional[str])->Optional[APIRouter]:
-"""Creates an API router for serving AsyncAPI documentation.
-
- Args:
- schema_url: The URL where the AsyncAPI schema will be served.
+569
+570
+571
+572
defasyncapi_router(self,schema_url:Optional[str])->Optional[APIRouter]:
+"""Creates an API router for serving AsyncAPI documentation.
- Returns:
- An instance of APIRouter if include_in_schema and schema_url are both True, otherwise returns None.
+ Args:
+ schema_url: The URL where the AsyncAPI schema will be served.
- Raises:
- AssertionError: If self.schema is not set.
+ Returns:
+ An instance of APIRouter if include_in_schema and schema_url are both True, otherwise returns None.
- Notes:
- This function defines three nested functions: download_app_json_schema, download_app_yaml_schema, and serve_asyncapi_schema. These functions are used to handle different routes for serving the AsyncAPI schema and documentation.
+ Raises:
+ AssertionError: If self.schema is not set.
- """
-ifnotself.include_in_schemaornotschema_url:
-returnNone
-
-defdownload_app_json_schema()->Response:
-assert(# nosec B101
-self.schema
-),"You need to run application lifespan at first"
-
-returnResponse(
-content=json.dumps(self.schema.to_jsonable(),indent=2),
-headers={"Content-Type":"application/octet-stream"},
-)
-
-defdownload_app_yaml_schema()->Response:
-assert(# nosec B101
-self.schema
-),"You need to run application lifespan at first"
-
-returnResponse(
-content=self.schema.to_yaml(),
-headers={
-"Content-Type":"application/octet-stream",
-},
-)
-
-defserve_asyncapi_schema(
-sidebar:bool=True,
-info:bool=True,
-servers:bool=True,
-operations:bool=True,
-messages:bool=True,
-schemas:bool=True,
-errors:bool=True,
-expandMessageExamples:bool=True,
-)->HTMLResponse:
-"""Serve the AsyncAPI schema as an HTML response.
-
- Args:
- sidebar (bool, optional): Whether to include the sidebar in the HTML. Defaults to True.
- info (bool, optional): Whether to include the info section in the HTML. Defaults to True.
- servers (bool, optional): Whether to include the servers section in the HTML. Defaults to True.
- operations (bool, optional): Whether to include the operations section in the HTML. Defaults to True.
- messages (bool, optional): Whether to include the messages section in the HTML. Defaults to True.
- schemas (bool, optional): Whether to include the schemas section in the HTML. Defaults to True.
- errors (bool, optional): Whether to include the errors section in the HTML. Defaults to True.
- expandMessageExamples (bool, optional): Whether to expand message examples in the HTML. Defaults to True.
-
- Returns:
- HTMLResponse: The HTML response containing the AsyncAPI schema.
+ Notes:
+ This function defines three nested functions: download_app_json_schema, download_app_yaml_schema, and serve_asyncapi_schema. These functions are used to handle different routes for serving the AsyncAPI schema and documentation.
+
+ """
+ifnotself.include_in_schemaornotschema_url:
+returnNone
+
+defdownload_app_json_schema()->Response:
+assert(# nosec B101
+self.schema
+),"You need to run application lifespan at first"
+
+returnResponse(
+content=json.dumps(self.schema.to_jsonable(),indent=2),
+headers={"Content-Type":"application/octet-stream"},
+)
+
+defdownload_app_yaml_schema()->Response:
+assert(# nosec B101
+self.schema
+),"You need to run application lifespan at first"
+
+returnResponse(
+content=self.schema.to_yaml(),
+headers={
+"Content-Type":"application/octet-stream",
+},
+)
+
+defserve_asyncapi_schema(
+sidebar:bool=True,
+info:bool=True,
+servers:bool=True,
+operations:bool=True,
+messages:bool=True,
+schemas:bool=True,
+errors:bool=True,
+expandMessageExamples:bool=True,
+)->HTMLResponse:
+"""Serve the AsyncAPI schema as an HTML response.
+
+ Args:
+ sidebar (bool, optional): Whether to include the sidebar in the HTML. Defaults to True.
+ info (bool, optional): Whether to include the info section in the HTML. Defaults to True.
+ servers (bool, optional): Whether to include the servers section in the HTML. Defaults to True.
+ operations (bool, optional): Whether to include the operations section in the HTML. Defaults to True.
+ messages (bool, optional): Whether to include the messages section in the HTML. Defaults to True.
+ schemas (bool, optional): Whether to include the schemas section in the HTML. Defaults to True.
+ errors (bool, optional): Whether to include the errors section in the HTML. Defaults to True.
+ expandMessageExamples (bool, optional): Whether to expand message examples in the HTML. Defaults to True.
- Raises:
- AssertionError: If the schema is not available.
- !!! note
-
- The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
- """
-assert(# nosec B101
-self.schema
-),"You need to run application lifespan at first"
-
-returnHTMLResponse(
-content=get_asyncapi_html(
-self.schema,
-sidebar=sidebar,
-info=info,
-servers=servers,
-operations=operations,
-messages=messages,
-schemas=schemas,
-errors=errors,
-expand_message_examples=expandMessageExamples,
-title=self.schema.info.title,
-)
-)
-
-docs_router=APIRouter(
-prefix=self.prefix,
-tags=["asyncapi"],
-redirect_slashes=self.redirect_slashes,
-default=self.default,
-deprecated=self.deprecated,
-)
-docs_router.get(schema_url)(serve_asyncapi_schema)
-docs_router.get(f"{schema_url}.json")(download_app_json_schema)
-docs_router.get(f"{schema_url}.yaml")(download_app_yaml_schema)
-returndocs_router
+ Returns:
+ HTMLResponse: The HTML response containing the AsyncAPI schema.
+
+ Raises:
+ AssertionError: If the schema is not available.
+ !!! note
+
+ The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
+ """
+assert(# nosec B101
+self.schema
+),"You need to run application lifespan at first"
+
+returnHTMLResponse(
+content=get_asyncapi_html(
+self.schema,
+sidebar=sidebar,
+info=info,
+servers=servers,
+operations=operations,
+messages=messages,
+schemas=schemas,
+errors=errors,
+expand_message_examples=expandMessageExamples,
+title=self.schema.info.title,
+)
+)
+
+docs_router=APIRouter(
+prefix=self.prefix,
+tags=["asyncapi"],
+redirect_slashes=self.redirect_slashes,
+default=self.default,
+deprecated=self.deprecated,
+)
+docs_router.get(schema_url)(serve_asyncapi_schema)
+docs_router.get(f"{schema_url}.json")(download_app_json_schema)
+docs_router.get(f"{schema_url}.yaml")(download_app_yaml_schema)
+returndocs_router
definclude_router(
-self,
-router:"APIRouter",
-*,
-prefix:str="",
-tags:Optional[List[Union[str,Enum]]]=None,
-dependencies:Optional[Sequence[params.Depends]]=None,
-default_response_class:Type[Response]=Default(JSONResponse),
-responses:Optional[Dict[Union[int,str],AnyDict]]=None,
-callbacks:Optional[List[BaseRoute]]=None,
-deprecated:Optional[bool]=None,
-include_in_schema:bool=True,
-generate_unique_id_function:Callable[[APIRoute],str]=Default(
-generate_unique_id
-),
-)->None:
-"""Includes a router in the API.
-
- Args:
- router (APIRouter): The router to include.
- prefix (str, optional): The prefix to prepend to all paths defined in the router. Defaults to "".
- tags (List[Union[str, Enum]], optional): The tags to assign to all paths defined in the router. Defaults to None.
- dependencies (Sequence[params.Depends], optional): The dependencies to apply to all paths defined in the router. Defaults to None.
- default_response_class (Type[Response], optional): The default response class to use for all paths defined in the router. Defaults to Default(JSONResponse).
- responses (Dict[Union[int, str], AnyDict], optional): The responses to define for all paths defined in the router. Defaults to None.
- callbacks (List[BaseRoute], optional): The callbacks to apply to all paths defined in the router. Defaults to None.
- deprecated (bool, optional): Whether the router is deprecated. Defaults to None.
- include_in_schema (bool, optional): Whether to include the router in the API schema. Defaults to True.
- generate_unique_id_function (Callable[[APIRoute], str], optional): The function to generate unique IDs for
-
- """
-ifisinstance(router,StreamRouter):# pragma: no branch
-self._setup_log_context(self.broker,router.broker)
-self.broker.handlers={**self.broker.handlers,**router.broker.handlers}
-self.broker._publishers={
-**self.broker._publishers,
-**router.broker._publishers,
-}
-
-super().include_router(
-router=router,
-prefix=prefix,
-tags=tags,
-dependencies=dependencies,
-default_response_class=default_response_class,
-responses=responses,
-callbacks=callbacks,
-deprecated=deprecated,
-include_in_schema=include_in_schema,
-generate_unique_id_function=generate_unique_id_function,
-)
+621
+622
+623
+624
definclude_router(
+self,
+router:"APIRouter",
+*,
+prefix:str="",
+tags:Optional[List[Union[str,Enum]]]=None,
+dependencies:Optional[Sequence[params.Depends]]=None,
+default_response_class:Type[Response]=Default(JSONResponse),
+responses:Optional[Dict[Union[int,str],AnyDict]]=None,
+callbacks:Optional[List[BaseRoute]]=None,
+deprecated:Optional[bool]=None,
+include_in_schema:bool=True,
+generate_unique_id_function:Callable[[APIRoute],str]=Default(
+generate_unique_id
+),
+)->None:
+"""Includes a router in the API.
+
+ Args:
+ router (APIRouter): The router to include.
+ prefix (str, optional): The prefix to prepend to all paths defined in the router. Defaults to "".
+ tags (List[Union[str, Enum]], optional): The tags to assign to all paths defined in the router. Defaults to None.
+ dependencies (Sequence[params.Depends], optional): The dependencies to apply to all paths defined in the router. Defaults to None.
+ default_response_class (Type[Response], optional): The default response class to use for all paths defined in the router. Defaults to Default(JSONResponse).
+ responses (Dict[Union[int, str], AnyDict], optional): The responses to define for all paths defined in the router. Defaults to None.
+ callbacks (List[BaseRoute], optional): The callbacks to apply to all paths defined in the router. Defaults to None.
+ deprecated (bool, optional): Whether the router is deprecated. Defaults to None.
+ include_in_schema (bool, optional): Whether to include the router in the API schema. Defaults to True.
+ generate_unique_id_function (Callable[[APIRoute], str], optional): The function to generate unique IDs for
+
+ """
+ifisinstance(router,StreamRouter):# pragma: no branch
+self._setup_log_context(self.broker,router.broker)
+self.broker.handlers={**self.broker.handlers,**router.broker.handlers}
+self.broker._publishers={
+**self.broker._publishers,
+**router.broker._publishers,
+}
+
+super().include_router(
+router=router,
+prefix=prefix,
+tags=tags,
+dependencies=dependencies,
+default_response_class=default_response_class,
+responses=responses,
+callbacks=callbacks,
+deprecated=deprecated,
+include_in_schema=include_in_schema,
+generate_unique_id_function=generate_unique_id_function,
+)
defpublisher(
-self,
-queue:Union[NameRequired,str],
-*publisher_args:Any,
-**publisher_kwargs:Any,
-)->BasePublisher[MsgType]:
-"""Publishes messages to a queue.
-
- Args:
- queue: The queue to publish the messages to. Can be either a `NameRequired` object or a string.
- *publisher_args: Additional arguments to be passed to the publisher.
- **publisher_kwargs: Additional keyword arguments to be passed to the publisher.
-
- Returns:
- An instance of `BasePublisher` that can be used to publish messages to the specified queue.
+466
+467
+468
+469
defpublisher(
+self,
+queue:Union[NameRequired,str],
+*publisher_args:Any,
+**publisher_kwargs:Any,
+)->BasePublisher[MsgType]:
+"""Publishes messages to a queue.
+
+ Args:
+ queue: The queue to publish the messages to. Can be either a `NameRequired` object or a string.
+ *publisher_args: Additional arguments to be passed to the publisher.
+ **publisher_kwargs: Additional keyword arguments to be passed to the publisher.
- """
-returnself.broker.publisher(
-queue,
-*publisher_args,
-**publisher_kwargs,
-)
+ Returns:
+ An instance of `BasePublisher` that can be used to publish messages to the specified queue.
+
+ """
+returnself.broker.publisher(
+queue,
+*publisher_args,
+**publisher_kwargs,
+)
defsubscriber(
-self,
-path:Union[str,NameRequired],
-*extra:Union[NameRequired,str],
-dependencies:Optional[Sequence[params.Depends]]=None,
-**broker_kwargs:Any,
-)->Callable[
-[Callable[P_HandlerParams,T_HandlerReturn]],
-HandlerCallWrapper[MsgType,P_HandlerParams,T_HandlerReturn],
-]:
-"""A function decorator for subscribing to a message queue.
-
- Args:
- path : The path to subscribe to. Can be a string or a `NameRequired` object.
- *extra : Additional path segments. Can be a `NameRequired` object or a string.
- dependencies : Optional sequence of dependencies.
- **broker_kwargs : Additional keyword arguments for the broker.
-
- Returns:
- A callable decorator that adds the decorated function as an endpoint for the specified path.
+279
+280
+281
+282
defsubscriber(
+self,
+path:Union[str,NameRequired],
+*extra:Union[NameRequired,str],
+dependencies:Optional[Sequence[params.Depends]]=None,
+**broker_kwargs:Any,
+)->Callable[
+[Callable[P_HandlerParams,T_HandlerReturn]],
+HandlerCallWrapper[MsgType,P_HandlerParams,T_HandlerReturn],
+]:
+"""A function decorator for subscribing to a message queue.
+
+ Args:
+ path : The path to subscribe to. Can be a string or a `NameRequired` object.
+ *extra : Additional path segments. Can be a `NameRequired` object or a string.
+ dependencies : Optional sequence of dependencies.
+ **broker_kwargs : Additional keyword arguments for the broker.
- Raises:
- NotImplementedError: If silent animals are not supported.
+ Returns:
+ A callable decorator that adds the decorated function as an endpoint for the specified path.
- """
-current_dependencies=self.dependencies.copy()
-ifdependencies:
-current_dependencies.extend(dependencies)
-
-defdecorator(
-func:Callable[P_HandlerParams,T_HandlerReturn],
-)->HandlerCallWrapper[MsgType,P_HandlerParams,T_HandlerReturn]:
-"""A decorator function.
-
- Args:
- func: The function to be decorated.
+ Raises:
+ NotImplementedError: If silent animals are not supported.
+
+ """
+current_dependencies=self.dependencies.copy()
+ifdependencies:
+current_dependencies.extend(dependencies)
+
+defdecorator(
+func:Callable[P_HandlerParams,T_HandlerReturn],
+)->HandlerCallWrapper[MsgType,P_HandlerParams,T_HandlerReturn]:
+"""A decorator function.
- Returns:
- The decorated function.
- !!! note
-
- The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
- """
-returnself.add_api_mq_route(
-path,
-*extra,
-endpoint=func,
-dependencies=current_dependencies,
-**broker_kwargs,
-)
-
-returndecorator
+ Args:
+ func: The function to be decorated.
+
+ Returns:
+ The decorated function.
+ !!! note
+
+ The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
+ """
+returnself.add_api_mq_route(
+path,
+*extra,
+endpoint=func,
+dependencies=current_dependencies,
+**broker_kwargs,
+)
+
+returndecorator