diff --git a/faststream/nats/subscriber/factory.py b/faststream/nats/subscriber/factory.py index c48cc4a6ef..13c3578358 100644 --- a/faststream/nats/subscriber/factory.py +++ b/faststream/nats/subscriber/factory.py @@ -105,8 +105,6 @@ def create_subscriber( config.durable_name = durable if config.idle_heartbeat is None: config.idle_heartbeat = idle_heartbeat - if config.flow_control is None: - config.flow_control = flow_control if config.headers_only is None: config.headers_only = headers_only if config.deliver_policy is DeliverPolicy.ALL: @@ -322,13 +320,13 @@ def create_subscriber( def _validate_input_for_misconfigure( subject: str, - queue: str, + queue: str, # default "" pending_msgs_limit: Optional[int], pending_bytes_limit: Optional[int], - max_msgs: int, + max_msgs: int, # default 0 durable: Optional[str], config: Optional["api.ConsumerConfig"], - ordered_consumer: bool, + ordered_consumer: bool, # default False idle_heartbeat: Optional[float], flow_control: Optional[bool], deliver_policy: Optional["api.DeliverPolicy"], @@ -336,17 +334,26 @@ def _validate_input_for_misconfigure( pull_sub: Optional["PullSub"], kv_watch: Optional["KvWatch"], obj_watch: Optional["ObjWatch"], - ack_first: bool, - max_workers: int, + ack_first: bool, # default False + max_workers: int, # default 1 stream: Optional["JStream"], ) -> None: - if pull_sub is not None and stream is None: - raise SetupError("Pull subscriber can be used only with a stream") - if not subject and not config: raise SetupError("You must provide either `subject` or `config` option.") - if max_msgs > 0 and any((stream, kv_watch, obj_watch, pull_sub)): + if stream and kv_watch: + raise SetupError("You can't use `stream` and `kv_watch` options both.") + + if stream and obj_watch: + raise SetupError("You can't use `stream` and `obj_watch` options both.") + + if kv_watch and obj_watch: + raise SetupError("You can't use `kv_watch` and `obj_watch` options both.") + + if pull_sub and not stream: + raise SetupError("Pull subscriber can be used only with a `stream` option.") + + if max_msgs > 0 and any((stream, kv_watch, obj_watch)): warnings.warn( "`max_msgs` option can be used with NATS Core Subscriber - only.", RuntimeWarning, @@ -354,205 +361,128 @@ def _validate_input_for_misconfigure( ) if not stream: - # Core/Obj/Kv Subscriber - if idle_heartbeat is not None: - warnings.warn( - "`idle_heartbeat` option can be used with JetStream (Pull/Push) - only.", - RuntimeWarning, - stacklevel=4, - ) - - if flow_control is not None: - warnings.warn( - "`flow_control` option can be used with JetStream (Pull/Push) - only.", - RuntimeWarning, - stacklevel=4, - ) - - if deliver_policy is not None: - warnings.warn( - "`deliver_policy` option can be used with JetStream (Pull/Push) - only.", - RuntimeWarning, - stacklevel=4, - ) - - if headers_only is not None: - warnings.warn( - "`headers_only` option can be used with JetStream (Pull/Push) - only.", - RuntimeWarning, - stacklevel=4, - ) - - elif stream: - if pull_sub is not None: - if queue: # default "" + if obj_watch or kv_watch: + # Obj/Kv Subscriber + if pending_msgs_limit is not None: warnings.warn( - message="`queue` option has no effect with JetStream Pull Subscription. Probably, you wanted to use `durable` instead.", + message="`pending_msgs_limit` option can be used with JetStream (Pull/Push) or Core Subscription - only.", category=RuntimeWarning, stacklevel=4, ) - if ordered_consumer: # default False + if pending_bytes_limit is not None: warnings.warn( - "`ordered_consumer` option has no effect with JetStream Pull Subscription. It can be used with JetStream Push Subscription - only.", - RuntimeWarning, + message="`pending_bytes_limit` option can be used with JetStream (Pull/Push) or Core Subscription - only.", + category=RuntimeWarning, stacklevel=4, ) - if ack_first: # default False + if queue: warnings.warn( - message="`ack_first` option has no effect with JetStream Pull Subscription. It can be used with JetStream Push Subscription - only.", + message="`queue` option can be used with JetStream Push or Core Subscription - only.", category=RuntimeWarning, stacklevel=4, ) - else: - # JS PushSub - if durable is not None: + if max_workers > 1: warnings.warn( - message="JetStream Push consumer with durable option can't be scaled horizontally by multiple instances. Probably, you are looking for `queue` option. Also, we strongly recommend to use Jetstream PullSubsriber with durable option as a default.", + message="`max_workers` option can be used with JetStream (Pull/Push) or Core Subscription - only.", category=RuntimeWarning, stacklevel=4, ) - if obj_watch is not None or kv_watch is not None: - # Obj/Kv Subscriber - if pending_msgs_limit is not None: - warnings.warn( - message="`pending_msgs_limit` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.", - category=RuntimeWarning, - stacklevel=4, - ) - if pending_bytes_limit is not None: - warnings.warn( - message="`pending_bytes_limit` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.", - category=RuntimeWarning, - stacklevel=4, - ) - - if queue: - warnings.warn( - message="`queue` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.", - category=RuntimeWarning, - stacklevel=4, - ) - + # Core/Obj/Kv Subscriber if durable: warnings.warn( - message="`durable` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.", + message="`durable` option can be used with JetStream (Pull/Push) Subscription - only.", category=RuntimeWarning, stacklevel=4, ) if config is not None: warnings.warn( - message="`config` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.", + message="`config` option can be used with JetStream (Pull/Push) Subscription - only.", category=RuntimeWarning, stacklevel=4, ) if ordered_consumer: warnings.warn( - message="`ordered_consumer` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.", + message="`ordered_consumer` option can be used with JetStream (Pull/Push) Subscription - only.", category=RuntimeWarning, stacklevel=4, ) if idle_heartbeat is not None: warnings.warn( - message="`idle_heartbeat` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.", + message="`idle_heartbeat` option can be used with JetStream (Pull/Push) Subscription - only.", category=RuntimeWarning, stacklevel=4, ) if flow_control: warnings.warn( - message="`flow_control` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.", + message="`flow_control` option can be used with JetStream Push Subscription - only.", category=RuntimeWarning, stacklevel=4, ) if deliver_policy: warnings.warn( - message="`deliver_policy` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.", + message="`deliver_policy` option can be used with JetStream (Pull/Push) Subscription - only.", category=RuntimeWarning, stacklevel=4, ) if headers_only: warnings.warn( - message="`headers_only` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.", + message="`headers_only` option be used with JetStream (Pull/Push) Subscription - only.", category=RuntimeWarning, stacklevel=4, ) if ack_first: warnings.warn( - message="`ack_first` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.", + message="`ack_first` can be used with JetStream Push Subscription - only.", category=RuntimeWarning, stacklevel=4, ) - if max_workers > 1: - warnings.warn( - message="`max_workers` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.", - category=RuntimeWarning, - stacklevel=4, - ) - else: - # Core Subscriber - if durable: - warnings.warn( - message="`durable` option has no effect for NATS Core Subscription. It can be used with JetStream (Pull) - only.", - category=RuntimeWarning, - stacklevel=4, - ) - - if config: - warnings.warn( - message="`config` option has no effect for NATS Core Subscription. It can be used with JetStream (Pull/Push) - only.", - category=RuntimeWarning, - stacklevel=4, - ) - - if ordered_consumer: - warnings.warn( - message="`ordered_consumer` option has no effect for NATS Core Subscription. It can be used with JetStream Push - only.", - category=RuntimeWarning, - stacklevel=4, - ) - - if idle_heartbeat is not None: - warnings.warn( - message="`idle_heartbeat` option has no effect for NATS Core Subscription. It can be used with JetStream (Pull/Push) - only.", - category=RuntimeWarning, - stacklevel=4, - ) + elif stream: + if pull_sub: + if queue: + warnings.warn( + message="`queue` option has no effect with JetStream Pull Subscription. Probably, you wanted to use `durable` instead.", + category=RuntimeWarning, + stacklevel=4, + ) - if flow_control: - warnings.warn( - message="`flow_control` option has no effect for NATS Core Subscription. It can be used with JetStream (Pull/Push) - only.", - category=RuntimeWarning, - stacklevel=4, - ) + if ordered_consumer: + warnings.warn( + "`ordered_consumer` option has no effect with JetStream Pull Subscription. It can be used with JetStream Push Subscription - only.", + RuntimeWarning, + stacklevel=4, + ) - if deliver_policy: - warnings.warn( - message="`deliver_policy` option has no effect for NATS Core Subscription. It can be used with JetStream (Pull/Push) - only.", - category=RuntimeWarning, - stacklevel=4, - ) + if ack_first: + warnings.warn( + message="`ack_first` option has no effect with JetStream Pull Subscription. It can be used with JetStream Push Subscription - only.", + category=RuntimeWarning, + stacklevel=4, + ) - if headers_only: - warnings.warn( - message="`headers_only` option has no effect for NATS Core Subscription. It can be used with JetStream (Pull/Push) - only.", - category=RuntimeWarning, - stacklevel=4, - ) + if flow_control: + warnings.warn( + message="`flow_control` option has no effect with JetStream Pull Subscription. It can be used with JetStream Push Subscription - only.", + category=RuntimeWarning, + stacklevel=4, + ) - if ack_first: - warnings.warn( - message="`ack_first` option has no effect for NATS Core Subscription. It can be used with JetStream Push - only.", - category=RuntimeWarning, - stacklevel=4, - ) + else: + # JS PushSub + if durable is not None: + warnings.warn( + message="JetStream Push consumer with durable option can't be scaled horizontally by multiple instances. Probably, you are looking for `queue` option. Also, we strongly recommend to use Jetstream PullSubsriber with durable option as a default.", + category=RuntimeWarning, + stacklevel=4, + )