diff --git a/faststream/broker/publisher/usecase.py b/faststream/broker/publisher/usecase.py index 46bb96ef2a..1bdbc74513 100644 --- a/faststream/broker/publisher/usecase.py +++ b/faststream/broker/publisher/usecase.py @@ -20,7 +20,6 @@ from faststream.asyncapi.utils import to_camelcase from faststream.broker.publisher.proto import PublisherProto from faststream.broker.types import ( - BrokerMiddleware, MsgType, P_HandlerParams, T_HandlerReturn, diff --git a/faststream/broker/subscriber/usecase.py b/faststream/broker/subscriber/usecase.py index a2e9d1aa58..e5ee6fadea 100644 --- a/faststream/broker/subscriber/usecase.py +++ b/faststream/broker/subscriber/usecase.py @@ -21,7 +21,6 @@ from faststream.asyncapi.abc import AsyncAPIOperation from faststream.asyncapi.message import parse_handler_params from faststream.asyncapi.utils import to_camelcase -from faststream.broker.publisher.proto import ProducerProto from faststream.broker.subscriber.call_item import HandlerItem from faststream.broker.subscriber.proto import SubscriberProto from faststream.broker.types import ( @@ -40,6 +39,7 @@ from faststream.broker.message import StreamMessage from faststream.broker.middlewares import BaseMiddleware + from faststream.broker.publisher.proto import BasePublisherProto, ProducerProto from faststream.broker.types import ( AsyncCallable, BrokerMiddleware, @@ -93,6 +93,7 @@ def __init__( self, *, no_ack: bool, + no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[MsgType]"], @@ -108,6 +109,7 @@ def __init__( self._default_parser = default_parser self._default_decoder = default_decoder + self._no_reply = no_reply # Watcher args self._no_ack = no_ack self._retry = retry @@ -139,7 +141,7 @@ def setup( # type: ignore[override] self, *, logger: Optional["LoggerProto"], - producer: Optional[ProducerProto], + producer: Optional["ProducerProto"], graceful_timeout: Optional[float], extra_context: "AnyDict", # broker options @@ -338,7 +340,7 @@ async def consume(self, msg: MsgType) -> Any: ) for p in chain( - self._make_response_publisher(message), + self.__get_reponse_publisher(message), h.handler._publishers, ): await p.publish( @@ -358,6 +360,16 @@ async def consume(self, msg: MsgType) -> Any: return None + def __get_reponse_publisher( + self, + message: "StreamMessage[MsgType]", + ) -> Iterable["BasePublisherProto"]: + if not message.reply_to or self._no_reply: + return () + + else: + return self._make_response_publisher(message) + def get_log_context( self, message: Optional["StreamMessage[MsgType]"], diff --git a/faststream/confluent/broker/registrator.py b/faststream/confluent/broker/registrator.py index 6d71a21046..277a77ef69 100644 --- a/faststream/confluent/broker/registrator.py +++ b/faststream/confluent/broker/registrator.py @@ -66,12 +66,14 @@ def subscriber( ], group_id: Annotated[ Optional[str], - Doc(""" + Doc( + """ Name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If `None`, auto-partition assignment (via group coordinator) and offset commits are disabled. - """), + """ + ), ] = None, key_deserializer: Annotated[ Optional[Callable[[bytes], Any]], @@ -89,16 +91,19 @@ def subscriber( ] = None, fetch_max_wait_ms: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by `fetch_min_bytes`. - """), + """ + ), ] = 500, fetch_max_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data the server should return for a fetch request. This is not an absolute maximum, if the first message in the first non-empty partition of the fetch @@ -107,19 +112,23 @@ def subscriber( performs fetches to multiple brokers in parallel so memory usage will depend on the number of brokers containing partitions for the topic. - """), + """ + ), ] = 50 * 1024 * 1024, fetch_min_bytes: Annotated[ int, - Doc(""" + Doc( + """ Minimum amount of data the server should return for a fetch request, otherwise wait up to `fetch_max_wait_ms` for more data to accumulate. - """), + """ + ), ] = 1, max_partition_fetch_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data per-partition the server will return. The maximum total memory used for a request ``= #partitions * max_partition_fetch_bytes``. @@ -128,43 +137,53 @@ def subscriber( send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. - """), + """ + ), ] = 1 * 1024 * 1024, auto_offset_reset: Annotated[ Literal["latest", "earliest", "none"], - Doc(""" + Doc( + """ A policy for resetting offsets on `OffsetOutOfRangeError` errors: * `earliest` will move to the oldest available message * `latest` will move to the most recent * `none` will raise an exception so you can handle this case - """), + """ + ), ] = "latest", auto_commit: Annotated[ bool, - Doc(""" + Doc( + """ If `True` the consumer's offset will be periodically committed in the background. - """), + """ + ), ] = True, auto_commit_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds between automatic - offset commits, if `auto_commit` is `True`."""), + offset commits, if `auto_commit` is `True`.""" + ), ] = 5 * 1000, check_crcs: Annotated[ bool, - Doc(""" + Doc( + """ Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. - """), + """ + ), ] = True, partition_assignment_strategy: Annotated[ Sequence[str], - Doc(""" + Doc( + """ List of objects to use to distribute partition ownership amongst consumer instances when group management is used. This preference is implicit in the order @@ -174,22 +193,26 @@ def subscriber( one. The coordinator will choose the old assignment strategy until all members have been updated. Then it will choose the new strategy. - """), + """ + ), ] = ("roundrobin",), max_poll_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum allowed time between calls to consume messages in batches. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout. - """), + """ + ), ] = 5 * 60 * 1000, rebalance_timeout_ms: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum time server will wait for this consumer to rejoin the group in a case of rebalance. In Java client this behaviour is bound to `max.poll.interval.ms` configuration, @@ -197,11 +220,13 @@ def subscriber( decouple this setting to allow finer tuning by users that use `ConsumerRebalanceListener` to delay rebalacing. Defaults to ``session_timeout_ms`` - """), + """ + ), ] = None, session_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Client group session and failure detection timeout. The consumer sends periodic heartbeats (`heartbeat.interval.ms`) to indicate its liveness to the broker. @@ -210,11 +235,13 @@ def subscriber( group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. - """), + """ + ), ] = 10 * 1000, heartbeat_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure @@ -224,35 +251,43 @@ def subscriber( should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. - """), + """ + ), ] = 3 * 1000, consumer_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. - """), + """ + ), ] = 200, max_poll_records: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum number of records returned in a single call by batch consumer. Has no limit by default. - """), + """ + ), ] = None, exclude_internal_topics: Annotated[ bool, - Doc(""" + Doc( + """ Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. - """), + """ + ), ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], - Doc(""" + Doc( + """ Controls how to read messages written transactionally. @@ -276,7 +311,8 @@ def subscriber( to the high watermark when there are in flight transactions. Further, when in `read_committed` the seek_to_end method will return the LSO. See method docs below. - """), + """ + ), ] = "read_uncommitted", batch: Annotated[ Literal[True], @@ -284,12 +320,14 @@ def subscriber( ], batch_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. - """), + """ + ), ] = 200, max_records: Annotated[ Optional[int], @@ -331,6 +369,12 @@ def subscriber( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI args title: Annotated[ Optional[str], @@ -358,12 +402,14 @@ def subscriber( ], group_id: Annotated[ Optional[str], - Doc(""" + Doc( + """ Name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If `None`, auto-partition assignment (via group coordinator) and offset commits are disabled. - """), + """ + ), ] = None, key_deserializer: Annotated[ Optional[Callable[[bytes], Any]], @@ -381,16 +427,19 @@ def subscriber( ] = None, fetch_max_wait_ms: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by `fetch_min_bytes`. - """), + """ + ), ] = 500, fetch_max_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data the server should return for a fetch request. This is not an absolute maximum, if the first message in the first non-empty partition of the fetch @@ -399,19 +448,23 @@ def subscriber( performs fetches to multiple brokers in parallel so memory usage will depend on the number of brokers containing partitions for the topic. - """), + """ + ), ] = 50 * 1024 * 1024, fetch_min_bytes: Annotated[ int, - Doc(""" + Doc( + """ Minimum amount of data the server should return for a fetch request, otherwise wait up to `fetch_max_wait_ms` for more data to accumulate. - """), + """ + ), ] = 1, max_partition_fetch_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data per-partition the server will return. The maximum total memory used for a request ``= #partitions * max_partition_fetch_bytes``. @@ -420,43 +473,53 @@ def subscriber( send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. - """), + """ + ), ] = 1 * 1024 * 1024, auto_offset_reset: Annotated[ Literal["latest", "earliest", "none"], - Doc(""" + Doc( + """ A policy for resetting offsets on `OffsetOutOfRangeError` errors: * `earliest` will move to the oldest available message * `latest` will move to the most recent * `none` will raise an exception so you can handle this case - """), + """ + ), ] = "latest", auto_commit: Annotated[ bool, - Doc(""" + Doc( + """ If `True` the consumer's offset will be periodically committed in the background. - """), + """ + ), ] = True, auto_commit_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds between automatic - offset commits, if `auto_commit` is `True`."""), + offset commits, if `auto_commit` is `True`.""" + ), ] = 5 * 1000, check_crcs: Annotated[ bool, - Doc(""" + Doc( + """ Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. - """), + """ + ), ] = True, partition_assignment_strategy: Annotated[ Sequence[str], - Doc(""" + Doc( + """ List of objects to use to distribute partition ownership amongst consumer instances when group management is used. This preference is implicit in the order @@ -466,22 +529,26 @@ def subscriber( one. The coordinator will choose the old assignment strategy until all members have been updated. Then it will choose the new strategy. - """), + """ + ), ] = ("roundrobin",), max_poll_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum allowed time between calls to consume messages in batches. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout. - """), + """ + ), ] = 5 * 60 * 1000, rebalance_timeout_ms: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum time server will wait for this consumer to rejoin the group in a case of rebalance. In Java client this behaviour is bound to `max.poll.interval.ms` configuration, @@ -489,11 +556,13 @@ def subscriber( decouple this setting to allow finer tuning by users that use `ConsumerRebalanceListener` to delay rebalacing. Defaults to ``session_timeout_ms`` - """), + """ + ), ] = None, session_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Client group session and failure detection timeout. The consumer sends periodic heartbeats (`heartbeat.interval.ms`) to indicate its liveness to the broker. @@ -502,11 +571,13 @@ def subscriber( group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. - """), + """ + ), ] = 10 * 1000, heartbeat_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure @@ -516,35 +587,43 @@ def subscriber( should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. - """), + """ + ), ] = 3 * 1000, consumer_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. - """), + """ + ), ] = 200, max_poll_records: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum number of records returned in a single call by batch consumer. Has no limit by default. - """), + """ + ), ] = None, exclude_internal_topics: Annotated[ bool, - Doc(""" + Doc( + """ Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. - """), + """ + ), ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], - Doc(""" + Doc( + """ Controls how to read messages written transactionally. @@ -568,7 +647,8 @@ def subscriber( to the high watermark when there are in flight transactions. Further, when in `read_committed` the seek_to_end method will return the LSO. See method docs below. - """), + """ + ), ] = "read_uncommitted", batch: Annotated[ Literal[False], @@ -576,12 +656,14 @@ def subscriber( ] = False, batch_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. - """), + """ + ), ] = 200, max_records: Annotated[ Optional[int], @@ -623,6 +705,12 @@ def subscriber( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI args title: Annotated[ Optional[str], @@ -650,12 +738,14 @@ def subscriber( ], group_id: Annotated[ Optional[str], - Doc(""" + Doc( + """ Name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If `None`, auto-partition assignment (via group coordinator) and offset commits are disabled. - """), + """ + ), ] = None, key_deserializer: Annotated[ Optional[Callable[[bytes], Any]], @@ -673,16 +763,19 @@ def subscriber( ] = None, fetch_max_wait_ms: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by `fetch_min_bytes`. - """), + """ + ), ] = 500, fetch_max_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data the server should return for a fetch request. This is not an absolute maximum, if the first message in the first non-empty partition of the fetch @@ -691,19 +784,23 @@ def subscriber( performs fetches to multiple brokers in parallel so memory usage will depend on the number of brokers containing partitions for the topic. - """), + """ + ), ] = 50 * 1024 * 1024, fetch_min_bytes: Annotated[ int, - Doc(""" + Doc( + """ Minimum amount of data the server should return for a fetch request, otherwise wait up to `fetch_max_wait_ms` for more data to accumulate. - """), + """ + ), ] = 1, max_partition_fetch_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data per-partition the server will return. The maximum total memory used for a request ``= #partitions * max_partition_fetch_bytes``. @@ -712,43 +809,53 @@ def subscriber( send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. - """), + """ + ), ] = 1 * 1024 * 1024, auto_offset_reset: Annotated[ Literal["latest", "earliest", "none"], - Doc(""" + Doc( + """ A policy for resetting offsets on `OffsetOutOfRangeError` errors: * `earliest` will move to the oldest available message * `latest` will move to the most recent * `none` will raise an exception so you can handle this case - """), + """ + ), ] = "latest", auto_commit: Annotated[ bool, - Doc(""" + Doc( + """ If `True` the consumer's offset will be periodically committed in the background. - """), + """ + ), ] = True, auto_commit_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds between automatic - offset commits, if `auto_commit` is `True`."""), + offset commits, if `auto_commit` is `True`.""" + ), ] = 5 * 1000, check_crcs: Annotated[ bool, - Doc(""" + Doc( + """ Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. - """), + """ + ), ] = True, partition_assignment_strategy: Annotated[ Sequence[str], - Doc(""" + Doc( + """ List of objects to use to distribute partition ownership amongst consumer instances when group management is used. This preference is implicit in the order @@ -758,22 +865,26 @@ def subscriber( one. The coordinator will choose the old assignment strategy until all members have been updated. Then it will choose the new strategy. - """), + """ + ), ] = ("roundrobin",), max_poll_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum allowed time between calls to consume messages in batches. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout. - """), + """ + ), ] = 5 * 60 * 1000, rebalance_timeout_ms: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum time server will wait for this consumer to rejoin the group in a case of rebalance. In Java client this behaviour is bound to `max.poll.interval.ms` configuration, @@ -781,11 +892,13 @@ def subscriber( decouple this setting to allow finer tuning by users that use `ConsumerRebalanceListener` to delay rebalacing. Defaults to ``session_timeout_ms`` - """), + """ + ), ] = None, session_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Client group session and failure detection timeout. The consumer sends periodic heartbeats (`heartbeat.interval.ms`) to indicate its liveness to the broker. @@ -794,11 +907,13 @@ def subscriber( group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. - """), + """ + ), ] = 10 * 1000, heartbeat_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure @@ -808,35 +923,43 @@ def subscriber( should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. - """), + """ + ), ] = 3 * 1000, consumer_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. - """), + """ + ), ] = 200, max_poll_records: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum number of records returned in a single call by batch consumer. Has no limit by default. - """), + """ + ), ] = None, exclude_internal_topics: Annotated[ bool, - Doc(""" + Doc( + """ Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. - """), + """ + ), ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], - Doc(""" + Doc( + """ Controls how to read messages written transactionally. @@ -860,7 +983,8 @@ def subscriber( to the high watermark when there are in flight transactions. Further, when in `read_committed` the seek_to_end method will return the LSO. See method docs below. - """), + """ + ), ] = "read_uncommitted", batch: Annotated[ bool, @@ -868,12 +992,14 @@ def subscriber( ] = False, batch_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. - """), + """ + ), ] = 200, max_records: Annotated[ Optional[int], @@ -915,6 +1041,12 @@ def subscriber( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI args title: Annotated[ Optional[str], @@ -945,12 +1077,14 @@ def subscriber( ], group_id: Annotated[ Optional[str], - Doc(""" + Doc( + """ Name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If `None`, auto-partition assignment (via group coordinator) and offset commits are disabled. - """), + """ + ), ] = None, key_deserializer: Annotated[ Optional[Callable[[bytes], Any]], @@ -968,16 +1102,19 @@ def subscriber( ] = None, fetch_max_wait_ms: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by `fetch_min_bytes`. - """), + """ + ), ] = 500, fetch_max_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data the server should return for a fetch request. This is not an absolute maximum, if the first message in the first non-empty partition of the fetch @@ -986,19 +1123,23 @@ def subscriber( performs fetches to multiple brokers in parallel so memory usage will depend on the number of brokers containing partitions for the topic. - """), + """ + ), ] = 50 * 1024 * 1024, fetch_min_bytes: Annotated[ int, - Doc(""" + Doc( + """ Minimum amount of data the server should return for a fetch request, otherwise wait up to `fetch_max_wait_ms` for more data to accumulate. - """), + """ + ), ] = 1, max_partition_fetch_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data per-partition the server will return. The maximum total memory used for a request ``= #partitions * max_partition_fetch_bytes``. @@ -1007,43 +1148,53 @@ def subscriber( send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. - """), + """ + ), ] = 1 * 1024 * 1024, auto_offset_reset: Annotated[ Literal["latest", "earliest", "none"], - Doc(""" + Doc( + """ A policy for resetting offsets on `OffsetOutOfRangeError` errors: * `earliest` will move to the oldest available message * `latest` will move to the most recent * `none` will raise an exception so you can handle this case - """), + """ + ), ] = "latest", auto_commit: Annotated[ bool, - Doc(""" + Doc( + """ If `True` the consumer's offset will be periodically committed in the background. - """), + """ + ), ] = True, auto_commit_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds between automatic - offset commits, if `auto_commit` is `True`."""), + offset commits, if `auto_commit` is `True`.""" + ), ] = 5 * 1000, check_crcs: Annotated[ bool, - Doc(""" + Doc( + """ Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. - """), + """ + ), ] = True, partition_assignment_strategy: Annotated[ Sequence[str], - Doc(""" + Doc( + """ List of objects to use to distribute partition ownership amongst consumer instances when group management is used. This preference is implicit in the order @@ -1053,22 +1204,26 @@ def subscriber( one. The coordinator will choose the old assignment strategy until all members have been updated. Then it will choose the new strategy. - """), + """ + ), ] = ("roundrobin",), max_poll_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum allowed time between calls to consume messages in batches. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout. - """), + """ + ), ] = 5 * 60 * 1000, rebalance_timeout_ms: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum time server will wait for this consumer to rejoin the group in a case of rebalance. In Java client this behaviour is bound to `max.poll.interval.ms` configuration, @@ -1076,11 +1231,13 @@ def subscriber( decouple this setting to allow finer tuning by users that use `ConsumerRebalanceListener` to delay rebalacing. Defaults to ``session_timeout_ms`` - """), + """ + ), ] = None, session_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Client group session and failure detection timeout. The consumer sends periodic heartbeats (`heartbeat.interval.ms`) to indicate its liveness to the broker. @@ -1089,11 +1246,13 @@ def subscriber( group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. - """), + """ + ), ] = 10 * 1000, heartbeat_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure @@ -1103,35 +1262,43 @@ def subscriber( should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. - """), + """ + ), ] = 3 * 1000, consumer_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. - """), + """ + ), ] = 200, max_poll_records: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum number of records returned in a single call by batch consumer. Has no limit by default. - """), + """ + ), ] = None, exclude_internal_topics: Annotated[ bool, - Doc(""" + Doc( + """ Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. - """), + """ + ), ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], - Doc(""" + Doc( + """ Controls how to read messages written transactionally. @@ -1155,7 +1322,8 @@ def subscriber( to the high watermark when there are in flight transactions. Further, when in `read_committed` the seek_to_end method will return the LSO. See method docs below. - """), + """ + ), ] = "read_uncommitted", batch: Annotated[ bool, @@ -1163,12 +1331,14 @@ def subscriber( ] = False, batch_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. - """), + """ + ), ] = 200, max_records: Annotated[ Optional[int], @@ -1210,6 +1380,12 @@ def subscriber( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI args title: Annotated[ Optional[str], @@ -1264,6 +1440,7 @@ def subscriber( is_manual=not auto_commit, # subscriber args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=self._middlewares, broker_dependencies=self._dependencies, @@ -1301,7 +1478,8 @@ def publisher( *, key: Annotated[ Union[bytes, Any, None], - Doc(""" + Doc( + """ A key to associate with the message. Can be used to determine which partition to send the message to. If partition is `None` (and producer's partitioner config is left as default), @@ -1309,14 +1487,17 @@ def publisher( partition (but if key is `None`, partition is chosen randomly). Must be type `bytes`, or be serializable to bytes via configured `key_serializer`. - """), + """ + ), ] = None, partition: Annotated[ Optional[int], - Doc(""" + Doc( + """ Specify a partition. If not set, the partition will be selected using the configured `partitioner`. - """), + """ + ), ] = None, headers: Annotated[ Optional[Dict[str, str]], @@ -1371,7 +1552,8 @@ def publisher( *, key: Annotated[ Union[bytes, Any, None], - Doc(""" + Doc( + """ A key to associate with the message. Can be used to determine which partition to send the message to. If partition is `None` (and producer's partitioner config is left as default), @@ -1379,14 +1561,17 @@ def publisher( partition (but if key is `None`, partition is chosen randomly). Must be type `bytes`, or be serializable to bytes via configured `key_serializer`. - """), + """ + ), ] = None, partition: Annotated[ Optional[int], - Doc(""" + Doc( + """ Specify a partition. If not set, the partition will be selected using the configured `partitioner`. - """), + """ + ), ] = None, headers: Annotated[ Optional[Dict[str, str]], @@ -1441,7 +1626,8 @@ def publisher( *, key: Annotated[ Union[bytes, Any, None], - Doc(""" + Doc( + """ A key to associate with the message. Can be used to determine which partition to send the message to. If partition is `None` (and producer's partitioner config is left as default), @@ -1449,14 +1635,17 @@ def publisher( partition (but if key is `None`, partition is chosen randomly). Must be type `bytes`, or be serializable to bytes via configured `key_serializer`. - """), + """ + ), ] = None, partition: Annotated[ Optional[int], - Doc(""" + Doc( + """ Specify a partition. If not set, the partition will be selected using the configured `partitioner`. - """), + """ + ), ] = None, headers: Annotated[ Optional[Dict[str, str]], @@ -1514,7 +1703,8 @@ def publisher( *, key: Annotated[ Union[bytes, Any, None], - Doc(""" + Doc( + """ A key to associate with the message. Can be used to determine which partition to send the message to. If partition is `None` (and producer's partitioner config is left as default), @@ -1522,14 +1712,17 @@ def publisher( partition (but if key is `None`, partition is chosen randomly). Must be type `bytes`, or be serializable to bytes via configured `key_serializer`. - """), + """ + ), ] = None, partition: Annotated[ Optional[int], - Doc(""" + Doc( + """ Specify a partition. If not set, the partition will be selected using the configured `partitioner`. - """), + """ + ), ] = None, headers: Annotated[ Optional[Dict[str, str]], diff --git a/faststream/confluent/fastapi/fastapi.py b/faststream/confluent/fastapi/fastapi.py index 2a494a7712..1897243d6c 100644 --- a/faststream/confluent/fastapi/fastapi.py +++ b/faststream/confluent/fastapi/fastapi.py @@ -414,12 +414,14 @@ def subscriber( ], group_id: Annotated[ Optional[str], - Doc(""" + Doc( + """ Name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If `None`, auto-partition assignment (via group coordinator) and offset commits are disabled. - """), + """ + ), ] = None, key_deserializer: Annotated[ Optional[Callable[[bytes], Any]], @@ -437,16 +439,19 @@ def subscriber( ] = None, fetch_max_wait_ms: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by `fetch_min_bytes`. - """), + """ + ), ] = 500, fetch_max_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data the server should return for a fetch request. This is not an absolute maximum, if the first message in the first non-empty partition of the fetch @@ -455,19 +460,23 @@ def subscriber( performs fetches to multiple brokers in parallel so memory usage will depend on the number of brokers containing partitions for the topic. - """), + """ + ), ] = 50 * 1024 * 1024, fetch_min_bytes: Annotated[ int, - Doc(""" + Doc( + """ Minimum amount of data the server should return for a fetch request, otherwise wait up to `fetch_max_wait_ms` for more data to accumulate. - """), + """ + ), ] = 1, max_partition_fetch_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data per-partition the server will return. The maximum total memory used for a request ``= #partitions * max_partition_fetch_bytes``. @@ -476,43 +485,53 @@ def subscriber( send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. - """), + """ + ), ] = 1 * 1024 * 1024, auto_offset_reset: Annotated[ Literal["latest", "earliest", "none"], - Doc(""" + Doc( + """ A policy for resetting offsets on `OffsetOutOfRangeError` errors: * `earliest` will move to the oldest available message * `latest` will move to the most recent * `none` will raise an exception so you can handle this case - """), + """ + ), ] = "latest", auto_commit: Annotated[ bool, - Doc(""" + Doc( + """ If `True` the consumer's offset will be periodically committed in the background. - """), + """ + ), ] = True, auto_commit_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds between automatic - offset commits, if `auto_commit` is `True`."""), + offset commits, if `auto_commit` is `True`.""" + ), ] = 5 * 1000, check_crcs: Annotated[ bool, - Doc(""" + Doc( + """ Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. - """), + """ + ), ] = True, partition_assignment_strategy: Annotated[ Sequence[str], - Doc(""" + Doc( + """ List of objects to use to distribute partition ownership amongst consumer instances when group management is used. This preference is implicit in the order @@ -522,22 +541,26 @@ def subscriber( one. The coordinator will choose the old assignment strategy until all members have been updated. Then it will choose the new strategy. - """), + """ + ), ] = ("roundrobin",), max_poll_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum allowed time between calls to consume messages in batches. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout. - """), + """ + ), ] = 5 * 60 * 1000, rebalance_timeout_ms: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum time server will wait for this consumer to rejoin the group in a case of rebalance. In Java client this behaviour is bound to `max.poll.interval.ms` configuration, @@ -545,11 +568,13 @@ def subscriber( decouple this setting to allow finer tuning by users that use `ConsumerRebalanceListener` to delay rebalacing. Defaults to ``session_timeout_ms`` - """), + """ + ), ] = None, session_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Client group session and failure detection timeout. The consumer sends periodic heartbeats (`heartbeat.interval.ms`) to indicate its liveness to the broker. @@ -558,11 +583,13 @@ def subscriber( group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. - """), + """ + ), ] = 10 * 1000, heartbeat_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure @@ -572,35 +599,43 @@ def subscriber( should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. - """), + """ + ), ] = 3 * 1000, consumer_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. - """), + """ + ), ] = 200, max_poll_records: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum number of records returned in a single call by batch consumer. Has no limit by default. - """), + """ + ), ] = None, exclude_internal_topics: Annotated[ bool, - Doc(""" + Doc( + """ Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. - """), + """ + ), ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], - Doc(""" + Doc( + """ Controls how to read messages written transactionally. @@ -624,7 +659,8 @@ def subscriber( to the high watermark when there are in flight transactions. Further, when in `read_committed` the seek_to_end method will return the LSO. See method docs below. - """), + """ + ), ] = "read_uncommitted", batch: Annotated[ Literal[False], @@ -632,12 +668,14 @@ def subscriber( ] = False, batch_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. - """), + """ + ), ] = 200, max_records: Annotated[ Optional[int], @@ -679,6 +717,12 @@ def subscriber( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI args title: Annotated[ Optional[str], @@ -829,12 +873,14 @@ def subscriber( ], group_id: Annotated[ Optional[str], - Doc(""" + Doc( + """ Name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If `None`, auto-partition assignment (via group coordinator) and offset commits are disabled. - """), + """ + ), ] = None, key_deserializer: Annotated[ Optional[Callable[[bytes], Any]], @@ -852,16 +898,19 @@ def subscriber( ] = None, fetch_max_wait_ms: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by `fetch_min_bytes`. - """), + """ + ), ] = 500, fetch_max_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data the server should return for a fetch request. This is not an absolute maximum, if the first message in the first non-empty partition of the fetch @@ -870,19 +919,23 @@ def subscriber( performs fetches to multiple brokers in parallel so memory usage will depend on the number of brokers containing partitions for the topic. - """), + """ + ), ] = 50 * 1024 * 1024, fetch_min_bytes: Annotated[ int, - Doc(""" + Doc( + """ Minimum amount of data the server should return for a fetch request, otherwise wait up to `fetch_max_wait_ms` for more data to accumulate. - """), + """ + ), ] = 1, max_partition_fetch_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data per-partition the server will return. The maximum total memory used for a request ``= #partitions * max_partition_fetch_bytes``. @@ -891,43 +944,53 @@ def subscriber( send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. - """), + """ + ), ] = 1 * 1024 * 1024, auto_offset_reset: Annotated[ Literal["latest", "earliest", "none"], - Doc(""" + Doc( + """ A policy for resetting offsets on `OffsetOutOfRangeError` errors: * `earliest` will move to the oldest available message * `latest` will move to the most recent * `none` will raise an exception so you can handle this case - """), + """ + ), ] = "latest", auto_commit: Annotated[ bool, - Doc(""" + Doc( + """ If `True` the consumer's offset will be periodically committed in the background. - """), + """ + ), ] = True, auto_commit_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds between automatic - offset commits, if `auto_commit` is `True`."""), + offset commits, if `auto_commit` is `True`.""" + ), ] = 5 * 1000, check_crcs: Annotated[ bool, - Doc(""" + Doc( + """ Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. - """), + """ + ), ] = True, partition_assignment_strategy: Annotated[ Sequence[str], - Doc(""" + Doc( + """ List of objects to use to distribute partition ownership amongst consumer instances when group management is used. This preference is implicit in the order @@ -937,22 +1000,26 @@ def subscriber( one. The coordinator will choose the old assignment strategy until all members have been updated. Then it will choose the new strategy. - """), + """ + ), ] = ("roundrobin",), max_poll_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum allowed time between calls to consume messages in batches. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout. - """), + """ + ), ] = 5 * 60 * 1000, rebalance_timeout_ms: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum time server will wait for this consumer to rejoin the group in a case of rebalance. In Java client this behaviour is bound to `max.poll.interval.ms` configuration, @@ -960,11 +1027,13 @@ def subscriber( decouple this setting to allow finer tuning by users that use `ConsumerRebalanceListener` to delay rebalacing. Defaults to ``session_timeout_ms`` - """), + """ + ), ] = None, session_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Client group session and failure detection timeout. The consumer sends periodic heartbeats (`heartbeat.interval.ms`) to indicate its liveness to the broker. @@ -973,11 +1042,13 @@ def subscriber( group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. - """), + """ + ), ] = 10 * 1000, heartbeat_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure @@ -987,35 +1058,43 @@ def subscriber( should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. - """), + """ + ), ] = 3 * 1000, consumer_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. - """), + """ + ), ] = 200, max_poll_records: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum number of records returned in a single call by batch consumer. Has no limit by default. - """), + """ + ), ] = None, exclude_internal_topics: Annotated[ bool, - Doc(""" + Doc( + """ Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. - """), + """ + ), ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], - Doc(""" + Doc( + """ Controls how to read messages written transactionally. @@ -1039,7 +1118,8 @@ def subscriber( to the high watermark when there are in flight transactions. Further, when in `read_committed` the seek_to_end method will return the LSO. See method docs below. - """), + """ + ), ] = "read_uncommitted", batch: Annotated[ Literal[True], @@ -1047,12 +1127,14 @@ def subscriber( ], batch_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. - """), + """ + ), ] = 200, max_records: Annotated[ Optional[int], @@ -1236,12 +1318,14 @@ def subscriber( ], group_id: Annotated[ Optional[str], - Doc(""" + Doc( + """ Name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If `None`, auto-partition assignment (via group coordinator) and offset commits are disabled. - """), + """ + ), ] = None, key_deserializer: Annotated[ Optional[Callable[[bytes], Any]], @@ -1259,16 +1343,19 @@ def subscriber( ] = None, fetch_max_wait_ms: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by `fetch_min_bytes`. - """), + """ + ), ] = 500, fetch_max_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data the server should return for a fetch request. This is not an absolute maximum, if the first message in the first non-empty partition of the fetch @@ -1277,19 +1364,23 @@ def subscriber( performs fetches to multiple brokers in parallel so memory usage will depend on the number of brokers containing partitions for the topic. - """), + """ + ), ] = 50 * 1024 * 1024, fetch_min_bytes: Annotated[ int, - Doc(""" + Doc( + """ Minimum amount of data the server should return for a fetch request, otherwise wait up to `fetch_max_wait_ms` for more data to accumulate. - """), + """ + ), ] = 1, max_partition_fetch_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data per-partition the server will return. The maximum total memory used for a request ``= #partitions * max_partition_fetch_bytes``. @@ -1298,43 +1389,53 @@ def subscriber( send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. - """), + """ + ), ] = 1 * 1024 * 1024, auto_offset_reset: Annotated[ Literal["latest", "earliest", "none"], - Doc(""" + Doc( + """ A policy for resetting offsets on `OffsetOutOfRangeError` errors: * `earliest` will move to the oldest available message * `latest` will move to the most recent * `none` will raise an exception so you can handle this case - """), + """ + ), ] = "latest", auto_commit: Annotated[ bool, - Doc(""" + Doc( + """ If `True` the consumer's offset will be periodically committed in the background. - """), + """ + ), ] = True, auto_commit_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds between automatic - offset commits, if `auto_commit` is `True`."""), + offset commits, if `auto_commit` is `True`.""" + ), ] = 5 * 1000, check_crcs: Annotated[ bool, - Doc(""" + Doc( + """ Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. - """), + """ + ), ] = True, partition_assignment_strategy: Annotated[ Sequence[str], - Doc(""" + Doc( + """ List of objects to use to distribute partition ownership amongst consumer instances when group management is used. This preference is implicit in the order @@ -1344,22 +1445,26 @@ def subscriber( one. The coordinator will choose the old assignment strategy until all members have been updated. Then it will choose the new strategy. - """), + """ + ), ] = ("roundrobin",), max_poll_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum allowed time between calls to consume messages in batches. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout. - """), + """ + ), ] = 5 * 60 * 1000, rebalance_timeout_ms: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum time server will wait for this consumer to rejoin the group in a case of rebalance. In Java client this behaviour is bound to `max.poll.interval.ms` configuration, @@ -1367,11 +1472,13 @@ def subscriber( decouple this setting to allow finer tuning by users that use `ConsumerRebalanceListener` to delay rebalacing. Defaults to ``session_timeout_ms`` - """), + """ + ), ] = None, session_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Client group session and failure detection timeout. The consumer sends periodic heartbeats (`heartbeat.interval.ms`) to indicate its liveness to the broker. @@ -1380,11 +1487,13 @@ def subscriber( group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. - """), + """ + ), ] = 10 * 1000, heartbeat_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure @@ -1394,35 +1503,43 @@ def subscriber( should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. - """), + """ + ), ] = 3 * 1000, consumer_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. - """), + """ + ), ] = 200, max_poll_records: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum number of records returned in a single call by batch consumer. Has no limit by default. - """), + """ + ), ] = None, exclude_internal_topics: Annotated[ bool, - Doc(""" + Doc( + """ Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. - """), + """ + ), ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], - Doc(""" + Doc( + """ Controls how to read messages written transactionally. @@ -1446,7 +1563,8 @@ def subscriber( to the high watermark when there are in flight transactions. Further, when in `read_committed` the seek_to_end method will return the LSO. See method docs below. - """), + """ + ), ] = "read_uncommitted", batch: Annotated[ bool, @@ -1454,12 +1572,14 @@ def subscriber( ] = False, batch_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. - """), + """ + ), ] = 200, max_records: Annotated[ Optional[int], @@ -1501,6 +1621,12 @@ def subscriber( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI args title: Annotated[ Optional[str], @@ -1654,12 +1780,14 @@ def subscriber( ], group_id: Annotated[ Optional[str], - Doc(""" + Doc( + """ Name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If `None`, auto-partition assignment (via group coordinator) and offset commits are disabled. - """), + """ + ), ] = None, key_deserializer: Annotated[ Optional[Callable[[bytes], Any]], @@ -1677,16 +1805,19 @@ def subscriber( ] = None, fetch_max_wait_ms: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by `fetch_min_bytes`. - """), + """ + ), ] = 500, fetch_max_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data the server should return for a fetch request. This is not an absolute maximum, if the first message in the first non-empty partition of the fetch @@ -1695,19 +1826,23 @@ def subscriber( performs fetches to multiple brokers in parallel so memory usage will depend on the number of brokers containing partitions for the topic. - """), + """ + ), ] = 50 * 1024 * 1024, fetch_min_bytes: Annotated[ int, - Doc(""" + Doc( + """ Minimum amount of data the server should return for a fetch request, otherwise wait up to `fetch_max_wait_ms` for more data to accumulate. - """), + """ + ), ] = 1, max_partition_fetch_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data per-partition the server will return. The maximum total memory used for a request ``= #partitions * max_partition_fetch_bytes``. @@ -1716,43 +1851,53 @@ def subscriber( send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. - """), + """ + ), ] = 1 * 1024 * 1024, auto_offset_reset: Annotated[ Literal["latest", "earliest", "none"], - Doc(""" + Doc( + """ A policy for resetting offsets on `OffsetOutOfRangeError` errors: * `earliest` will move to the oldest available message * `latest` will move to the most recent * `none` will raise an exception so you can handle this case - """), + """ + ), ] = "latest", auto_commit: Annotated[ bool, - Doc(""" + Doc( + """ If `True` the consumer's offset will be periodically committed in the background. - """), + """ + ), ] = True, auto_commit_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds between automatic - offset commits, if `auto_commit` is `True`."""), + offset commits, if `auto_commit` is `True`.""" + ), ] = 5 * 1000, check_crcs: Annotated[ bool, - Doc(""" + Doc( + """ Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. - """), + """ + ), ] = True, partition_assignment_strategy: Annotated[ Sequence[str], - Doc(""" + Doc( + """ List of objects to use to distribute partition ownership amongst consumer instances when group management is used. This preference is implicit in the order @@ -1762,22 +1907,26 @@ def subscriber( one. The coordinator will choose the old assignment strategy until all members have been updated. Then it will choose the new strategy. - """), + """ + ), ] = ("roundrobin",), max_poll_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum allowed time between calls to consume messages in batches. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout. - """), + """ + ), ] = 5 * 60 * 1000, rebalance_timeout_ms: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum time server will wait for this consumer to rejoin the group in a case of rebalance. In Java client this behaviour is bound to `max.poll.interval.ms` configuration, @@ -1785,11 +1934,13 @@ def subscriber( decouple this setting to allow finer tuning by users that use `ConsumerRebalanceListener` to delay rebalacing. Defaults to ``session_timeout_ms`` - """), + """ + ), ] = None, session_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Client group session and failure detection timeout. The consumer sends periodic heartbeats (`heartbeat.interval.ms`) to indicate its liveness to the broker. @@ -1798,11 +1949,13 @@ def subscriber( group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. - """), + """ + ), ] = 10 * 1000, heartbeat_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure @@ -1812,35 +1965,43 @@ def subscriber( should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. - """), + """ + ), ] = 3 * 1000, consumer_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. - """), + """ + ), ] = 200, max_poll_records: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum number of records returned in a single call by batch consumer. Has no limit by default. - """), + """ + ), ] = None, exclude_internal_topics: Annotated[ bool, - Doc(""" + Doc( + """ Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. - """), + """ + ), ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], - Doc(""" + Doc( + """ Controls how to read messages written transactionally. @@ -1864,7 +2025,8 @@ def subscriber( to the high watermark when there are in flight transactions. Further, when in `read_committed` the seek_to_end method will return the LSO. See method docs below. - """), + """ + ), ] = "read_uncommitted", batch: Annotated[ bool, @@ -1872,12 +2034,14 @@ def subscriber( ] = False, batch_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. - """), + """ + ), ] = 200, max_records: Annotated[ Optional[int], @@ -1919,6 +2083,12 @@ def subscriber( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI args title: Annotated[ Optional[str], @@ -2096,6 +2266,7 @@ def subscriber( filter=filter, retry=retry, no_ack=no_ack, + no_reply=no_reply, title=title, description=description, include_in_schema=include_in_schema, diff --git a/faststream/confluent/router.py b/faststream/confluent/router.py index 33480a12ea..f24a40e263 100644 --- a/faststream/confluent/router.py +++ b/faststream/confluent/router.py @@ -48,7 +48,8 @@ def __init__( *, key: Annotated[ Union[bytes, Any, None], - Doc(""" + Doc( + """ A key to associate with the message. Can be used to determine which partition to send the message to. If partition is `None` (and producer's partitioner config is left as default), @@ -56,14 +57,17 @@ def __init__( partition (but if key is `None`, partition is chosen randomly). Must be type `bytes`, or be serializable to bytes via configured `key_serializer`. - """), + """ + ), ] = None, partition: Annotated[ Optional[int], - Doc(""" + Doc( + """ Specify a partition. If not set, the partition will be selected using the configured `partitioner`. - """), + """ + ), ] = None, headers: Annotated[ Optional[Dict[str, str]], @@ -146,12 +150,14 @@ def __init__( ] = (), group_id: Annotated[ Optional[str], - Doc(""" + Doc( + """ Name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If `None`, auto-partition assignment (via group coordinator) and offset commits are disabled. - """), + """ + ), ] = None, key_deserializer: Annotated[ Optional[Callable[[bytes], Any]], @@ -169,16 +175,19 @@ def __init__( ] = None, fetch_max_wait_ms: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by `fetch_min_bytes`. - """), + """ + ), ] = 500, fetch_max_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data the server should return for a fetch request. This is not an absolute maximum, if the first message in the first non-empty partition of the fetch @@ -187,19 +196,23 @@ def __init__( performs fetches to multiple brokers in parallel so memory usage will depend on the number of brokers containing partitions for the topic. - """), + """ + ), ] = 50 * 1024 * 1024, fetch_min_bytes: Annotated[ int, - Doc(""" + Doc( + """ Minimum amount of data the server should return for a fetch request, otherwise wait up to `fetch_max_wait_ms` for more data to accumulate. - """), + """ + ), ] = 1, max_partition_fetch_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data per-partition the server will return. The maximum total memory used for a request ``= #partitions * max_partition_fetch_bytes``. @@ -208,43 +221,53 @@ def __init__( send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. - """), + """ + ), ] = 1 * 1024 * 1024, auto_offset_reset: Annotated[ Literal["latest", "earliest", "none"], - Doc(""" + Doc( + """ A policy for resetting offsets on `OffsetOutOfRangeError` errors: * `earliest` will move to the oldest available message * `latest` will move to the most recent * `none` will raise an exception so you can handle this case - """), + """ + ), ] = "latest", auto_commit: Annotated[ bool, - Doc(""" + Doc( + """ If `True` the consumer's offset will be periodically committed in the background. - """), + """ + ), ] = True, auto_commit_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds between automatic - offset commits, if `auto_commit` is `True`."""), + offset commits, if `auto_commit` is `True`.""" + ), ] = 5 * 1000, check_crcs: Annotated[ bool, - Doc(""" + Doc( + """ Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. - """), + """ + ), ] = True, partition_assignment_strategy: Annotated[ Sequence[str], - Doc(""" + Doc( + """ List of objects to use to distribute partition ownership amongst consumer instances when group management is used. This preference is implicit in the order @@ -254,22 +277,26 @@ def __init__( one. The coordinator will choose the old assignment strategy until all members have been updated. Then it will choose the new strategy. - """), + """ + ), ] = ("roundrobin",), max_poll_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum allowed time between calls to consume messages in batches. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout. - """), + """ + ), ] = 5 * 60 * 1000, rebalance_timeout_ms: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum time server will wait for this consumer to rejoin the group in a case of rebalance. In Java client this behaviour is bound to `max.poll.interval.ms` configuration, @@ -277,11 +304,13 @@ def __init__( decouple this setting to allow finer tuning by users that use `ConsumerRebalanceListener` to delay rebalacing. Defaults to ``session_timeout_ms`` - """), + """ + ), ] = None, session_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Client group session and failure detection timeout. The consumer sends periodic heartbeats (`heartbeat.interval.ms`) to indicate its liveness to the broker. @@ -290,11 +319,13 @@ def __init__( group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. - """), + """ + ), ] = 10 * 1000, heartbeat_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure @@ -304,35 +335,43 @@ def __init__( should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. - """), + """ + ), ] = 3 * 1000, consumer_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. - """), + """ + ), ] = 200, max_poll_records: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum number of records returned in a single call by batch consumer. Has no limit by default. - """), + """ + ), ] = None, exclude_internal_topics: Annotated[ bool, - Doc(""" + Doc( + """ Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. - """), + """ + ), ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], - Doc(""" + Doc( + """ Controls how to read messages written transactionally. @@ -356,7 +395,8 @@ def __init__( to the high watermark when there are in flight transactions. Further, when in `read_committed` the seek_to_end method will return the LSO. See method docs below. - """), + """ + ), ] = "read_uncommitted", batch: Annotated[ bool, @@ -364,12 +404,14 @@ def __init__( ] = False, batch_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. - """), + """ + ), ] = 200, max_records: Annotated[ Optional[int], @@ -411,6 +453,12 @@ def __init__( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI args title: Annotated[ Optional[str], @@ -461,6 +509,7 @@ def __init__( decoder=decoder, middlewares=middlewares, filter=filter, + no_reply=no_reply, # AsyncAPI args title=title, description=description, diff --git a/faststream/confluent/subscriber/factory.py b/faststream/confluent/subscriber/factory.py index b7b6b6ca61..f1d001b888 100644 --- a/faststream/confluent/subscriber/factory.py +++ b/faststream/confluent/subscriber/factory.py @@ -33,6 +33,7 @@ def create_subscriber( is_manual: bool, # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"], @@ -55,6 +56,7 @@ def create_subscriber( is_manual: bool, # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[ConfluentMsg]"], @@ -77,6 +79,7 @@ def create_subscriber( is_manual: bool, # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable[ @@ -103,6 +106,7 @@ def create_subscriber( is_manual: bool, # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable[ @@ -125,6 +129,7 @@ def create_subscriber( connection_data=connection_data, is_manual=is_manual, no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=broker_middlewares, @@ -139,6 +144,7 @@ def create_subscriber( connection_data=connection_data, is_manual=is_manual, no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=broker_middlewares, diff --git a/faststream/confluent/subscriber/usecase.py b/faststream/confluent/subscriber/usecase.py index 28f7ece4e7..dde949848f 100644 --- a/faststream/confluent/subscriber/usecase.py +++ b/faststream/confluent/subscriber/usecase.py @@ -57,6 +57,7 @@ def __init__( default_parser: "AsyncCallable", default_decoder: "AsyncCallable", no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[MsgType]"], @@ -70,6 +71,7 @@ def __init__( default_decoder=default_decoder, # Propagated args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -156,9 +158,10 @@ async def close(self) -> None: self.task = None def _make_response_publisher( - self, message: "StreamMessage[Any]" + self, + message: "StreamMessage[Any]", ) -> Sequence[FakePublisher]: - if not message.reply_to or self._producer is None: + if self._producer is None: return () return ( @@ -226,6 +229,7 @@ def __init__( is_manual: bool, # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[Message]"], @@ -244,6 +248,7 @@ def __init__( default_decoder=AsyncConfluentParser.decode_message, # Propagated args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -285,6 +290,7 @@ def __init__( is_manual: bool, # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[Tuple[Message, ...]]"], @@ -306,6 +312,7 @@ def __init__( default_decoder=AsyncConfluentParser.decode_message_batch, # Propagated args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, diff --git a/faststream/kafka/broker/broker.py b/faststream/kafka/broker/broker.py index de0b6980f1..42cc4f281b 100644 --- a/faststream/kafka/broker/broker.py +++ b/faststream/kafka/broker/broker.py @@ -67,20 +67,24 @@ class KafkaInitKwargs(TypedDict, total=False): ] metadata_max_age_ms: Annotated[ int, - Doc(""" + Doc( + """ The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions. - """), + """ + ), ] connections_max_idle_ms: Annotated[ int, - Doc(""" + Doc( + """ Close idle connections after the number of milliseconds specified by this config. Specifying `None` will disable idle checks. - """), + """ + ), ] sasl_kerberos_service_name: str sasl_kerberos_domain_name: Optional[str] @@ -91,18 +95,21 @@ class KafkaInitKwargs(TypedDict, total=False): loop: Optional[AbstractEventLoop] client_id: Annotated[ Optional[str], - Doc(""" + Doc( + """ A name for this client. This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. Also submitted to :class:`~.consumer.group_coordinator.GroupCoordinator` for logging with respect to consumer group administration. - """), + """ + ), ] # publisher args acks: Annotated[ Union[Literal[0, 1, -1, "all"], object], - Doc(""" + Doc( + """ One of ``0``, ``1``, ``all``. The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are @@ -127,7 +134,8 @@ class KafkaInitKwargs(TypedDict, total=False): If unset, defaults to ``acks=1``. If `enable_idempotence` is :data:`True` defaults to ``acks=all``. - """), + """ + ), ] key_serializer: Annotated[ Optional[Callable[[Any], bytes]], @@ -139,26 +147,31 @@ class KafkaInitKwargs(TypedDict, total=False): ] compression_type: Annotated[ Optional[Literal["gzip", "snappy", "lz4", "zstd"]], - Doc(""" + Doc( + """ The compression type for all data generated bythe producer. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression). - """), + """ + ), ] max_batch_size: Annotated[ int, - Doc(""" + Doc( + """ Maximum size of buffered data per partition. After this amount `send` coroutine will block until batch is drained. - """), + """ + ), ] partitioner: Annotated[ Callable[ [bytes, List[Partition], List[Partition]], Partition, ], - Doc(""" + Doc( + """ Callable used to determine which partition each message is assigned to. Called (after key serialization): ``partitioner(key_bytes, all_partitions, available_partitions)``. @@ -167,21 +180,25 @@ class KafkaInitKwargs(TypedDict, total=False): messages with the same key are assigned to the same partition. When a key is :data:`None`, the message is delivered to a random partition (filtered to partitions with available leaders only, if possible). - """), + """ + ), ] max_request_size: Annotated[ int, - Doc(""" + Doc( + """ The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. - """), + """ + ), ] linger_ms: Annotated[ int, - Doc(""" + Doc( + """ The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster @@ -190,19 +207,22 @@ class KafkaInitKwargs(TypedDict, total=False): This setting accomplishes this by adding a small amount of artificial delay; that is, if first request is processed faster, than `linger_ms`, producer will wait ``linger_ms - process_time``. - """), + """ + ), ] send_backoff_ms: int enable_idempotence: Annotated[ bool, - Doc(""" + Doc( + """ When set to `True`, the producer will ensure that exactly one copy of each message is written in the stream. If `False`, producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence acks to set to ``all``. If it is not explicitly set by the user it will be chosen. - """), + """ + ), ] transactional_id: Optional[str] transaction_timeout_ms: int @@ -219,14 +239,16 @@ def __init__( self, bootstrap_servers: Annotated[ Union[str, Iterable[str]], - Doc(""" + Doc( + """ A `host[:port]` string (or list of `host[:port]` strings) that the consumer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request. Default port is 9092. - """), + """ + ), ] = "localhost", *, # both @@ -240,20 +262,24 @@ def __init__( ] = 100, metadata_max_age_ms: Annotated[ int, - Doc(""" + Doc( + """ The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions. - """), + """ + ), ] = 5 * 60 * 1000, connections_max_idle_ms: Annotated[ int, - Doc(""" + Doc( + """ Close idle connections after the number of milliseconds specified by this config. Specifying `None` will disable idle checks. - """), + """ + ), ] = 9 * 60 * 1000, sasl_kerberos_service_name: str = "kafka", sasl_kerberos_domain_name: Optional[str] = None, @@ -264,18 +290,21 @@ def __init__( loop: Optional["AbstractEventLoop"] = None, client_id: Annotated[ Optional[str], - Doc(""" + Doc( + """ A name for this client. This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. Also submitted to :class:`~.consumer.group_coordinator.GroupCoordinator` for logging with respect to consumer group administration. - """), + """ + ), ] = SERVICE_NAME, # publisher args acks: Annotated[ Union[Literal[0, 1, -1, "all"], object], - Doc(""" + Doc( + """ One of ``0``, ``1``, ``all``. The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are @@ -300,7 +329,8 @@ def __init__( If unset, defaults to ``acks=1``. If `enable_idempotence` is :data:`True` defaults to ``acks=all``. - """), + """ + ), ] = _missing, key_serializer: Annotated[ Optional[Callable[[Any], bytes]], @@ -312,26 +342,31 @@ def __init__( ] = None, compression_type: Annotated[ Optional[Literal["gzip", "snappy", "lz4", "zstd"]], - Doc(""" + Doc( + """ The compression type for all data generated bythe producer. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression). - """), + """ + ), ] = None, max_batch_size: Annotated[ int, - Doc(""" + Doc( + """ Maximum size of buffered data per partition. After this amount `send` coroutine will block until batch is drained. - """), + """ + ), ] = 16 * 1024, partitioner: Annotated[ Callable[ [bytes, List[Partition], List[Partition]], Partition, ], - Doc(""" + Doc( + """ Callable used to determine which partition each message is assigned to. Called (after key serialization): ``partitioner(key_bytes, all_partitions, available_partitions)``. @@ -340,21 +375,25 @@ def __init__( messages with the same key are assigned to the same partition. When a key is :data:`None`, the message is delivered to a random partition (filtered to partitions with available leaders only, if possible). - """), + """ + ), ] = DefaultPartitioner(), max_request_size: Annotated[ int, - Doc(""" + Doc( + """ The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. - """), + """ + ), ] = 1024 * 1024, linger_ms: Annotated[ int, - Doc(""" + Doc( + """ The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster @@ -363,19 +402,22 @@ def __init__( This setting accomplishes this by adding a small amount of artificial delay; that is, if first request is processed faster, than `linger_ms`, producer will wait ``linger_ms - process_time``. - """), + """ + ), ] = 0, send_backoff_ms: int = 100, enable_idempotence: Annotated[ bool, - Doc(""" + Doc( + """ When set to `True`, the producer will ensure that exactly one copy of each message is written in the stream. If `False`, producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence acks to set to ``all``. If it is not explicitly set by the user it will be chosen. - """), + """ + ), ] = False, transactional_id: Optional[str] = None, transaction_timeout_ms: int = 60 * 1000, @@ -632,7 +674,8 @@ async def publish( # type: ignore[override] *, key: Annotated[ Union[bytes, Any, None], - Doc(""" + Doc( + """ A key to associate with the message. Can be used to determine which partition to send the message to. If partition is `None` (and producer's partitioner config is left as default), @@ -640,21 +683,26 @@ async def publish( # type: ignore[override] partition (but if key is `None`, partition is chosen randomly). Must be type `bytes`, or be serializable to bytes via configured `key_serializer`. - """), + """ + ), ] = None, partition: Annotated[ Optional[int], - Doc(""" + Doc( + """ Specify a partition. If not set, the partition will be selected using the configured `partitioner`. - """), + """ + ), ] = None, timestamp_ms: Annotated[ Optional[int], - Doc(""" + Doc( + """ Epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time. - """), + """ + ), ] = None, headers: Annotated[ Optional[Dict[str, str]], @@ -708,17 +756,21 @@ async def publish_batch( ], partition: Annotated[ Optional[int], - Doc(""" + Doc( + """ Specify a partition. If not set, the partition will be selected using the configured `partitioner`. - """), + """ + ), ] = None, timestamp_ms: Annotated[ Optional[int], - Doc(""" + Doc( + """ Epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time. - """), + """ + ), ] = None, headers: Annotated[ Optional[Dict[str, str]], diff --git a/faststream/kafka/broker/registrator.py b/faststream/kafka/broker/registrator.py index 0633032c06..1cb3fa38e2 100644 --- a/faststream/kafka/broker/registrator.py +++ b/faststream/kafka/broker/registrator.py @@ -23,7 +23,7 @@ from faststream.kafka.subscriber.factory import create_subscriber if TYPE_CHECKING: - from aiokafka import ConsumerRecord, TopicPartition + from aiokafka import TopicPartition from aiokafka.abc import ConsumerRebalanceListener from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor from fast_depends.dependencies import Depends @@ -48,8 +48,8 @@ class KafkaRegistrator( ABCBroker[ Union[ - "ConsumerRecord", - Tuple["ConsumerRecord", ...], + ConsumerRecord, + Tuple[ConsumerRecord, ...], ] ] ): @@ -77,12 +77,14 @@ def subscriber( ] = False, group_id: Annotated[ Optional[str], - Doc(""" + Doc( + """ Name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If `None`, auto-partition assignment (via group coordinator) and offset commits are disabled. - """), + """ + ), ] = None, key_deserializer: Annotated[ Optional[Callable[[bytes], Any]], @@ -100,7 +102,8 @@ def subscriber( ] = None, fetch_max_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data the server should return for a fetch request. This is not an absolute maximum, if the first message in the first non-empty partition of the fetch @@ -109,28 +112,34 @@ def subscriber( performs fetches to multiple brokers in parallel so memory usage will depend on the number of brokers containing partitions for the topic. - """), + """ + ), ] = 50 * 1024 * 1024, fetch_min_bytes: Annotated[ int, - Doc(""" + Doc( + """ Minimum amount of data the server should return for a fetch request, otherwise wait up to `fetch_max_wait_ms` for more data to accumulate. - """), + """ + ), ] = 1, fetch_max_wait_ms: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by `fetch_min_bytes`. - """), + """ + ), ] = 500, max_partition_fetch_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data per-partition the server will return. The maximum total memory used for a request ``= #partitions * max_partition_fetch_bytes``. @@ -139,43 +148,53 @@ def subscriber( send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. - """), + """ + ), ] = 1 * 1024 * 1024, auto_offset_reset: Annotated[ Literal["latest", "earliest", "none"], - Doc(""" + Doc( + """ A policy for resetting offsets on `OffsetOutOfRangeError` errors: * `earliest` will move to the oldest available message * `latest` will move to the most recent * `none` will raise an exception so you can handle this case - """), + """ + ), ] = "latest", auto_commit: Annotated[ bool, - Doc(""" + Doc( + """ If `True` the consumer's offset will be periodically committed in the background. - """), + """ + ), ] = True, auto_commit_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds between automatic - offset commits, if `auto_commit` is `True`."""), + offset commits, if `auto_commit` is `True`.""" + ), ] = 5 * 1000, check_crcs: Annotated[ bool, - Doc(""" + Doc( + """ Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. - """), + """ + ), ] = True, partition_assignment_strategy: Annotated[ Sequence["AbstractPartitionAssignor"], - Doc(""" + Doc( + """ List of objects to use to distribute partition ownership amongst consumer instances when group management is used. This preference is implicit in the order @@ -185,22 +204,26 @@ def subscriber( one. The coordinator will choose the old assignment strategy until all members have been updated. Then it will choose the new strategy. - """), + """ + ), ] = (RoundRobinPartitionAssignor,), max_poll_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum allowed time between calls to consume messages in batches. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout. - """), + """ + ), ] = 5 * 60 * 1000, rebalance_timeout_ms: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum time server will wait for this consumer to rejoin the group in a case of rebalance. In Java client this behaviour is bound to `max.poll.interval.ms` configuration, @@ -208,11 +231,13 @@ def subscriber( decouple this setting to allow finer tuning by users that use `ConsumerRebalanceListener` to delay rebalacing. Defaults to ``session_timeout_ms`` - """), + """ + ), ] = None, session_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Client group session and failure detection timeout. The consumer sends periodic heartbeats (`heartbeat.interval.ms`) to indicate its liveness to the broker. @@ -221,11 +246,13 @@ def subscriber( group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. - """), + """ + ), ] = 10 * 1000, heartbeat_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure @@ -235,35 +262,43 @@ def subscriber( should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. - """), + """ + ), ] = 3 * 1000, consumer_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. - """), + """ + ), ] = 200, max_poll_records: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum number of records returned in a single call by batch consumer. Has no limit by default. - """), + """ + ), ] = None, exclude_internal_topics: Annotated[ bool, - Doc(""" + Doc( + """ Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. - """), + """ + ), ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], - Doc(""" + Doc( + """ Controls how to read messages written transactionally. @@ -287,16 +322,19 @@ def subscriber( to the high watermark when there are in flight transactions. Further, when in `read_committed` the seek_to_end method will return the LSO. See method docs below. - """), + """ + ), ] = "read_uncommitted", batch_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. - """), + """ + ), ] = 200, max_records: Annotated[ Optional[int], @@ -304,7 +342,8 @@ def subscriber( ] = None, listener: Annotated[ Optional["ConsumerRebalanceListener"], - Doc(""" + Doc( + """ Optionally include listener callback, which will be called before and after each rebalance operation. @@ -326,20 +365,25 @@ def subscriber( to subscribe. It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics subscribed in this call. - """), + """ + ), ] = None, pattern: Annotated[ Optional[str], - Doc(""" + Doc( + """ Pattern to match available topics. You must provide either topics or pattern, but not both. - """), + """ + ), ] = None, partitions: Annotated[ Iterable["TopicPartition"], - Doc(""" + Doc( + """ An explicit partitions list to assign. You can't use 'topics' and 'partitions' in the same time. - """), + """ + ), ] = (), # broker args dependencies: Annotated[ @@ -377,6 +421,12 @@ def subscriber( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI args title: Annotated[ Optional[str], @@ -408,12 +458,14 @@ def subscriber( ], group_id: Annotated[ Optional[str], - Doc(""" + Doc( + """ Name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If `None`, auto-partition assignment (via group coordinator) and offset commits are disabled. - """), + """ + ), ] = None, key_deserializer: Annotated[ Optional[Callable[[bytes], Any]], @@ -431,7 +483,8 @@ def subscriber( ] = None, fetch_max_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data the server should return for a fetch request. This is not an absolute maximum, if the first message in the first non-empty partition of the fetch @@ -440,28 +493,34 @@ def subscriber( performs fetches to multiple brokers in parallel so memory usage will depend on the number of brokers containing partitions for the topic. - """), + """ + ), ] = 50 * 1024 * 1024, fetch_min_bytes: Annotated[ int, - Doc(""" + Doc( + """ Minimum amount of data the server should return for a fetch request, otherwise wait up to `fetch_max_wait_ms` for more data to accumulate. - """), + """ + ), ] = 1, fetch_max_wait_ms: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by `fetch_min_bytes`. - """), + """ + ), ] = 500, max_partition_fetch_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data per-partition the server will return. The maximum total memory used for a request ``= #partitions * max_partition_fetch_bytes``. @@ -470,43 +529,53 @@ def subscriber( send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. - """), + """ + ), ] = 1 * 1024 * 1024, auto_offset_reset: Annotated[ Literal["latest", "earliest", "none"], - Doc(""" + Doc( + """ A policy for resetting offsets on `OffsetOutOfRangeError` errors: * `earliest` will move to the oldest available message * `latest` will move to the most recent * `none` will raise an exception so you can handle this case - """), + """ + ), ] = "latest", auto_commit: Annotated[ bool, - Doc(""" + Doc( + """ If `True` the consumer's offset will be periodically committed in the background. - """), + """ + ), ] = True, auto_commit_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds between automatic - offset commits, if `auto_commit` is `True`."""), + offset commits, if `auto_commit` is `True`.""" + ), ] = 5 * 1000, check_crcs: Annotated[ bool, - Doc(""" + Doc( + """ Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. - """), + """ + ), ] = True, partition_assignment_strategy: Annotated[ Sequence["AbstractPartitionAssignor"], - Doc(""" + Doc( + """ List of objects to use to distribute partition ownership amongst consumer instances when group management is used. This preference is implicit in the order @@ -516,22 +585,26 @@ def subscriber( one. The coordinator will choose the old assignment strategy until all members have been updated. Then it will choose the new strategy. - """), + """ + ), ] = (RoundRobinPartitionAssignor,), max_poll_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum allowed time between calls to consume messages in batches. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout. - """), + """ + ), ] = 5 * 60 * 1000, rebalance_timeout_ms: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum time server will wait for this consumer to rejoin the group in a case of rebalance. In Java client this behaviour is bound to `max.poll.interval.ms` configuration, @@ -539,11 +612,13 @@ def subscriber( decouple this setting to allow finer tuning by users that use `ConsumerRebalanceListener` to delay rebalacing. Defaults to ``session_timeout_ms`` - """), + """ + ), ] = None, session_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Client group session and failure detection timeout. The consumer sends periodic heartbeats (`heartbeat.interval.ms`) to indicate its liveness to the broker. @@ -552,11 +627,13 @@ def subscriber( group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. - """), + """ + ), ] = 10 * 1000, heartbeat_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure @@ -566,35 +643,43 @@ def subscriber( should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. - """), + """ + ), ] = 3 * 1000, consumer_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. - """), + """ + ), ] = 200, max_poll_records: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum number of records returned in a single call by batch consumer. Has no limit by default. - """), + """ + ), ] = None, exclude_internal_topics: Annotated[ bool, - Doc(""" + Doc( + """ Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. - """), + """ + ), ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], - Doc(""" + Doc( + """ Controls how to read messages written transactionally. @@ -618,16 +703,19 @@ def subscriber( to the high watermark when there are in flight transactions. Further, when in `read_committed` the seek_to_end method will return the LSO. See method docs below. - """), + """ + ), ] = "read_uncommitted", batch_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. - """), + """ + ), ] = 200, max_records: Annotated[ Optional[int], @@ -635,7 +723,8 @@ def subscriber( ] = None, listener: Annotated[ Optional["ConsumerRebalanceListener"], - Doc(""" + Doc( + """ Optionally include listener callback, which will be called before and after each rebalance operation. @@ -657,20 +746,25 @@ def subscriber( to subscribe. It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics subscribed in this call. - """), + """ + ), ] = None, pattern: Annotated[ Optional[str], - Doc(""" + Doc( + """ Pattern to match available topics. You must provide either topics or pattern, but not both. - """), + """ + ), ] = None, partitions: Annotated[ Iterable["TopicPartition"], - Doc(""" + Doc( + """ An explicit partitions list to assign. You can't use 'topics' and 'partitions' in the same time. - """), + """ + ), ] = (), # broker args dependencies: Annotated[ @@ -708,6 +802,12 @@ def subscriber( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI args title: Annotated[ Optional[str], @@ -739,12 +839,14 @@ def subscriber( ] = False, group_id: Annotated[ Optional[str], - Doc(""" + Doc( + """ Name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If `None`, auto-partition assignment (via group coordinator) and offset commits are disabled. - """), + """ + ), ] = None, key_deserializer: Annotated[ Optional[Callable[[bytes], Any]], @@ -762,7 +864,8 @@ def subscriber( ] = None, fetch_max_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data the server should return for a fetch request. This is not an absolute maximum, if the first message in the first non-empty partition of the fetch @@ -771,28 +874,34 @@ def subscriber( performs fetches to multiple brokers in parallel so memory usage will depend on the number of brokers containing partitions for the topic. - """), + """ + ), ] = 50 * 1024 * 1024, fetch_min_bytes: Annotated[ int, - Doc(""" + Doc( + """ Minimum amount of data the server should return for a fetch request, otherwise wait up to `fetch_max_wait_ms` for more data to accumulate. - """), + """ + ), ] = 1, fetch_max_wait_ms: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by `fetch_min_bytes`. - """), + """ + ), ] = 500, max_partition_fetch_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data per-partition the server will return. The maximum total memory used for a request ``= #partitions * max_partition_fetch_bytes``. @@ -801,43 +910,53 @@ def subscriber( send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. - """), + """ + ), ] = 1 * 1024 * 1024, auto_offset_reset: Annotated[ Literal["latest", "earliest", "none"], - Doc(""" + Doc( + """ A policy for resetting offsets on `OffsetOutOfRangeError` errors: * `earliest` will move to the oldest available message * `latest` will move to the most recent * `none` will raise an exception so you can handle this case - """), + """ + ), ] = "latest", auto_commit: Annotated[ bool, - Doc(""" + Doc( + """ If `True` the consumer's offset will be periodically committed in the background. - """), + """ + ), ] = True, auto_commit_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds between automatic - offset commits, if `auto_commit` is `True`."""), + offset commits, if `auto_commit` is `True`.""" + ), ] = 5 * 1000, check_crcs: Annotated[ bool, - Doc(""" + Doc( + """ Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. - """), + """ + ), ] = True, partition_assignment_strategy: Annotated[ Sequence["AbstractPartitionAssignor"], - Doc(""" + Doc( + """ List of objects to use to distribute partition ownership amongst consumer instances when group management is used. This preference is implicit in the order @@ -847,22 +966,26 @@ def subscriber( one. The coordinator will choose the old assignment strategy until all members have been updated. Then it will choose the new strategy. - """), + """ + ), ] = (RoundRobinPartitionAssignor,), max_poll_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum allowed time between calls to consume messages in batches. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout. - """), + """ + ), ] = 5 * 60 * 1000, rebalance_timeout_ms: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum time server will wait for this consumer to rejoin the group in a case of rebalance. In Java client this behaviour is bound to `max.poll.interval.ms` configuration, @@ -870,11 +993,13 @@ def subscriber( decouple this setting to allow finer tuning by users that use `ConsumerRebalanceListener` to delay rebalacing. Defaults to ``session_timeout_ms`` - """), + """ + ), ] = None, session_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Client group session and failure detection timeout. The consumer sends periodic heartbeats (`heartbeat.interval.ms`) to indicate its liveness to the broker. @@ -883,11 +1008,13 @@ def subscriber( group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. - """), + """ + ), ] = 10 * 1000, heartbeat_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure @@ -897,35 +1024,43 @@ def subscriber( should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. - """), + """ + ), ] = 3 * 1000, consumer_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. - """), + """ + ), ] = 200, max_poll_records: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum number of records returned in a single call by batch consumer. Has no limit by default. - """), + """ + ), ] = None, exclude_internal_topics: Annotated[ bool, - Doc(""" + Doc( + """ Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. - """), + """ + ), ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], - Doc(""" + Doc( + """ Controls how to read messages written transactionally. @@ -949,16 +1084,19 @@ def subscriber( to the high watermark when there are in flight transactions. Further, when in `read_committed` the seek_to_end method will return the LSO. See method docs below. - """), + """ + ), ] = "read_uncommitted", batch_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. - """), + """ + ), ] = 200, max_records: Annotated[ Optional[int], @@ -966,7 +1104,8 @@ def subscriber( ] = None, listener: Annotated[ Optional["ConsumerRebalanceListener"], - Doc(""" + Doc( + """ Optionally include listener callback, which will be called before and after each rebalance operation. @@ -988,20 +1127,25 @@ def subscriber( to subscribe. It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics subscribed in this call. - """), + """ + ), ] = None, pattern: Annotated[ Optional[str], - Doc(""" + Doc( + """ Pattern to match available topics. You must provide either topics or pattern, but not both. - """), + """ + ), ] = None, partitions: Annotated[ Iterable["TopicPartition"], - Doc(""" + Doc( + """ An explicit partitions list to assign. You can't use 'topics' and 'partitions' in the same time. - """), + """ + ), ] = (), # broker args dependencies: Annotated[ @@ -1039,6 +1183,12 @@ def subscriber( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI args title: Annotated[ Optional[str], @@ -1073,12 +1223,14 @@ def subscriber( ] = False, group_id: Annotated[ Optional[str], - Doc(""" + Doc( + """ Name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If `None`, auto-partition assignment (via group coordinator) and offset commits are disabled. - """), + """ + ), ] = None, key_deserializer: Annotated[ Optional[Callable[[bytes], Any]], @@ -1096,7 +1248,8 @@ def subscriber( ] = None, fetch_max_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data the server should return for a fetch request. This is not an absolute maximum, if the first message in the first non-empty partition of the fetch @@ -1105,28 +1258,34 @@ def subscriber( performs fetches to multiple brokers in parallel so memory usage will depend on the number of brokers containing partitions for the topic. - """), + """ + ), ] = 50 * 1024 * 1024, fetch_min_bytes: Annotated[ int, - Doc(""" + Doc( + """ Minimum amount of data the server should return for a fetch request, otherwise wait up to `fetch_max_wait_ms` for more data to accumulate. - """), + """ + ), ] = 1, fetch_max_wait_ms: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by `fetch_min_bytes`. - """), + """ + ), ] = 500, max_partition_fetch_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data per-partition the server will return. The maximum total memory used for a request ``= #partitions * max_partition_fetch_bytes``. @@ -1135,43 +1294,53 @@ def subscriber( send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. - """), + """ + ), ] = 1 * 1024 * 1024, auto_offset_reset: Annotated[ Literal["latest", "earliest", "none"], - Doc(""" + Doc( + """ A policy for resetting offsets on `OffsetOutOfRangeError` errors: * `earliest` will move to the oldest available message * `latest` will move to the most recent * `none` will raise an exception so you can handle this case - """), + """ + ), ] = "latest", auto_commit: Annotated[ bool, - Doc(""" + Doc( + """ If `True` the consumer's offset will be periodically committed in the background. - """), + """ + ), ] = True, auto_commit_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds between automatic - offset commits, if `auto_commit` is `True`."""), + offset commits, if `auto_commit` is `True`.""" + ), ] = 5 * 1000, check_crcs: Annotated[ bool, - Doc(""" + Doc( + """ Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. - """), + """ + ), ] = True, partition_assignment_strategy: Annotated[ Sequence["AbstractPartitionAssignor"], - Doc(""" + Doc( + """ List of objects to use to distribute partition ownership amongst consumer instances when group management is used. This preference is implicit in the order @@ -1181,22 +1350,26 @@ def subscriber( one. The coordinator will choose the old assignment strategy until all members have been updated. Then it will choose the new strategy. - """), + """ + ), ] = (RoundRobinPartitionAssignor,), max_poll_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum allowed time between calls to consume messages in batches. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout. - """), + """ + ), ] = 5 * 60 * 1000, rebalance_timeout_ms: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum time server will wait for this consumer to rejoin the group in a case of rebalance. In Java client this behaviour is bound to `max.poll.interval.ms` configuration, @@ -1204,11 +1377,13 @@ def subscriber( decouple this setting to allow finer tuning by users that use `ConsumerRebalanceListener` to delay rebalacing. Defaults to ``session_timeout_ms`` - """), + """ + ), ] = None, session_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Client group session and failure detection timeout. The consumer sends periodic heartbeats (`heartbeat.interval.ms`) to indicate its liveness to the broker. @@ -1217,11 +1392,13 @@ def subscriber( group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. - """), + """ + ), ] = 10 * 1000, heartbeat_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure @@ -1231,35 +1408,43 @@ def subscriber( should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. - """), + """ + ), ] = 3 * 1000, consumer_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. - """), + """ + ), ] = 200, max_poll_records: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum number of records returned in a single call by batch consumer. Has no limit by default. - """), + """ + ), ] = None, exclude_internal_topics: Annotated[ bool, - Doc(""" + Doc( + """ Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. - """), + """ + ), ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], - Doc(""" + Doc( + """ Controls how to read messages written transactionally. @@ -1283,16 +1468,19 @@ def subscriber( to the high watermark when there are in flight transactions. Further, when in `read_committed` the seek_to_end method will return the LSO. See method docs below. - """), + """ + ), ] = "read_uncommitted", batch_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. - """), + """ + ), ] = 200, max_records: Annotated[ Optional[int], @@ -1300,7 +1488,8 @@ def subscriber( ] = None, listener: Annotated[ Optional["ConsumerRebalanceListener"], - Doc(""" + Doc( + """ Optionally include listener callback, which will be called before and after each rebalance operation. @@ -1322,20 +1511,25 @@ def subscriber( to subscribe. It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics subscribed in this call. - """), + """ + ), ] = None, pattern: Annotated[ Optional[str], - Doc(""" + Doc( + """ Pattern to match available topics. You must provide either topics or pattern, but not both. - """), + """ + ), ] = None, partitions: Annotated[ Iterable["TopicPartition"], - Doc(""" + Doc( + """ An explicit partitions list to assign. You can't use 'topics' and 'partitions' in the same time. - """), + """ + ), ] = (), # broker args dependencies: Annotated[ @@ -1373,6 +1567,12 @@ def subscriber( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI args title: Annotated[ Optional[str], @@ -1427,6 +1627,7 @@ def subscriber( is_manual=not auto_commit, # subscriber args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=self._middlewares, broker_dependencies=self._dependencies, @@ -1465,7 +1666,8 @@ def publisher( *, key: Annotated[ Union[bytes, Any, None], - Doc(""" + Doc( + """ A key to associate with the message. Can be used to determine which partition to send the message to. If partition is `None` (and producer's partitioner config is left as default), @@ -1473,14 +1675,17 @@ def publisher( partition (but if key is `None`, partition is chosen randomly). Must be type `bytes`, or be serializable to bytes via configured `key_serializer`. - """), + """ + ), ] = None, partition: Annotated[ Optional[int], - Doc(""" + Doc( + """ Specify a partition. If not set, the partition will be selected using the configured `partitioner`. - """), + """ + ), ] = None, headers: Annotated[ Optional[Dict[str, str]], @@ -1535,7 +1740,8 @@ def publisher( *, key: Annotated[ Union[bytes, Any, None], - Doc(""" + Doc( + """ A key to associate with the message. Can be used to determine which partition to send the message to. If partition is `None` (and producer's partitioner config is left as default), @@ -1543,14 +1749,17 @@ def publisher( partition (but if key is `None`, partition is chosen randomly). Must be type `bytes`, or be serializable to bytes via configured `key_serializer`. - """), + """ + ), ] = None, partition: Annotated[ Optional[int], - Doc(""" + Doc( + """ Specify a partition. If not set, the partition will be selected using the configured `partitioner`. - """), + """ + ), ] = None, headers: Annotated[ Optional[Dict[str, str]], @@ -1605,7 +1814,8 @@ def publisher( *, key: Annotated[ Union[bytes, Any, None], - Doc(""" + Doc( + """ A key to associate with the message. Can be used to determine which partition to send the message to. If partition is `None` (and producer's partitioner config is left as default), @@ -1613,14 +1823,17 @@ def publisher( partition (but if key is `None`, partition is chosen randomly). Must be type `bytes`, or be serializable to bytes via configured `key_serializer`. - """), + """ + ), ] = None, partition: Annotated[ Optional[int], - Doc(""" + Doc( + """ Specify a partition. If not set, the partition will be selected using the configured `partitioner`. - """), + """ + ), ] = None, headers: Annotated[ Optional[Dict[str, str]], @@ -1678,7 +1891,8 @@ def publisher( *, key: Annotated[ Union[bytes, Any, None], - Doc(""" + Doc( + """ A key to associate with the message. Can be used to determine which partition to send the message to. If partition is `None` (and producer's partitioner config is left as default), @@ -1686,14 +1900,17 @@ def publisher( partition (but if key is `None`, partition is chosen randomly). Must be type `bytes`, or be serializable to bytes via configured `key_serializer`. - """), + """ + ), ] = None, partition: Annotated[ Optional[int], - Doc(""" + Doc( + """ Specify a partition. If not set, the partition will be selected using the configured `partitioner`. - """), + """ + ), ] = None, headers: Annotated[ Optional[Dict[str, str]], diff --git a/faststream/kafka/fastapi/fastapi.py b/faststream/kafka/fastapi/fastapi.py index 541940d79e..18884edd12 100644 --- a/faststream/kafka/fastapi/fastapi.py +++ b/faststream/kafka/fastapi/fastapi.py @@ -890,7 +890,8 @@ def subscriber( ] = False, listener: Annotated[ Optional["ConsumerRebalanceListener"], - Doc(""" + Doc( + """ Optionally include listener callback, which will be called before and after each rebalance operation. @@ -912,20 +913,25 @@ def subscriber( to subscribe. It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics subscribed in this call. - """), + """ + ), ] = None, pattern: Annotated[ Optional[str], - Doc(""" + Doc( + """ Pattern to match available topics. You must provide either topics or pattern, but not both. - """), + """ + ), ] = None, partitions: Annotated[ Iterable["TopicPartition"], - Doc(""" + Doc( + """ An explicit partitions list to assign. You can't use 'topics' and 'partitions' in the same time. - """), + """ + ), ] = (), # broker args dependencies: Annotated[ @@ -963,6 +969,12 @@ def subscriber( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI information title: Annotated[ Optional[str], @@ -1379,7 +1391,8 @@ def subscriber( ], listener: Annotated[ Optional["ConsumerRebalanceListener"], - Doc(""" + Doc( + """ Optionally include listener callback, which will be called before and after each rebalance operation. @@ -1401,20 +1414,25 @@ def subscriber( to subscribe. It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics subscribed in this call. - """), + """ + ), ] = None, pattern: Annotated[ Optional[str], - Doc(""" + Doc( + """ Pattern to match available topics. You must provide either topics or pattern, but not both. - """), + """ + ), ] = None, partitions: Annotated[ Iterable["TopicPartition"], - Doc(""" + Doc( + """ An explicit partitions list to assign. You can't use 'topics' and 'partitions' in the same time. - """), + """ + ), ] = (), # broker args dependencies: Annotated[ @@ -1452,6 +1470,12 @@ def subscriber( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI information title: Annotated[ Optional[str], @@ -1868,7 +1892,8 @@ def subscriber( ] = False, listener: Annotated[ Optional["ConsumerRebalanceListener"], - Doc(""" + Doc( + """ Optionally include listener callback, which will be called before and after each rebalance operation. @@ -1890,20 +1915,25 @@ def subscriber( to subscribe. It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics subscribed in this call. - """), + """ + ), ] = None, pattern: Annotated[ Optional[str], - Doc(""" + Doc( + """ Pattern to match available topics. You must provide either topics or pattern, but not both. - """), + """ + ), ] = None, partitions: Annotated[ Iterable["TopicPartition"], - Doc(""" + Doc( + """ An explicit partitions list to assign. You can't use 'topics' and 'partitions' in the same time. - """), + """ + ), ] = (), # broker args dependencies: Annotated[ @@ -1941,6 +1971,12 @@ def subscriber( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI information title: Annotated[ Optional[str], @@ -2360,7 +2396,8 @@ def subscriber( ] = False, listener: Annotated[ Optional["ConsumerRebalanceListener"], - Doc(""" + Doc( + """ Optionally include listener callback, which will be called before and after each rebalance operation. @@ -2382,20 +2419,25 @@ def subscriber( to subscribe. It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics subscribed in this call. - """), + """ + ), ] = None, pattern: Annotated[ Optional[str], - Doc(""" + Doc( + """ Pattern to match available topics. You must provide either topics or pattern, but not both. - """), + """ + ), ] = None, partitions: Annotated[ Iterable["TopicPartition"], - Doc(""" + Doc( + """ An explicit partitions list to assign. You can't use 'topics' and 'partitions' in the same time. - """), + """ + ), ] = (), # broker args dependencies: Annotated[ @@ -2433,6 +2475,12 @@ def subscriber( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI information title: Annotated[ Optional[str], @@ -2613,6 +2661,7 @@ def subscriber( filter=filter, retry=retry, no_ack=no_ack, + no_reply=no_reply, title=title, description=description, include_in_schema=include_in_schema, diff --git a/faststream/kafka/publisher/usecase.py b/faststream/kafka/publisher/usecase.py index 66a3ed5b38..b254334a61 100644 --- a/faststream/kafka/publisher/usecase.py +++ b/faststream/kafka/publisher/usecase.py @@ -110,7 +110,8 @@ async def publish( *, key: Annotated[ Union[bytes, Any, None], - Doc(""" + Doc( + """ A key to associate with the message. Can be used to determine which partition to send the message to. If partition is `None` (and producer's partitioner config is left as default), @@ -118,21 +119,26 @@ async def publish( partition (but if key is `None`, partition is chosen randomly). Must be type `bytes`, or be serializable to bytes via configured `key_serializer`. - """), + """ + ), ] = None, partition: Annotated[ Optional[int], - Doc(""" + Doc( + """ Specify a partition. If not set, the partition will be selected using the configured `partitioner`. - """), + """ + ), ] = None, timestamp_ms: Annotated[ Optional[int], - Doc(""" + Doc( + """ Epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time. - """), + """ + ), ] = None, headers: Annotated[ Optional[Dict[str, str]], @@ -205,17 +211,21 @@ async def publish( # type: ignore[override] ] = "", partition: Annotated[ Optional[int], - Doc(""" + Doc( + """ Specify a partition. If not set, the partition will be selected using the configured `partitioner`. - """), + """ + ), ] = None, timestamp_ms: Annotated[ Optional[int], - Doc(""" + Doc( + """ Epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time. - """), + """ + ), ] = None, headers: Annotated[ Optional[Dict[str, str]], diff --git a/faststream/kafka/router.py b/faststream/kafka/router.py index 44540ee4d5..cef54442c8 100644 --- a/faststream/kafka/router.py +++ b/faststream/kafka/router.py @@ -51,7 +51,8 @@ def __init__( *, key: Annotated[ Union[bytes, Any, None], - Doc(""" + Doc( + """ A key to associate with the message. Can be used to determine which partition to send the message to. If partition is `None` (and producer's partitioner config is left as default), @@ -59,14 +60,17 @@ def __init__( partition (but if key is `None`, partition is chosen randomly). Must be type `bytes`, or be serializable to bytes via configured `key_serializer`. - """), + """ + ), ] = None, partition: Annotated[ Optional[int], - Doc(""" + Doc( + """ Specify a partition. If not set, the partition will be selected using the configured `partitioner`. - """), + """ + ), ] = None, headers: Annotated[ Optional[Dict[str, str]], @@ -156,12 +160,14 @@ def __init__( ] = False, group_id: Annotated[ Optional[str], - Doc(""" + Doc( + """ Name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. If `None`, auto-partition assignment (via group coordinator) and offset commits are disabled. - """), + """ + ), ] = None, key_deserializer: Annotated[ Optional[Callable[[bytes], Any]], @@ -179,7 +185,8 @@ def __init__( ] = None, fetch_max_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data the server should return for a fetch request. This is not an absolute maximum, if the first message in the first non-empty partition of the fetch @@ -188,28 +195,34 @@ def __init__( performs fetches to multiple brokers in parallel so memory usage will depend on the number of brokers containing partitions for the topic. - """), + """ + ), ] = 50 * 1024 * 1024, fetch_min_bytes: Annotated[ int, - Doc(""" + Doc( + """ Minimum amount of data the server should return for a fetch request, otherwise wait up to `fetch_max_wait_ms` for more data to accumulate. - """), + """ + ), ] = 1, fetch_max_wait_ms: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by `fetch_min_bytes`. - """), + """ + ), ] = 500, max_partition_fetch_bytes: Annotated[ int, - Doc(""" + Doc( + """ The maximum amount of data per-partition the server will return. The maximum total memory used for a request ``= #partitions * max_partition_fetch_bytes``. @@ -218,43 +231,53 @@ def __init__( send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. - """), + """ + ), ] = 1 * 1024 * 1024, auto_offset_reset: Annotated[ Literal["latest", "earliest", "none"], - Doc(""" + Doc( + """ A policy for resetting offsets on `OffsetOutOfRangeError` errors: * `earliest` will move to the oldest available message * `latest` will move to the most recent * `none` will raise an exception so you can handle this case - """), + """ + ), ] = "latest", auto_commit: Annotated[ bool, - Doc(""" + Doc( + """ If `True` the consumer's offset will be periodically committed in the background. - """), + """ + ), ] = True, auto_commit_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds between automatic - offset commits, if `auto_commit` is `True`."""), + offset commits, if `auto_commit` is `True`.""" + ), ] = 5 * 1000, check_crcs: Annotated[ bool, - Doc(""" + Doc( + """ Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. - """), + """ + ), ] = True, partition_assignment_strategy: Annotated[ Sequence["AbstractPartitionAssignor"], - Doc(""" + Doc( + """ List of objects to use to distribute partition ownership amongst consumer instances when group management is used. This preference is implicit in the order @@ -264,22 +287,26 @@ def __init__( one. The coordinator will choose the old assignment strategy until all members have been updated. Then it will choose the new strategy. - """), + """ + ), ] = (RoundRobinPartitionAssignor,), max_poll_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum allowed time between calls to consume messages in batches. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. If API methods block waiting for messages, that time does not count against this timeout. - """), + """ + ), ] = 5 * 60 * 1000, rebalance_timeout_ms: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum time server will wait for this consumer to rejoin the group in a case of rebalance. In Java client this behaviour is bound to `max.poll.interval.ms` configuration, @@ -287,11 +314,13 @@ def __init__( decouple this setting to allow finer tuning by users that use `ConsumerRebalanceListener` to delay rebalacing. Defaults to ``session_timeout_ms`` - """), + """ + ), ] = None, session_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Client group session and failure detection timeout. The consumer sends periodic heartbeats (`heartbeat.interval.ms`) to indicate its liveness to the broker. @@ -300,11 +329,13 @@ def __init__( group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. - """), + """ + ), ] = 10 * 1000, heartbeat_interval_ms: Annotated[ int, - Doc(""" + Doc( + """ The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka's group management feature. Heartbeats are used to ensure @@ -314,35 +345,43 @@ def __init__( should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. - """), + """ + ), ] = 3 * 1000, consumer_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Maximum wait timeout for background fetching routine. Mostly defines how fast the system will see rebalance and request new data for new partitions. - """), + """ + ), ] = 200, max_poll_records: Annotated[ Optional[int], - Doc(""" + Doc( + """ The maximum number of records returned in a single call by batch consumer. Has no limit by default. - """), + """ + ), ] = None, exclude_internal_topics: Annotated[ bool, - Doc(""" + Doc( + """ Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. - """), + """ + ), ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], - Doc(""" + Doc( + """ Controls how to read messages written transactionally. @@ -366,16 +405,19 @@ def __init__( to the high watermark when there are in flight transactions. Further, when in `read_committed` the seek_to_end method will return the LSO. See method docs below. - """), + """ + ), ] = "read_uncommitted", batch_timeout_ms: Annotated[ int, - Doc(""" + Doc( + """ Milliseconds spent waiting if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. - """), + """ + ), ] = 200, max_records: Annotated[ Optional[int], @@ -383,7 +425,8 @@ def __init__( ] = None, listener: Annotated[ Optional["ConsumerRebalanceListener"], - Doc(""" + Doc( + """ Optionally include listener callback, which will be called before and after each rebalance operation. @@ -405,19 +448,24 @@ def __init__( to subscribe. It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics subscribed in this call. - """), + """ + ), ] = None, pattern: Annotated[ Optional[str], - Doc(""" + Doc( + """ Pattern to match available topics. You must provide either topics or pattern, but not both. - """), + """ + ), ] = None, partitions: Annotated[ Optional[Iterable["TopicPartition"]], - Doc(""" + Doc( + """ A topic and partition tuple. You can't use 'topics' and 'partitions' in the same time. - """), + """ + ), ] = (), # broker args dependencies: Annotated[ @@ -455,6 +503,12 @@ def __init__( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI args title: Annotated[ Optional[str], @@ -508,6 +562,7 @@ def __init__( decoder=decoder, middlewares=middlewares, filter=filter, + no_reply=no_reply, # AsyncAPI args title=title, description=description, diff --git a/faststream/kafka/subscriber/factory.py b/faststream/kafka/subscriber/factory.py index fb5de4bf1a..0f504667f4 100644 --- a/faststream/kafka/subscriber/factory.py +++ b/faststream/kafka/subscriber/factory.py @@ -38,6 +38,7 @@ def create_subscriber( is_manual: bool, # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[Tuple[ConsumerRecord, ...]]"], @@ -63,6 +64,7 @@ def create_subscriber( is_manual: bool, # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[ConsumerRecord]"], @@ -88,6 +90,7 @@ def create_subscriber( is_manual: bool, # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable[ @@ -117,6 +120,7 @@ def create_subscriber( is_manual: bool, # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable[ @@ -156,6 +160,7 @@ def create_subscriber( partitions=partitions, is_manual=is_manual, no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=broker_middlewares, @@ -174,6 +179,7 @@ def create_subscriber( partitions=partitions, is_manual=is_manual, no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=broker_middlewares, diff --git a/faststream/kafka/subscriber/usecase.py b/faststream/kafka/subscriber/usecase.py index 650bae75d1..fa01a11fcb 100644 --- a/faststream/kafka/subscriber/usecase.py +++ b/faststream/kafka/subscriber/usecase.py @@ -65,6 +65,7 @@ def __init__( default_parser: "AsyncCallable", default_decoder: "AsyncCallable", no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[MsgType]"], @@ -78,6 +79,7 @@ def __init__( default_decoder=default_decoder, # Propagated args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -179,7 +181,7 @@ def _make_response_publisher( self, message: "StreamMessage[Any]", ) -> Sequence[FakePublisher]: - if not message.reply_to or self._producer is None: + if self._producer is None: return () return ( @@ -295,6 +297,7 @@ def __init__( is_manual: bool, # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[ConsumerRecord]"], @@ -316,6 +319,7 @@ def __init__( default_decoder=AioKafkaParser.decode_message, # Propagated args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -345,6 +349,7 @@ def __init__( is_manual: bool, # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable[ @@ -371,6 +376,7 @@ def __init__( default_decoder=AioKafkaParser.decode_message_batch, # Propagated args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, diff --git a/faststream/nats/broker/registrator.py b/faststream/nats/broker/registrator.py index 9fe73a3386..ca6b84d4d4 100644 --- a/faststream/nats/broker/registrator.py +++ b/faststream/nats/broker/registrator.py @@ -1,11 +1,9 @@ from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Union, cast -from fast_depends.dependencies import Depends from nats.js import api from typing_extensions import Annotated, Doc, deprecated, override from faststream.broker.core.abc import ABCBroker -from faststream.broker.types import CustomCallable from faststream.broker.utils import default_filter from faststream.nats.helpers import StreamBuilder from faststream.nats.publisher.asyncapi import AsyncAPIPublisher @@ -183,6 +181,12 @@ def subscriber( # type: ignore[override] bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI information title: Annotated[ Optional[str], @@ -233,6 +237,7 @@ def subscriber( # type: ignore[override] ack_first=ack_first, # subscriber args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=self._middlewares, broker_dependencies=self._dependencies, diff --git a/faststream/nats/fastapi/fastapi.py b/faststream/nats/fastapi/fastapi.py index 7ea3a2a5df..4010c5f02a 100644 --- a/faststream/nats/fastapi/fastapi.py +++ b/faststream/nats/fastapi/fastapi.py @@ -717,6 +717,12 @@ def subscriber( # type: ignore[override] bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI information title: Annotated[ Optional[str], @@ -886,6 +892,7 @@ def subscriber( # type: ignore[override] max_workers=max_workers, retry=retry, no_ack=no_ack, + no_reply=no_reply, title=title, description=description, include_in_schema=include_in_schema, diff --git a/faststream/nats/router.py b/faststream/nats/router.py index 74215d3e78..ace895ba59 100644 --- a/faststream/nats/router.py +++ b/faststream/nats/router.py @@ -271,6 +271,12 @@ def __init__( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI information title: Annotated[ Optional[str], @@ -317,6 +323,7 @@ def __init__( filter=filter, retry=retry, no_ack=no_ack, + no_reply=no_reply, title=title, description=description, include_in_schema=include_in_schema, diff --git a/faststream/nats/subscriber/factory.py b/faststream/nats/subscriber/factory.py index 590598a2dd..2ae7c9b820 100644 --- a/faststream/nats/subscriber/factory.py +++ b/faststream/nats/subscriber/factory.py @@ -58,6 +58,7 @@ def create_subscriber( stream: Optional["JStream"], # Subscriber args no_ack: bool, + no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[Any]"], @@ -148,6 +149,7 @@ def create_subscriber( extra_options=extra_options, # Subscriber args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=broker_middlewares, @@ -165,6 +167,7 @@ def create_subscriber( extra_options=extra_options, # Subscriber args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=broker_middlewares, @@ -186,6 +189,7 @@ def create_subscriber( extra_options=extra_options, # Subscriber args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=broker_middlewares, @@ -205,6 +209,7 @@ def create_subscriber( extra_options=extra_options, # Subscriber args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=broker_middlewares, @@ -225,6 +230,7 @@ def create_subscriber( extra_options=extra_options, # Subscriber args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=broker_middlewares, @@ -243,6 +249,7 @@ def create_subscriber( extra_options=extra_options, # Subscriber args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=broker_middlewares, @@ -261,6 +268,7 @@ def create_subscriber( extra_options=extra_options, # Subscriber args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=broker_middlewares, diff --git a/faststream/nats/subscriber/usecase.py b/faststream/nats/subscriber/usecase.py index f668ac387e..76ae509052 100644 --- a/faststream/nats/subscriber/usecase.py +++ b/faststream/nats/subscriber/usecase.py @@ -24,7 +24,6 @@ from nats.js.kv import KeyValue from typing_extensions import Annotated, Doc, override -from faststream.broker.message import StreamMessage from faststream.broker.publisher.fake import FakePublisher from faststream.broker.subscriber.usecase import SubscriberUsecase from faststream.broker.types import CustomCallable, MsgType @@ -79,6 +78,7 @@ def __init__( default_parser: "AsyncCallable", default_decoder: "AsyncCallable", no_ack: bool, + no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable[Depends], broker_middlewares: Iterable["BrokerMiddleware[MsgType]"], @@ -96,6 +96,7 @@ def __init__( default_decoder=default_decoder, # Propagated args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -234,6 +235,7 @@ def __init__( default_parser: "AsyncCallable", default_decoder: "AsyncCallable", no_ack: bool, + no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable[Depends], broker_middlewares: Iterable["BrokerMiddleware[MsgType]"], @@ -250,6 +252,7 @@ def __init__( default_decoder=default_decoder, # Propagated args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -261,13 +264,10 @@ def __init__( def _make_response_publisher( self, - message: Annotated[ - "StreamMessage[Any]", - Doc("Message requiring reply"), - ], + message: "StreamMessage[Any]", ) -> Sequence[FakePublisher]: """Create FakePublisher object to use it as one of `publishers` in `self.consume` scope.""" - if not message.reply_to or self._producer is None: + if self._producer is None: return () return ( @@ -372,6 +372,7 @@ def __init__( extra_options: Optional[AnyDict], # Subscriber args no_ack: bool, + no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable[Depends], broker_middlewares: Iterable["BrokerMiddleware[Msg]"], @@ -392,6 +393,7 @@ def __init__( default_decoder=parser_.decode_message, # Propagated args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -441,6 +443,7 @@ def __init__( extra_options: Optional[AnyDict], # Subscriber args no_ack: bool, + no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable[Depends], broker_middlewares: Iterable["BrokerMiddleware[Msg]"], @@ -457,6 +460,7 @@ def __init__( extra_options=extra_options, # Propagated args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -494,6 +498,7 @@ def __init__( extra_options: Optional[AnyDict], # Subscriber args no_ack: bool, + no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable[Depends], broker_middlewares: Iterable["BrokerMiddleware[Msg]"], @@ -515,6 +520,7 @@ def __init__( default_decoder=parser_.decode_message, # Propagated args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -572,6 +578,7 @@ def __init__( extra_options: Optional[AnyDict], # Subscriber args no_ack: bool, + no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable[Depends], broker_middlewares: Iterable["BrokerMiddleware[Msg]"], @@ -589,6 +596,7 @@ def __init__( extra_options=extra_options, # Propagated args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -628,6 +636,7 @@ def __init__( extra_options: Optional[AnyDict], # Subscriber args no_ack: bool, + no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable[Depends], broker_middlewares: Iterable["BrokerMiddleware[Msg]"], @@ -646,6 +655,7 @@ def __init__( queue="", # Propagated args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -701,6 +711,7 @@ def __init__( extra_options: Optional[AnyDict], # Subscriber args no_ack: bool, + no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable[Depends], broker_middlewares: Iterable["BrokerMiddleware[Msg]"], @@ -718,6 +729,7 @@ def __init__( extra_options=extra_options, # Propagated args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -758,6 +770,7 @@ def __init__( extra_options: Optional[AnyDict], # Subscriber args no_ack: bool, + no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable[Depends], broker_middlewares: Iterable["BrokerMiddleware[List[Msg]]"], @@ -779,6 +792,7 @@ def __init__( default_decoder=parser.decode_batch, # Propagated args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -838,6 +852,7 @@ def __init__( subject=subject, extra_options=None, no_ack=True, + no_reply=True, retry=False, default_parser=parser.parse_message, default_decoder=parser.decode_message, @@ -942,6 +957,7 @@ def __init__( subject=subject, extra_options=None, no_ack=True, + no_reply=True, retry=False, default_parser=parser.parse_message, default_decoder=parser.decode_message, diff --git a/faststream/nats/testing.py b/faststream/nats/testing.py index 5a9190dfd7..34230cb788 100644 --- a/faststream/nats/testing.py +++ b/faststream/nats/testing.py @@ -145,7 +145,8 @@ async def ack(self) -> None: pass async def ack_sync( - self, timeout: float = 1 + self, + timeout: float = 1, ) -> "PatchedMessage": # pragma: no cover return self diff --git a/faststream/rabbit/broker/registrator.py b/faststream/rabbit/broker/registrator.py index e13b7b5261..0c0f99df70 100644 --- a/faststream/rabbit/broker/registrator.py +++ b/faststream/rabbit/broker/registrator.py @@ -98,6 +98,12 @@ def subscriber( # type: ignore[override] bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI information title: Annotated[ Optional[str], @@ -125,6 +131,7 @@ def subscriber( # type: ignore[override] reply_config=reply_config, # subscriber args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=self._middlewares, broker_dependencies=self._dependencies, diff --git a/faststream/rabbit/fastapi/router.py b/faststream/rabbit/fastapi/router.py index 6d13beabae..d0445badfb 100644 --- a/faststream/rabbit/fastapi/router.py +++ b/faststream/rabbit/fastapi/router.py @@ -528,6 +528,12 @@ def subscriber( # type: ignore[override] bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI information title: Annotated[ Optional[str], @@ -684,6 +690,7 @@ def subscriber( # type: ignore[override] filter=filter, retry=retry, no_ack=no_ack, + no_reply=no_reply, title=title, description=description, include_in_schema=include_in_schema, diff --git a/faststream/rabbit/router.py b/faststream/rabbit/router.py index 0890433347..18f6747b9d 100644 --- a/faststream/rabbit/router.py +++ b/faststream/rabbit/router.py @@ -250,6 +250,12 @@ def __init__( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI information title: Annotated[ Optional[str], @@ -281,6 +287,7 @@ def __init__( filter=filter, retry=retry, no_ack=no_ack, + no_reply=no_reply, title=title, description=description, include_in_schema=include_in_schema, diff --git a/faststream/rabbit/subscriber/factory.py b/faststream/rabbit/subscriber/factory.py index f0ee6b752a..0683d2d62f 100644 --- a/faststream/rabbit/subscriber/factory.py +++ b/faststream/rabbit/subscriber/factory.py @@ -19,6 +19,7 @@ def create_subscriber( reply_config: Optional["ReplyConfig"], # Subscriber args no_ack: bool, + no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[IncomingMessage]"], @@ -33,6 +34,7 @@ def create_subscriber( consume_args=consume_args, reply_config=reply_config, no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=broker_middlewares, diff --git a/faststream/rabbit/subscriber/usecase.py b/faststream/rabbit/subscriber/usecase.py index c0700dcc82..e518d3ca37 100644 --- a/faststream/rabbit/subscriber/usecase.py +++ b/faststream/rabbit/subscriber/usecase.py @@ -55,6 +55,7 @@ def __init__( reply_config: Optional["ReplyConfig"], # Subscriber args no_ack: bool, + no_reply: bool, retry: Union[bool, int], broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[IncomingMessage]"], @@ -70,6 +71,7 @@ def __init__( default_decoder=parser.decode_message, # Propagated options no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -173,7 +175,7 @@ def _make_response_publisher( self, message: "StreamMessage[Any]", ) -> Sequence["FakePublisher"]: - if not message.reply_to or self._producer is None: + if self._producer is None: return () return ( diff --git a/faststream/redis/broker/broker.py b/faststream/redis/broker/broker.py index 3164c7a01b..93bea0a7f4 100644 --- a/faststream/redis/broker/broker.py +++ b/faststream/redis/broker/broker.py @@ -13,7 +13,6 @@ ) from urllib.parse import urlparse -from fast_depends.dependencies import Depends from redis.asyncio.client import Redis from redis.asyncio.connection import ( Connection, diff --git a/faststream/redis/broker/registrator.py b/faststream/redis/broker/registrator.py index 7a643d189a..8038214f64 100644 --- a/faststream/redis/broker/registrator.py +++ b/faststream/redis/broker/registrator.py @@ -84,6 +84,12 @@ def subscriber( # type: ignore[override] bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI information title: Annotated[ Optional[str], @@ -110,6 +116,7 @@ def subscriber( # type: ignore[override] stream=stream, # subscriber args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=self._middlewares, broker_dependencies=self._dependencies, diff --git a/faststream/redis/fastapi/fastapi.py b/faststream/redis/fastapi/fastapi.py index 5d45d2c47d..7a8de1b18d 100644 --- a/faststream/redis/fastapi/fastapi.py +++ b/faststream/redis/fastapi/fastapi.py @@ -488,6 +488,12 @@ def subscriber( # type: ignore[override] bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI information title: Annotated[ Optional[str], @@ -648,6 +654,7 @@ def subscriber( # type: ignore[override] filter=filter, retry=retry, no_ack=no_ack, + no_reply=no_reply, title=title, description=description, include_in_schema=include_in_schema, diff --git a/faststream/redis/router.py b/faststream/redis/router.py index 635f86083e..38964c2c59 100644 --- a/faststream/redis/router.py +++ b/faststream/redis/router.py @@ -163,6 +163,12 @@ def __init__( bool, Doc("Whether to disable **FastStream** autoacknowledgement logic or not."), ] = False, + no_reply: Annotated[ + bool, + Doc( + "Whether to disable **FastStream** RPC and Reply To auto responses or not." + ), + ] = False, # AsyncAPI information title: Annotated[ Optional[str], @@ -193,6 +199,7 @@ def __init__( filter=filter, retry=retry, no_ack=no_ack, + no_reply=no_reply, title=title, description=description, include_in_schema=include_in_schema, diff --git a/faststream/redis/subscriber/factory.py b/faststream/redis/subscriber/factory.py index da5fe02898..ee0ae84c9b 100644 --- a/faststream/redis/subscriber/factory.py +++ b/faststream/redis/subscriber/factory.py @@ -35,6 +35,7 @@ def create_subscriber( stream: Union["StreamSub", str, None], # Subscriber args no_ack: bool = False, + no_reply: bool = False, retry: bool = False, broker_dependencies: Iterable["Depends"] = (), broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"] = (), @@ -50,6 +51,7 @@ def create_subscriber( channel=channel_sub, # basic args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=broker_middlewares, @@ -65,6 +67,7 @@ def create_subscriber( stream=stream_sub, # basic args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=broker_middlewares, @@ -78,6 +81,7 @@ def create_subscriber( stream=stream_sub, # basic args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=broker_middlewares, @@ -93,6 +97,7 @@ def create_subscriber( list=list_sub, # basic args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=broker_middlewares, @@ -106,6 +111,7 @@ def create_subscriber( list=list_sub, # basic args no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_dependencies=broker_dependencies, broker_middlewares=broker_middlewares, diff --git a/faststream/redis/subscriber/usecase.py b/faststream/redis/subscriber/usecase.py index 7919f384f7..5dee39ef52 100644 --- a/faststream/redis/subscriber/usecase.py +++ b/faststream/redis/subscriber/usecase.py @@ -69,6 +69,7 @@ def __init__( default_decoder: "AsyncCallable", # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], @@ -82,6 +83,7 @@ def __init__( default_decoder=default_decoder, # Propagated options no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -129,9 +131,10 @@ def setup( # type: ignore[override] ) def _make_response_publisher( - self, message: "BrokerStreamMessage[UnifyRedisDict]" + self, + message: "BrokerStreamMessage[UnifyRedisDict]", ) -> Sequence[FakePublisher]: - if not message.reply_to or self._producer is None: + if self._producer is None: return () return ( @@ -207,6 +210,7 @@ def __init__( channel: "PubSub", # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], @@ -221,6 +225,7 @@ def __init__( default_decoder=parser.decode_message, # Propagated options no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -296,6 +301,7 @@ def __init__( default_decoder: "AsyncCallable", # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], @@ -309,6 +315,7 @@ def __init__( default_decoder=default_decoder, # Propagated options no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -360,6 +367,7 @@ def __init__( list: ListSub, # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], @@ -375,6 +383,7 @@ def __init__( default_decoder=parser.decode_message, # Propagated options no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -407,6 +416,7 @@ def __init__( list: ListSub, # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], @@ -422,6 +432,7 @@ def __init__( default_decoder=parser.decode_message, # Propagated options no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -459,6 +470,7 @@ def __init__( default_decoder: "AsyncCallable", # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], @@ -472,6 +484,7 @@ def __init__( default_decoder=default_decoder, # Propagated options no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -605,6 +618,7 @@ def __init__( stream: StreamSub, # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], @@ -620,6 +634,7 @@ def __init__( default_decoder=parser.decode_message, # Propagated options no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, @@ -672,6 +687,7 @@ def __init__( stream: StreamSub, # Subscriber args no_ack: bool, + no_reply: bool, retry: bool, broker_dependencies: Iterable["Depends"], broker_middlewares: Iterable["BrokerMiddleware[UnifyRedisDict]"], @@ -687,6 +703,7 @@ def __init__( default_decoder=parser.decode_message, # Propagated options no_ack=no_ack, + no_reply=no_reply, retry=retry, broker_middlewares=broker_middlewares, broker_dependencies=broker_dependencies, diff --git a/tests/brokers/base/publish.py b/tests/brokers/base/publish.py index 327f31627b..974b12d8cf 100644 --- a/tests/brokers/base/publish.py +++ b/tests/brokers/base/publish.py @@ -9,8 +9,8 @@ import pytest from pydantic import BaseModel +from faststream import BaseMiddleware from faststream._compat import dump_json, model_to_json -from faststream.annotations import Logger from faststream.broker.core.usecase import BrokerUsecase @@ -147,17 +147,17 @@ def patch_broker(self, broker: BrokerUsecase[Any, Any]) -> BrokerUsecase[Any, An ) async def test_serialize( self, - mock: Mock, queue: str, message, message_type, expected_message, - event, + event: asyncio.Event, + mock: Mock, ): pub_broker = self.get_broker(apply_types=True) @pub_broker.subscriber(queue, **self.subscriber_kwargs) - async def handler(m: message_type, logger: Logger): + async def handler(m: message_type): event.set() mock(m) @@ -178,14 +178,14 @@ async def handler(m: message_type, logger: Logger): @pytest.mark.asyncio() async def test_unwrap_dict( self, - mock: Mock, queue: str, - event, + event: asyncio.Event, + mock: Mock, ): pub_broker = self.get_broker(apply_types=True) @pub_broker.subscriber(queue, **self.subscriber_kwargs) - async def m(a: int, b: int, logger: Logger): + async def m(a: int, b: int): event.set() mock({"a": a, "b": b}) @@ -217,7 +217,7 @@ async def test_unwrap_list( pub_broker = self.get_broker(apply_types=True) @pub_broker.subscriber(queue, **self.subscriber_kwargs) - async def m(a: int, b: int, *args: Tuple[int, ...], logger: Logger): + async def m(a: int, b: int, *args: Tuple[int, ...]): event.set() mock({"a": a, "b": b, "args": args}) @@ -238,8 +238,8 @@ async def m(a: int, b: int, *args: Tuple[int, ...], logger: Logger): async def test_base_publisher( self, queue: str, - event, - mock, + event: asyncio.Event, + mock: Mock, ): pub_broker = self.get_broker(apply_types=True) @@ -270,8 +270,8 @@ async def resp(msg): async def test_publisher_object( self, queue: str, - event, - mock, + event: asyncio.Event, + mock: Mock, ): pub_broker = self.get_broker(apply_types=True) @@ -304,8 +304,8 @@ async def resp(msg): async def test_publish_manual( self, queue: str, - event, - mock, + event: asyncio.Event, + mock: Mock, ): pub_broker = self.get_broker(apply_types=True) @@ -337,7 +337,7 @@ async def resp(msg): async def test_multiple_publishers( self, queue: str, - mock, + mock: Mock, ): pub_broker = self.get_broker(apply_types=True) @@ -380,7 +380,7 @@ async def resp2(msg): async def test_reusable_publishers( self, queue: str, - mock, + mock: Mock, ): pub_broker = self.get_broker(apply_types=True) @@ -427,8 +427,8 @@ async def resp(): async def test_reply_to( self, queue: str, - event, - mock, + event: asyncio.Event, + mock: Mock, ): pub_broker = self.get_broker(apply_types=True) @@ -457,12 +457,52 @@ async def handler(m): assert event.is_set() mock.assert_called_with("Hello!") + @pytest.mark.asyncio() + async def test_no_reply( + self, + queue: str, + event: asyncio.Event, + mock: Mock, + ): + class Mid(BaseMiddleware): + async def after_processed(self, *args: Any, **kwargs: Any): + event.set() + + return await super().after_processed(*args, **kwargs) + + pub_broker = self.get_broker(apply_types=True) + pub_broker.add_middleware(Mid) + + @pub_broker.subscriber(queue + "reply", **self.subscriber_kwargs) + async def reply_handler(m): + mock(m) + + @pub_broker.subscriber(queue, no_reply=True, **self.subscriber_kwargs) + async def handler(m): + return m + + async with self.patch_broker(pub_broker) as br: + await br.start() + + await asyncio.wait( + ( + asyncio.create_task( + br.publish("Hello!", queue, reply_to=queue + "reply") + ), + asyncio.create_task(event.wait()), + ), + timeout=self.timeout, + ) + + assert event.is_set() + assert not mock.called + @pytest.mark.asyncio() async def test_publisher_after_start( self, queue: str, - event, - mock, + event: asyncio.Event, + mock: Mock, ): pub_broker = self.get_broker(apply_types=True) diff --git a/tests/brokers/rabbit/test_publish.py b/tests/brokers/rabbit/test_publish.py index 97be60f066..7e1986246e 100644 --- a/tests/brokers/rabbit/test_publish.py +++ b/tests/brokers/rabbit/test_publish.py @@ -1,5 +1,5 @@ import asyncio -from unittest.mock import patch +from unittest.mock import Mock, patch import pytest @@ -18,8 +18,8 @@ def get_broker(self, apply_types: bool = False) -> RabbitBroker: async def test_reply_config( self, queue: str, - event, - mock, + event: asyncio.Event, + mock: Mock, ): pub_broker = self.get_broker()