Skip to content

Commit

Permalink
Pre-final variant
Browse files Browse the repository at this point in the history
  • Loading branch information
sheldygg committed Nov 8, 2024
1 parent a5882f9 commit 01c7e84
Showing 1 changed file with 98 additions and 48 deletions.
146 changes: 98 additions & 48 deletions faststream/nats/subscriber/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,14 +346,15 @@ def _validate_input_for_misconfigure(
if not subject and not config:
raise SetupError("You must provide either `subject` or `config` option.")

if max_msgs > 0 and any((stream, kv_watch, obj_watch)):
if max_msgs > 0 and any((stream, kv_watch, obj_watch, pull_sub)):
warnings.warn(
"`max_msgs` option can be used with NATS Core Subscriber - only.",
RuntimeWarning,
stacklevel=4,
)

if not stream:
# Core/Obj/Kv Subscriber
if idle_heartbeat is not None:
warnings.warn(
"`idle_heartbeat` option can be used with JetStream (Pull/Push) - only.",
Expand Down Expand Up @@ -382,86 +383,135 @@ def _validate_input_for_misconfigure(
stacklevel=4,
)

if obj_watch is not None:
# KeyStorage watch
if max_workers > 1:
elif stream:
if pull_sub is not None:
if queue: # default ""
warnings.warn(
message="`queue` option has no effect with JetStream Pull Subscription. Probably, you wanted to use `durable` instead.",
category=RuntimeWarning,
stacklevel=4,
)

if ordered_consumer: # default False
warnings.warn(
"`ordered_consumer` option has no effect with JetStream Pull Subscription. It can be used with JetStream Push Subscription - only.",
RuntimeWarning,
stacklevel=4,
)

if ack_first: # default False
warnings.warn(
message="`ack_first` option has no effect with JetStream Pull Subscription. It can be used with JetStream Push Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

else:
# JS PushSub
if durable is not None:
warnings.warn(
message="JetStream Push consumer with durable option can't be scaled horizontally by multiple instances. Probably, you are looking for `queue` option. Also, we strongly recommend to use Jetstream PullSubsriber with durable option as a default.",
category=RuntimeWarning,
stacklevel=4,
)

if obj_watch is not None or kv_watch is not None:
# Obj/Kv Subscriber
if pending_msgs_limit is not None:
warnings.warn(
message="`pending_msgs_limit` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)
if pending_bytes_limit is not None:
warnings.warn(
message="`max_workers` has no effect for ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
message="`pending_bytes_limit` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if pending_msgs_limit is not None:
if queue:
warnings.warn(
message="`pending_msgs_limit` has no effect for ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
message="`queue` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if pending_bytes_limit is not None:
if durable:
warnings.warn(
message="`pending_bytes_limit` has no effect for ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
message="`durable` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

elif kv_watch is not None:
# KeyValue watch
if max_workers > 1:
if config is not None:
warnings.warn(
message="`max_workers` has no effect for KeyValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
message="`config` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if pending_msgs_limit is not None:
if ordered_consumer:
warnings.warn(
message="`pending_msgs_limit` has no effect for KeyValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
message="`ordered_consumer` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if pending_bytes_limit is not None:
if idle_heartbeat is not None:
warnings.warn(
message="`pending_bytes_limit` has no effect for KeyValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
message="`idle_heartbeat` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

elif stream:
# JetStream subscribers
if pull_sub is not None:
# JS PullSub
if queue: # default ""
warnings.warn(
"`queue` option has no effect with JetStream Pull Subscription. Probably, you wanted to use durable instead.",
RuntimeWarning,
stacklevel=4,
)
if flow_control:
warnings.warn(
message="`flow_control` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if ordered_consumer: # default False
warnings.warn(
"`ordered_consumer` option has no effect with JetStream Pull Subscription. It can be used with JetStream Push Subscription - only.",
RuntimeWarning,
stacklevel=4,
)
if deliver_policy:
warnings.warn(
message="`deliver_policy` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if ack_first: # default False
warnings.warn(
message="`ack_first` option has no effect with JetStream Pull Subscription. It can be used with JetStream Push Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)
if headers_only:
warnings.warn(
message="`headers_only` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

else:
# JS PushSub
if durable is not None:
warnings.warn(
message="JetStream Push consumer with durable option can't be scaled horizontally by multiple instances. Probably, you are looking for `queue` option. Also, we strongly recommend to use Jetstream PullSubsriber with durable option as a default.",
category=RuntimeWarning,
stacklevel=4,
)
if ack_first:
warnings.warn(
message="`ack_first` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if max_workers > 1:
warnings.warn(
message="`max_workers` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)
else:
# Core Subscriber
pass
if durable:
warnings.warn(
message="`durable` option has no effect for NATS Core Subscription. It can be used with JetStream (Pull/Push) - only.",
category=RuntimeWarning,
stacklevel=4,
)

if config:
warnings.warn(
message="`config` option has no effect for NATS Core Subscription. It can be used with JetStream (Pull/Push) - only.",
category=RuntimeWarning,
stacklevel=4,
)

0 comments on commit 01c7e84

Please sign in to comment.