From bd188c7c94c3c9c55f5142301ab02da89b453340 Mon Sep 17 00:00:00 2001 From: sheldy <85823514+sheldygg@users.noreply.github.com> Date: Mon, 11 Nov 2024 06:48:14 +0100 Subject: [PATCH] Add more warning's to nats subscription factory (#1907) * Add dynaconf example for nats * Update index * :) * Add type for new_value and uncomment * Update dynaconf.md * docs: polish markup * Proofread howto docs page * Add more warning's to nats subscription factory * docs: generate API References * Add `validate_input_for_warnings` function * docs: generate API References * refactor: merge options to config object * refactor: use correct defaults * Moreeeee warnings * Show warnings for core subscription without category * Upd warning text * chore: correct text * Pre-final variant * Add warnings for nats core subscriber * docs: generate API References * Add end-line * Add return type * refactor: polish NATS warnings * refactor: polish conditions * Proofread warning messages * Add missing full stop * Fix linting issues --------- Co-authored-by: Nikita Pastukhov Co-authored-by: Kumaran Rajendhiran Co-authored-by: sheldygg Co-authored-by: Pastukhov Nikita --- faststream/nats/broker/registrator.py | 4 +- faststream/nats/fastapi/fastapi.py | 4 +- faststream/nats/router.py | 4 +- faststream/nats/subscriber/factory.py | 237 +++++++++++++++++++++++--- 4 files changed, 219 insertions(+), 30 deletions(-) diff --git a/faststream/nats/broker/registrator.py b/faststream/nats/broker/registrator.py index fb7aaf8e7f..8be4f57803 100644 --- a/faststream/nats/broker/registrator.py +++ b/faststream/nats/broker/registrator.py @@ -95,9 +95,9 @@ def subscriber( # type: ignore[override] Doc("Enable Heartbeats for a consumer to detect failures."), ] = None, flow_control: Annotated[ - bool, + Optional[bool], Doc("Enable Flow Control for a consumer."), - ] = False, + ] = None, deliver_policy: Annotated[ Optional["api.DeliverPolicy"], Doc("Deliver Policy to be used for subscription."), diff --git a/faststream/nats/fastapi/fastapi.py b/faststream/nats/fastapi/fastapi.py index 263465543e..4a871426ec 100644 --- a/faststream/nats/fastapi/fastapi.py +++ b/faststream/nats/fastapi/fastapi.py @@ -630,9 +630,9 @@ def subscriber( # type: ignore[override] Doc("Enable Heartbeats for a consumer to detect failures."), ] = None, flow_control: Annotated[ - bool, + Optional[bool], Doc("Enable Flow Control for a consumer."), - ] = False, + ] = None, deliver_policy: Annotated[ Optional["api.DeliverPolicy"], Doc("Deliver Policy to be used for subscription."), diff --git a/faststream/nats/router.py b/faststream/nats/router.py index ace895ba59..ed838f133a 100644 --- a/faststream/nats/router.py +++ b/faststream/nats/router.py @@ -184,9 +184,9 @@ def __init__( Doc("Enable Heartbeats for a consumer to detect failures."), ] = None, flow_control: Annotated[ - bool, + Optional[bool], Doc("Enable Flow Control for a consumer."), - ] = False, + ] = None, deliver_policy: Annotated[ Optional["api.DeliverPolicy"], Doc("Deliver Policy to be used for subscription."), diff --git a/faststream/nats/subscriber/factory.py b/faststream/nats/subscriber/factory.py index c17556fe85..5adf4af55b 100644 --- a/faststream/nats/subscriber/factory.py +++ b/faststream/nats/subscriber/factory.py @@ -5,7 +5,7 @@ DEFAULT_SUB_PENDING_BYTES_LIMIT, DEFAULT_SUB_PENDING_MSGS_LIMIT, ) -from nats.js.api import ConsumerConfig +from nats.js.api import ConsumerConfig, DeliverPolicy from nats.js.client import ( DEFAULT_JS_SUB_PENDING_BYTES_LIMIT, DEFAULT_JS_SUB_PENDING_MSGS_LIMIT, @@ -46,7 +46,7 @@ def create_subscriber( config: Optional["api.ConsumerConfig"], ordered_consumer: bool, idle_heartbeat: Optional[float], - flow_control: bool, + flow_control: Optional[bool], deliver_policy: Optional["api.DeliverPolicy"], headers_only: Optional[bool], # pull args @@ -79,18 +79,39 @@ def create_subscriber( "AsyncAPIKeyValueWatchSubscriber", "AsyncAPIObjStoreWatchSubscriber", ]: - if pull_sub is not None and stream is None: - raise SetupError("Pull subscriber can be used only with a stream") - - if not subject and not config: - raise SetupError("You must provide either `subject` or `config` option.") + _validate_input_for_misconfigure( + subject=subject, + queue=queue, + pending_msgs_limit=pending_msgs_limit, + pending_bytes_limit=pending_bytes_limit, + max_msgs=max_msgs, + durable=durable, + config=config, + ordered_consumer=ordered_consumer, + idle_heartbeat=idle_heartbeat, + flow_control=flow_control, + deliver_policy=deliver_policy, + headers_only=headers_only, + pull_sub=pull_sub, + kv_watch=kv_watch, + obj_watch=obj_watch, + ack_first=ack_first, + max_workers=max_workers, + stream=stream, + ) config = config or ConsumerConfig(filter_subjects=[]) + if config.durable_name is None: + config.durable_name = durable + if config.idle_heartbeat is None: + config.idle_heartbeat = idle_heartbeat + if config.headers_only is None: + config.headers_only = headers_only + if config.deliver_policy is DeliverPolicy.ALL: + config.deliver_policy = deliver_policy or DeliverPolicy.ALL if stream: - # TODO: pull & queue warning - # TODO: push & durable warning - + # Both JS Subscribers extra_options: AnyDict = { "pending_msgs_limit": pending_msgs_limit or DEFAULT_JS_SUB_PENDING_MSGS_LIMIT, @@ -101,9 +122,11 @@ def create_subscriber( } if pull_sub is not None: + # JS Pull Subscriber extra_options.update({"inbox_prefix": inbox_prefix}) else: + # JS Push Subscriber extra_options.update( { "ordered_consumer": ordered_consumer, @@ -116,6 +139,7 @@ def create_subscriber( ) else: + # Core Subscriber extra_options = { "pending_msgs_limit": pending_msgs_limit or DEFAULT_SUB_PENDING_MSGS_LIMIT, "pending_bytes_limit": pending_bytes_limit @@ -124,13 +148,6 @@ def create_subscriber( } if obj_watch is not None: - if max_workers > 1: - warnings.warn( - "`max_workers` has no effect for ObjectValue subscriber.", - RuntimeWarning, - stacklevel=3, - ) - return AsyncAPIObjStoreWatchSubscriber( subject=subject, config=config, @@ -143,13 +160,6 @@ def create_subscriber( ) if kv_watch is not None: - if max_workers > 1: - warnings.warn( - "`max_workers` has no effect for KeyValue subscriber.", - RuntimeWarning, - stacklevel=3, - ) - return AsyncAPIKeyValueWatchSubscriber( subject=subject, config=config, @@ -306,3 +316,182 @@ def create_subscriber( description_=description_, include_in_schema=include_in_schema, ) + + +def _validate_input_for_misconfigure( + subject: str, + queue: str, # default "" + pending_msgs_limit: Optional[int], + pending_bytes_limit: Optional[int], + max_msgs: int, # default 0 + durable: Optional[str], + config: Optional["api.ConsumerConfig"], + ordered_consumer: bool, # default False + idle_heartbeat: Optional[float], + flow_control: Optional[bool], + deliver_policy: Optional["api.DeliverPolicy"], + headers_only: Optional[bool], + pull_sub: Optional["PullSub"], + kv_watch: Optional["KvWatch"], + obj_watch: Optional["ObjWatch"], + ack_first: bool, # default False + max_workers: int, # default 1 + stream: Optional["JStream"], +) -> None: + if not subject and not config: + raise SetupError("You must provide either the `subject` or `config` option.") + + if stream and kv_watch: + raise SetupError( + "You can't use both the `stream` and `kv_watch` options simultaneously." + ) + + if stream and obj_watch: + raise SetupError( + "You can't use both the `stream` and `obj_watch` options simultaneously." + ) + + if kv_watch and obj_watch: + raise SetupError( + "You can't use both the `kv_watch` and `obj_watch` options simultaneously." + ) + + if pull_sub and not stream: + raise SetupError( + "The pull subscriber can only be used with the `stream` option." + ) + + if max_msgs > 0 and any((stream, kv_watch, obj_watch)): + warnings.warn( + "The `max_msgs` option can be used only with a NATS Core Subscriber.", + RuntimeWarning, + stacklevel=4, + ) + + if not stream: + if obj_watch or kv_watch: + # Obj/Kv Subscriber + if pending_msgs_limit is not None: + warnings.warn( + message="The `pending_msgs_limit` option can be used only with JetStream (Pull/Push) or Core Subscription.", + category=RuntimeWarning, + stacklevel=4, + ) + + if pending_bytes_limit is not None: + warnings.warn( + message="The `pending_bytes_limit` option can be used only with JetStream (Pull/Push) or Core Subscription.", + category=RuntimeWarning, + stacklevel=4, + ) + + if queue: + warnings.warn( + message="The `queue` option can be used only with JetStream Push or Core Subscription.", + category=RuntimeWarning, + stacklevel=4, + ) + + if max_workers > 1: + warnings.warn( + message="The `max_workers` option can be used only with JetStream (Pull/Push) or Core Subscription.", + category=RuntimeWarning, + stacklevel=4, + ) + + # Core/Obj/Kv Subscriber + if durable: + warnings.warn( + message="The `durable` option can be used only with JetStream (Pull/Push) Subscription.", + category=RuntimeWarning, + stacklevel=4, + ) + + if config is not None: + warnings.warn( + message="The `config` option can be used only with JetStream (Pull/Push) Subscription.", + category=RuntimeWarning, + stacklevel=4, + ) + + if ordered_consumer: + warnings.warn( + message="The `ordered_consumer` option can be used only with JetStream (Pull/Push) Subscription.", + category=RuntimeWarning, + stacklevel=4, + ) + + if idle_heartbeat is not None: + warnings.warn( + message="The `idle_heartbeat` option can be used only with JetStream (Pull/Push) Subscription.", + category=RuntimeWarning, + stacklevel=4, + ) + + if flow_control: + warnings.warn( + message="The `flow_control` option can be used only with JetStream Push Subscription.", + category=RuntimeWarning, + stacklevel=4, + ) + + if deliver_policy: + warnings.warn( + message="The `deliver_policy` option can be used only with JetStream (Pull/Push) Subscription.", + category=RuntimeWarning, + stacklevel=4, + ) + + if headers_only: + warnings.warn( + message="The `headers_only` option can be used only with JetStream (Pull/Push) Subscription.", + category=RuntimeWarning, + stacklevel=4, + ) + + if ack_first: + warnings.warn( + message="The `ack_first` option can be used only with JetStream Push Subscription.", + category=RuntimeWarning, + stacklevel=4, + ) + + else: + # JetStream Subscribers + if pull_sub: + if queue: + warnings.warn( + message="The `queue` option has no effect with JetStream Pull Subscription. You probably wanted to use the `durable` option instead.", + category=RuntimeWarning, + stacklevel=4, + ) + + if ordered_consumer: + warnings.warn( + "The `ordered_consumer` option has no effect with JetStream Pull Subscription. It can only be used with JetStream Push Subscription.", + RuntimeWarning, + stacklevel=4, + ) + + if ack_first: + warnings.warn( + message="The `ack_first` option has no effect with JetStream Pull Subscription. It can only be used with JetStream Push Subscription.", + category=RuntimeWarning, + stacklevel=4, + ) + + if flow_control: + warnings.warn( + message="The `flow_control` option has no effect with JetStream Pull Subscription. It can only be used with JetStream Push Subscription.", + category=RuntimeWarning, + stacklevel=4, + ) + + else: + # JS PushSub + if durable is not None: + warnings.warn( + message="The JetStream Push consumer with the `durable` option can't be scaled horizontally across multiple instances. You probably wanted to use the `queue` option instead. Also, we strongly recommend using the Jetstream PullSubsriber with the `durable` option as the default.", + category=RuntimeWarning, + stacklevel=4, + )