diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index 09ecaa7da8..3d62e1e1d3 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -111,11 +111,6 @@ def __init__( """ ), ] = 9 * 60 * 1000, - sasl_kerberos_service_name: str = "kafka", - sasl_kerberos_domain_name: Optional[str] = None, - sasl_oauth_token_provider: Annotated[ - Optional[str], Doc("OAuthBearer token provider.") - ] = None, loop: Optional[AbstractEventLoop] = None, client_id: Annotated[ Optional[str], @@ -168,14 +163,6 @@ def __init__( """ ), ] = _missing, - key_serializer: Annotated[ - Optional[Callable[[Any], bytes]], - Doc("Used to convert user-supplied keys to bytes."), - ] = None, - value_serializer: Annotated[ - Optional[Callable[[Any], bytes]], - Doc("used to convert user-supplied message values to bytes."), - ] = None, compression_type: Annotated[ Optional[Literal["gzip", "snappy", "lz4", "zstd"]], Doc( @@ -187,15 +174,6 @@ def __init__( """ ), ] = None, - max_batch_size: Annotated[ - int, - Doc( - """ - Maximum size of buffered data per partition. - After this amount `send` coroutine will block until batch is drained. - """ - ), - ] = 16 * 1024, partitioner: Annotated[ Union[ str, @@ -244,7 +222,6 @@ def __init__( """ ), ] = 0, - send_backoff_ms: int = 100, enable_idempotence: Annotated[ bool, Doc( @@ -370,25 +347,17 @@ def __init__( bootstrap_servers=servers, # both args client_id=client_id, - api_version=protocol_version, request_timeout_ms=request_timeout_ms, retry_backoff_ms=retry_backoff_ms, metadata_max_age_ms=metadata_max_age_ms, connections_max_idle_ms=connections_max_idle_ms, - sasl_kerberos_service_name=sasl_kerberos_service_name, - sasl_kerberos_domain_name=sasl_kerberos_domain_name, - sasl_oauth_token_provider=sasl_oauth_token_provider, loop=loop, # publisher args acks=acks, - key_serializer=key_serializer, - value_serializer=value_serializer, compression_type=compression_type, - max_batch_size=max_batch_size, partitioner=partitioner, max_request_size=max_request_size, linger_ms=linger_ms, - send_backoff_ms=send_backoff_ms, enable_idempotence=enable_idempotence, transactional_id=transactional_id, transaction_timeout_ms=transaction_timeout_ms, diff --git a/faststream/confluent/broker/registrator.py b/faststream/confluent/broker/registrator.py index 277a77ef69..10c2584111 100644 --- a/faststream/confluent/broker/registrator.py +++ b/faststream/confluent/broker/registrator.py @@ -1,7 +1,6 @@ from typing import ( TYPE_CHECKING, Any, - Callable, Dict, Iterable, Literal, @@ -75,20 +74,6 @@ def subscriber( """ ), ] = None, - key_deserializer: Annotated[ - Optional[Callable[[bytes], Any]], - Doc( - "Any callable that takes a raw message `bytes` " - "key and returns a deserialized one." - ), - ] = None, - value_deserializer: Annotated[ - Optional[Callable[[bytes], Any]], - Doc( - "Any callable that takes a raw message `bytes` " - "value and returns a deserialized value." - ), - ] = None, fetch_max_wait_ms: Annotated[ int, Doc( @@ -209,20 +194,6 @@ def subscriber( """ ), ] = 5 * 60 * 1000, - rebalance_timeout_ms: Annotated[ - Optional[int], - Doc( - """ - The maximum time server will wait for this - consumer to rejoin the group in a case of rebalance. In Java client - this behaviour is bound to `max.poll.interval.ms` configuration, - but as ``aiokafka`` will rejoin the group in the background, we - decouple this setting to allow finer tuning by users that use - `ConsumerRebalanceListener` to delay rebalacing. Defaults - to ``session_timeout_ms`` - """ - ), - ] = None, session_timeout_ms: Annotated[ int, Doc( @@ -254,36 +225,6 @@ def subscriber( """ ), ] = 3 * 1000, - consumer_timeout_ms: Annotated[ - int, - Doc( - """ - Maximum wait timeout for background fetching - routine. Mostly defines how fast the system will see rebalance and - request new data for new partitions. - """ - ), - ] = 200, - max_poll_records: Annotated[ - Optional[int], - Doc( - """ - The maximum number of records returned in a - single call by batch consumer. Has no limit by default. - """ - ), - ] = None, - exclude_internal_topics: Annotated[ - bool, - Doc( - """ - Whether records from internal topics - (such as offsets) should be exposed to the consumer. If set to True - the only way to receive records from an internal topic is - subscribing to it. - """ - ), - ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], Doc( @@ -411,20 +352,6 @@ def subscriber( """ ), ] = None, - key_deserializer: Annotated[ - Optional[Callable[[bytes], Any]], - Doc( - "Any callable that takes a raw message `bytes` " - "key and returns a deserialized one." - ), - ] = None, - value_deserializer: Annotated[ - Optional[Callable[[bytes], Any]], - Doc( - "Any callable that takes a raw message `bytes` " - "value and returns a deserialized value." - ), - ] = None, fetch_max_wait_ms: Annotated[ int, Doc( @@ -545,20 +472,6 @@ def subscriber( """ ), ] = 5 * 60 * 1000, - rebalance_timeout_ms: Annotated[ - Optional[int], - Doc( - """ - The maximum time server will wait for this - consumer to rejoin the group in a case of rebalance. In Java client - this behaviour is bound to `max.poll.interval.ms` configuration, - but as ``aiokafka`` will rejoin the group in the background, we - decouple this setting to allow finer tuning by users that use - `ConsumerRebalanceListener` to delay rebalacing. Defaults - to ``session_timeout_ms`` - """ - ), - ] = None, session_timeout_ms: Annotated[ int, Doc( @@ -590,36 +503,6 @@ def subscriber( """ ), ] = 3 * 1000, - consumer_timeout_ms: Annotated[ - int, - Doc( - """ - Maximum wait timeout for background fetching - routine. Mostly defines how fast the system will see rebalance and - request new data for new partitions. - """ - ), - ] = 200, - max_poll_records: Annotated[ - Optional[int], - Doc( - """ - The maximum number of records returned in a - single call by batch consumer. Has no limit by default. - """ - ), - ] = None, - exclude_internal_topics: Annotated[ - bool, - Doc( - """ - Whether records from internal topics - (such as offsets) should be exposed to the consumer. If set to True - the only way to receive records from an internal topic is - subscribing to it. - """ - ), - ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], Doc( @@ -747,20 +630,6 @@ def subscriber( """ ), ] = None, - key_deserializer: Annotated[ - Optional[Callable[[bytes], Any]], - Doc( - "Any callable that takes a raw message `bytes` " - "key and returns a deserialized one." - ), - ] = None, - value_deserializer: Annotated[ - Optional[Callable[[bytes], Any]], - Doc( - "Any callable that takes a raw message `bytes` " - "value and returns a deserialized value." - ), - ] = None, fetch_max_wait_ms: Annotated[ int, Doc( @@ -881,20 +750,6 @@ def subscriber( """ ), ] = 5 * 60 * 1000, - rebalance_timeout_ms: Annotated[ - Optional[int], - Doc( - """ - The maximum time server will wait for this - consumer to rejoin the group in a case of rebalance. In Java client - this behaviour is bound to `max.poll.interval.ms` configuration, - but as ``aiokafka`` will rejoin the group in the background, we - decouple this setting to allow finer tuning by users that use - `ConsumerRebalanceListener` to delay rebalacing. Defaults - to ``session_timeout_ms`` - """ - ), - ] = None, session_timeout_ms: Annotated[ int, Doc( @@ -926,36 +781,6 @@ def subscriber( """ ), ] = 3 * 1000, - consumer_timeout_ms: Annotated[ - int, - Doc( - """ - Maximum wait timeout for background fetching - routine. Mostly defines how fast the system will see rebalance and - request new data for new partitions. - """ - ), - ] = 200, - max_poll_records: Annotated[ - Optional[int], - Doc( - """ - The maximum number of records returned in a - single call by batch consumer. Has no limit by default. - """ - ), - ] = None, - exclude_internal_topics: Annotated[ - bool, - Doc( - """ - Whether records from internal topics - (such as offsets) should be exposed to the consumer. If set to True - the only way to receive records from an internal topic is - subscribing to it. - """ - ), - ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], Doc( @@ -1086,20 +911,6 @@ def subscriber( """ ), ] = None, - key_deserializer: Annotated[ - Optional[Callable[[bytes], Any]], - Doc( - "Any callable that takes a raw message `bytes` " - "key and returns a deserialized one." - ), - ] = None, - value_deserializer: Annotated[ - Optional[Callable[[bytes], Any]], - Doc( - "Any callable that takes a raw message `bytes` " - "value and returns a deserialized value." - ), - ] = None, fetch_max_wait_ms: Annotated[ int, Doc( @@ -1220,20 +1031,6 @@ def subscriber( """ ), ] = 5 * 60 * 1000, - rebalance_timeout_ms: Annotated[ - Optional[int], - Doc( - """ - The maximum time server will wait for this - consumer to rejoin the group in a case of rebalance. In Java client - this behaviour is bound to `max.poll.interval.ms` configuration, - but as ``aiokafka`` will rejoin the group in the background, we - decouple this setting to allow finer tuning by users that use - `ConsumerRebalanceListener` to delay rebalacing. Defaults - to ``session_timeout_ms`` - """ - ), - ] = None, session_timeout_ms: Annotated[ int, Doc( @@ -1265,36 +1062,6 @@ def subscriber( """ ), ] = 3 * 1000, - consumer_timeout_ms: Annotated[ - int, - Doc( - """ - Maximum wait timeout for background fetching - routine. Mostly defines how fast the system will see rebalance and - request new data for new partitions. - """ - ), - ] = 200, - max_poll_records: Annotated[ - Optional[int], - Doc( - """ - The maximum number of records returned in a - single call by batch consumer. Has no limit by default. - """ - ), - ] = None, - exclude_internal_topics: Annotated[ - bool, - Doc( - """ - Whether records from internal topics - (such as offsets) should be exposed to the consumer. If set to True - the only way to receive records from an internal topic is - subscribing to it. - """ - ), - ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], Doc( @@ -1417,8 +1184,6 @@ def subscriber( max_records=max_records, group_id=group_id, connection_data={ - "key_deserializer": key_deserializer, - "value_deserializer": value_deserializer, "fetch_max_wait_ms": fetch_max_wait_ms, "fetch_max_bytes": fetch_max_bytes, "fetch_min_bytes": fetch_min_bytes, @@ -1429,12 +1194,8 @@ def subscriber( "check_crcs": check_crcs, "partition_assignment_strategy": partition_assignment_strategy, "max_poll_interval_ms": max_poll_interval_ms, - "rebalance_timeout_ms": rebalance_timeout_ms, "session_timeout_ms": session_timeout_ms, "heartbeat_interval_ms": heartbeat_interval_ms, - "consumer_timeout_ms": consumer_timeout_ms, - "max_poll_records": max_poll_records, - "exclude_internal_topics": exclude_internal_topics, "isolation_level": isolation_level, }, is_manual=not auto_commit, diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py index f1e71c6312..763c218bf8 100644 --- a/faststream/confluent/client.py +++ b/faststream/confluent/client.py @@ -3,7 +3,6 @@ from typing import ( TYPE_CHECKING, Any, - Callable, Dict, Iterable, List, @@ -87,16 +86,11 @@ def __init__( client_id: Optional[str] = None, metadata_max_age_ms: int = 300000, request_timeout_ms: int = 40000, - api_version: str = "auto", acks: Any = _missing, - key_serializer: Optional[Callable[[bytes], bytes]] = None, - value_serializer: Optional[Callable[[bytes], bytes]] = None, compression_type: Optional[str] = None, - max_batch_size: int = 16384, partitioner: str = "consistent_random", max_request_size: int = 1048576, linger_ms: int = 0, - send_backoff_ms: int = 100, retry_backoff_ms: int = 100, security_protocol: str = "PLAINTEXT", connections_max_idle_ms: int = 540000, @@ -106,9 +100,6 @@ def __init__( sasl_mechanism: Optional[str] = None, sasl_plain_password: Optional[str] = None, sasl_plain_username: Optional[str] = None, - sasl_kerberos_service_name: str = "kafka", - sasl_kerberos_domain_name: Optional[str] = None, - sasl_oauth_token_provider: Optional[str] = None, config: Optional[ConfluentConfig] = None, logger: Annotated[ Union["LoggerProto", None, object], @@ -147,7 +138,6 @@ def __init__( "retry.backoff.ms": retry_backoff_ms, "security.protocol": security_protocol.lower(), "connections.max.idle.ms": connections_max_idle_ms, - "sasl.kerberos.service.name": sasl_kerberos_service_name, } self.config = {**self.config, **config_from_params} @@ -278,13 +268,10 @@ def __init__( client_id: Optional[str] = "confluent-kafka-consumer", group_id: Optional[str] = None, group_instance_id: Optional[str] = None, - key_deserializer: Optional[Callable[[bytes], bytes]] = None, - value_deserializer: Optional[Callable[[bytes], bytes]] = None, fetch_max_wait_ms: int = 500, fetch_max_bytes: int = 52428800, fetch_min_bytes: int = 1, max_partition_fetch_bytes: int = 1 * 1024 * 1024, - request_timeout_ms: int = 40 * 1000, retry_backoff_ms: int = 100, auto_offset_reset: str = "latest", enable_auto_commit: bool = True, @@ -293,22 +280,14 @@ def __init__( metadata_max_age_ms: int = 5 * 60 * 1000, partition_assignment_strategy: Union[str, List[Any]] = "roundrobin", max_poll_interval_ms: int = 300000, - rebalance_timeout_ms: Optional[int] = None, session_timeout_ms: int = 10000, heartbeat_interval_ms: int = 3000, - consumer_timeout_ms: int = 200, - max_poll_records: Optional[int] = None, security_protocol: str = "PLAINTEXT", - api_version: str = "auto", - exclude_internal_topics: bool = True, connections_max_idle_ms: int = 540000, isolation_level: str = "read_uncommitted", sasl_mechanism: Optional[str] = None, sasl_plain_password: Optional[str] = None, sasl_plain_username: Optional[str] = None, - sasl_kerberos_service_name: str = "kafka", - sasl_kerberos_domain_name: Optional[str] = None, - sasl_oauth_token_provider: Optional[str] = None, config: Optional[ConfluentConfig] = None, logger: Annotated[ Union["LoggerProto", None, object], @@ -361,7 +340,6 @@ def __init__( "security.protocol": security_protocol.lower(), "connections.max.idle.ms": connections_max_idle_ms, "isolation.level": isolation_level, - "sasl.kerberos.service.name": sasl_kerberos_service_name, } self.config = {**self.config, **config_from_params} diff --git a/faststream/confluent/fastapi/fastapi.py b/faststream/confluent/fastapi/fastapi.py index 1897243d6c..b1f5c83590 100644 --- a/faststream/confluent/fastapi/fastapi.py +++ b/faststream/confluent/fastapi/fastapi.py @@ -423,20 +423,6 @@ def subscriber( """ ), ] = None, - key_deserializer: Annotated[ - Optional[Callable[[bytes], Any]], - Doc( - "Any callable that takes a raw message `bytes` " - "key and returns a deserialized one." - ), - ] = None, - value_deserializer: Annotated[ - Optional[Callable[[bytes], Any]], - Doc( - "Any callable that takes a raw message `bytes` " - "value and returns a deserialized value." - ), - ] = None, fetch_max_wait_ms: Annotated[ int, Doc( @@ -557,20 +543,6 @@ def subscriber( """ ), ] = 5 * 60 * 1000, - rebalance_timeout_ms: Annotated[ - Optional[int], - Doc( - """ - The maximum time server will wait for this - consumer to rejoin the group in a case of rebalance. In Java client - this behaviour is bound to `max.poll.interval.ms` configuration, - but as ``aiokafka`` will rejoin the group in the background, we - decouple this setting to allow finer tuning by users that use - `ConsumerRebalanceListener` to delay rebalacing. Defaults - to ``session_timeout_ms`` - """ - ), - ] = None, session_timeout_ms: Annotated[ int, Doc( @@ -602,36 +574,6 @@ def subscriber( """ ), ] = 3 * 1000, - consumer_timeout_ms: Annotated[ - int, - Doc( - """ - Maximum wait timeout for background fetching - routine. Mostly defines how fast the system will see rebalance and - request new data for new partitions. - """ - ), - ] = 200, - max_poll_records: Annotated[ - Optional[int], - Doc( - """ - The maximum number of records returned in a - single call by batch consumer. Has no limit by default. - """ - ), - ] = None, - exclude_internal_topics: Annotated[ - bool, - Doc( - """ - Whether records from internal topics - (such as offsets) should be exposed to the consumer. If set to True - the only way to receive records from an internal topic is - subscribing to it. - """ - ), - ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], Doc( @@ -882,20 +824,6 @@ def subscriber( """ ), ] = None, - key_deserializer: Annotated[ - Optional[Callable[[bytes], Any]], - Doc( - "Any callable that takes a raw message `bytes` " - "key and returns a deserialized one." - ), - ] = None, - value_deserializer: Annotated[ - Optional[Callable[[bytes], Any]], - Doc( - "Any callable that takes a raw message `bytes` " - "value and returns a deserialized value." - ), - ] = None, fetch_max_wait_ms: Annotated[ int, Doc( @@ -1016,20 +944,6 @@ def subscriber( """ ), ] = 5 * 60 * 1000, - rebalance_timeout_ms: Annotated[ - Optional[int], - Doc( - """ - The maximum time server will wait for this - consumer to rejoin the group in a case of rebalance. In Java client - this behaviour is bound to `max.poll.interval.ms` configuration, - but as ``aiokafka`` will rejoin the group in the background, we - decouple this setting to allow finer tuning by users that use - `ConsumerRebalanceListener` to delay rebalacing. Defaults - to ``session_timeout_ms`` - """ - ), - ] = None, session_timeout_ms: Annotated[ int, Doc( @@ -1061,36 +975,6 @@ def subscriber( """ ), ] = 3 * 1000, - consumer_timeout_ms: Annotated[ - int, - Doc( - """ - Maximum wait timeout for background fetching - routine. Mostly defines how fast the system will see rebalance and - request new data for new partitions. - """ - ), - ] = 200, - max_poll_records: Annotated[ - Optional[int], - Doc( - """ - The maximum number of records returned in a - single call by batch consumer. Has no limit by default. - """ - ), - ] = None, - exclude_internal_topics: Annotated[ - bool, - Doc( - """ - Whether records from internal topics - (such as offsets) should be exposed to the consumer. If set to True - the only way to receive records from an internal topic is - subscribing to it. - """ - ), - ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], Doc( @@ -1327,20 +1211,6 @@ def subscriber( """ ), ] = None, - key_deserializer: Annotated[ - Optional[Callable[[bytes], Any]], - Doc( - "Any callable that takes a raw message `bytes` " - "key and returns a deserialized one." - ), - ] = None, - value_deserializer: Annotated[ - Optional[Callable[[bytes], Any]], - Doc( - "Any callable that takes a raw message `bytes` " - "value and returns a deserialized value." - ), - ] = None, fetch_max_wait_ms: Annotated[ int, Doc( @@ -1461,20 +1331,6 @@ def subscriber( """ ), ] = 5 * 60 * 1000, - rebalance_timeout_ms: Annotated[ - Optional[int], - Doc( - """ - The maximum time server will wait for this - consumer to rejoin the group in a case of rebalance. In Java client - this behaviour is bound to `max.poll.interval.ms` configuration, - but as ``aiokafka`` will rejoin the group in the background, we - decouple this setting to allow finer tuning by users that use - `ConsumerRebalanceListener` to delay rebalacing. Defaults - to ``session_timeout_ms`` - """ - ), - ] = None, session_timeout_ms: Annotated[ int, Doc( @@ -1506,36 +1362,6 @@ def subscriber( """ ), ] = 3 * 1000, - consumer_timeout_ms: Annotated[ - int, - Doc( - """ - Maximum wait timeout for background fetching - routine. Mostly defines how fast the system will see rebalance and - request new data for new partitions. - """ - ), - ] = 200, - max_poll_records: Annotated[ - Optional[int], - Doc( - """ - The maximum number of records returned in a - single call by batch consumer. Has no limit by default. - """ - ), - ] = None, - exclude_internal_topics: Annotated[ - bool, - Doc( - """ - Whether records from internal topics - (such as offsets) should be exposed to the consumer. If set to True - the only way to receive records from an internal topic is - subscribing to it. - """ - ), - ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], Doc( @@ -1789,20 +1615,6 @@ def subscriber( """ ), ] = None, - key_deserializer: Annotated[ - Optional[Callable[[bytes], Any]], - Doc( - "Any callable that takes a raw message `bytes` " - "key and returns a deserialized one." - ), - ] = None, - value_deserializer: Annotated[ - Optional[Callable[[bytes], Any]], - Doc( - "Any callable that takes a raw message `bytes` " - "value and returns a deserialized value." - ), - ] = None, fetch_max_wait_ms: Annotated[ int, Doc( @@ -1923,20 +1735,6 @@ def subscriber( """ ), ] = 5 * 60 * 1000, - rebalance_timeout_ms: Annotated[ - Optional[int], - Doc( - """ - The maximum time server will wait for this - consumer to rejoin the group in a case of rebalance. In Java client - this behaviour is bound to `max.poll.interval.ms` configuration, - but as ``aiokafka`` will rejoin the group in the background, we - decouple this setting to allow finer tuning by users that use - `ConsumerRebalanceListener` to delay rebalacing. Defaults - to ``session_timeout_ms`` - """ - ), - ] = None, session_timeout_ms: Annotated[ int, Doc( @@ -1968,36 +1766,6 @@ def subscriber( """ ), ] = 3 * 1000, - consumer_timeout_ms: Annotated[ - int, - Doc( - """ - Maximum wait timeout for background fetching - routine. Mostly defines how fast the system will see rebalance and - request new data for new partitions. - """ - ), - ] = 200, - max_poll_records: Annotated[ - Optional[int], - Doc( - """ - The maximum number of records returned in a - single call by batch consumer. Has no limit by default. - """ - ), - ] = None, - exclude_internal_topics: Annotated[ - bool, - Doc( - """ - Whether records from internal topics - (such as offsets) should be exposed to the consumer. If set to True - the only way to receive records from an internal topic is - subscribing to it. - """ - ), - ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], Doc( @@ -2236,8 +2004,6 @@ def subscriber( topics[0], # path *topics, group_id=group_id, - key_deserializer=key_deserializer, - value_deserializer=value_deserializer, fetch_max_wait_ms=fetch_max_wait_ms, fetch_max_bytes=fetch_max_bytes, fetch_min_bytes=fetch_min_bytes, @@ -2248,12 +2014,8 @@ def subscriber( check_crcs=check_crcs, partition_assignment_strategy=partition_assignment_strategy, max_poll_interval_ms=max_poll_interval_ms, - rebalance_timeout_ms=rebalance_timeout_ms, session_timeout_ms=session_timeout_ms, heartbeat_interval_ms=heartbeat_interval_ms, - consumer_timeout_ms=consumer_timeout_ms, - max_poll_records=max_poll_records, - exclude_internal_topics=exclude_internal_topics, isolation_level=isolation_level, batch=batch, max_records=max_records, diff --git a/faststream/confluent/router.py b/faststream/confluent/router.py index f24a40e263..6cff87009c 100644 --- a/faststream/confluent/router.py +++ b/faststream/confluent/router.py @@ -159,20 +159,6 @@ def __init__( """ ), ] = None, - key_deserializer: Annotated[ - Optional[Callable[[bytes], Any]], - Doc( - "Any callable that takes a raw message `bytes` " - "key and returns a deserialized one." - ), - ] = None, - value_deserializer: Annotated[ - Optional[Callable[[bytes], Any]], - Doc( - "Any callable that takes a raw message `bytes` " - "value and returns a deserialized value." - ), - ] = None, fetch_max_wait_ms: Annotated[ int, Doc( @@ -293,20 +279,6 @@ def __init__( """ ), ] = 5 * 60 * 1000, - rebalance_timeout_ms: Annotated[ - Optional[int], - Doc( - """ - The maximum time server will wait for this - consumer to rejoin the group in a case of rebalance. In Java client - this behaviour is bound to `max.poll.interval.ms` configuration, - but as ``aiokafka`` will rejoin the group in the background, we - decouple this setting to allow finer tuning by users that use - `ConsumerRebalanceListener` to delay rebalacing. Defaults - to ``session_timeout_ms`` - """ - ), - ] = None, session_timeout_ms: Annotated[ int, Doc( @@ -338,36 +310,6 @@ def __init__( """ ), ] = 3 * 1000, - consumer_timeout_ms: Annotated[ - int, - Doc( - """ - Maximum wait timeout for background fetching - routine. Mostly defines how fast the system will see rebalance and - request new data for new partitions. - """ - ), - ] = 200, - max_poll_records: Annotated[ - Optional[int], - Doc( - """ - The maximum number of records returned in a - single call by batch consumer. Has no limit by default. - """ - ), - ] = None, - exclude_internal_topics: Annotated[ - bool, - Doc( - """ - Whether records from internal topics - (such as offsets) should be exposed to the consumer. If set to True - the only way to receive records from an internal topic is - subscribing to it. - """ - ), - ] = True, isolation_level: Annotated[ Literal["read_uncommitted", "read_committed"], Doc( @@ -481,8 +423,6 @@ def __init__( *topics, publishers=publishers, group_id=group_id, - key_deserializer=key_deserializer, - value_deserializer=value_deserializer, fetch_max_wait_ms=fetch_max_wait_ms, fetch_max_bytes=fetch_max_bytes, fetch_min_bytes=fetch_min_bytes, @@ -493,12 +433,8 @@ def __init__( check_crcs=check_crcs, partition_assignment_strategy=partition_assignment_strategy, max_poll_interval_ms=max_poll_interval_ms, - rebalance_timeout_ms=rebalance_timeout_ms, session_timeout_ms=session_timeout_ms, heartbeat_interval_ms=heartbeat_interval_ms, - consumer_timeout_ms=consumer_timeout_ms, - max_poll_records=max_poll_records, - exclude_internal_topics=exclude_internal_topics, isolation_level=isolation_level, max_records=max_records, batch_timeout_ms=batch_timeout_ms, diff --git a/faststream/confluent/schemas/params.py b/faststream/confluent/schemas/params.py index eb2d724b23..7df2ef67eb 100644 --- a/faststream/confluent/schemas/params.py +++ b/faststream/confluent/schemas/params.py @@ -1,4 +1,3 @@ -import ssl from asyncio import AbstractEventLoop from typing import List, Literal, Optional, Union @@ -11,14 +10,12 @@ class ConsumerConnectionParams(TypedDict, total=False): bootstrap_servers: Union[str, List[str]] loop: Optional[AbstractEventLoop] client_id: str - request_timeout_ms: int retry_backoff_ms: int metadata_max_age_ms: int security_protocol: Literal[ "SSL", "PLAINTEXT", ] - api_version: str connections_max_idle_ms: int sasl_mechanism: Literal[ "PLAIN", @@ -29,7 +26,3 @@ class ConsumerConnectionParams(TypedDict, total=False): ] sasl_plain_password: str sasl_plain_username: str - sasl_kerberos_service_name: str - sasl_kerberos_domain_name: str - ssl_context: ssl.SSLContext - sasl_oauth_token_provider: str