Skip to content

Commit

Permalink
refactor: polish NATS warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Nov 9, 2024
1 parent 0fa9ebc commit 7a7c36f
Showing 1 changed file with 72 additions and 142 deletions.
214 changes: 72 additions & 142 deletions faststream/nats/subscriber/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ def create_subscriber(
config.durable_name = durable
if config.idle_heartbeat is None:
config.idle_heartbeat = idle_heartbeat
if config.flow_control is None:
config.flow_control = flow_control
if config.headers_only is None:
config.headers_only = headers_only
if config.deliver_policy is DeliverPolicy.ALL:
Expand Down Expand Up @@ -322,237 +320,169 @@ def create_subscriber(

def _validate_input_for_misconfigure(
subject: str,
queue: str,
queue: str, # default ""
pending_msgs_limit: Optional[int],
pending_bytes_limit: Optional[int],
max_msgs: int,
max_msgs: int, # default 0
durable: Optional[str],
config: Optional["api.ConsumerConfig"],
ordered_consumer: bool,
ordered_consumer: bool, # default False
idle_heartbeat: Optional[float],
flow_control: Optional[bool],
deliver_policy: Optional["api.DeliverPolicy"],
headers_only: Optional[bool],
pull_sub: Optional["PullSub"],
kv_watch: Optional["KvWatch"],
obj_watch: Optional["ObjWatch"],
ack_first: bool,
max_workers: int,
ack_first: bool, # default False
max_workers: int, # default 1
stream: Optional["JStream"],
) -> None:
if pull_sub is not None and stream is None:
raise SetupError("Pull subscriber can be used only with a stream")

if not subject and not config:
raise SetupError("You must provide either `subject` or `config` option.")

if max_msgs > 0 and any((stream, kv_watch, obj_watch, pull_sub)):
if stream and kv_watch:
raise SetupError("You can't use `stream` and `kv_watch` options both.")

if stream and obj_watch:
raise SetupError("You can't use `stream` and `obj_watch` options both.")

if kv_watch and obj_watch:
raise SetupError("You can't use `kv_watch` and `obj_watch` options both.")

if pull_sub and not stream:
raise SetupError("Pull subscriber can be used only with a `stream` option.")

if max_msgs > 0 and any((stream, kv_watch, obj_watch)):
warnings.warn(
"`max_msgs` option can be used with NATS Core Subscriber - only.",
RuntimeWarning,
stacklevel=4,
)

if not stream:
# Core/Obj/Kv Subscriber
if idle_heartbeat is not None:
warnings.warn(
"`idle_heartbeat` option can be used with JetStream (Pull/Push) - only.",
RuntimeWarning,
stacklevel=4,
)

if flow_control is not None:
warnings.warn(
"`flow_control` option can be used with JetStream (Pull/Push) - only.",
RuntimeWarning,
stacklevel=4,
)

if deliver_policy is not None:
warnings.warn(
"`deliver_policy` option can be used with JetStream (Pull/Push) - only.",
RuntimeWarning,
stacklevel=4,
)

if headers_only is not None:
warnings.warn(
"`headers_only` option can be used with JetStream (Pull/Push) - only.",
RuntimeWarning,
stacklevel=4,
)

elif stream:
if pull_sub is not None:
if queue: # default ""
if obj_watch or kv_watch:
# Obj/Kv Subscriber
if pending_msgs_limit is not None:
warnings.warn(
message="`queue` option has no effect with JetStream Pull Subscription. Probably, you wanted to use `durable` instead.",
message="`pending_msgs_limit` option can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if ordered_consumer: # default False
if pending_bytes_limit is not None:
warnings.warn(
"`ordered_consumer` option has no effect with JetStream Pull Subscription. It can be used with JetStream Push Subscription - only.",
RuntimeWarning,
message="`pending_bytes_limit` option can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if ack_first: # default False
if queue:
warnings.warn(
message="`ack_first` option has no effect with JetStream Pull Subscription. It can be used with JetStream Push Subscription - only.",
message="`queue` option can be used with JetStream Push or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

else:
# JS PushSub
if durable is not None:
if max_workers > 1:
warnings.warn(
message="JetStream Push consumer with durable option can't be scaled horizontally by multiple instances. Probably, you are looking for `queue` option. Also, we strongly recommend to use Jetstream PullSubsriber with durable option as a default.",
message="`max_workers` option can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if obj_watch is not None or kv_watch is not None:
# Obj/Kv Subscriber
if pending_msgs_limit is not None:
warnings.warn(
message="`pending_msgs_limit` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)
if pending_bytes_limit is not None:
warnings.warn(
message="`pending_bytes_limit` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if queue:
warnings.warn(
message="`queue` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

# Core/Obj/Kv Subscriber
if durable:
warnings.warn(
message="`durable` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
message="`durable` option can be used with JetStream (Pull/Push) Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if config is not None:
warnings.warn(
message="`config` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
message="`config` option can be used with JetStream (Pull/Push) Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if ordered_consumer:
warnings.warn(
message="`ordered_consumer` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
message="`ordered_consumer` option can be used with JetStream (Pull/Push) Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if idle_heartbeat is not None:
warnings.warn(
message="`idle_heartbeat` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
message="`idle_heartbeat` option can be used with JetStream (Pull/Push) Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if flow_control:
warnings.warn(
message="`flow_control` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
message="`flow_control` option can be used with JetStream Push Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if deliver_policy:
warnings.warn(
message="`deliver_policy` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
message="`deliver_policy` option can be used with JetStream (Pull/Push) Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if headers_only:
warnings.warn(
message="`headers_only` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
message="`headers_only` option be used with JetStream (Pull/Push) Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if ack_first:
warnings.warn(
message="`ack_first` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
message="`ack_first` can be used with JetStream Push Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if max_workers > 1:
warnings.warn(
message="`max_workers` has no effect for KeyValue/ObjectValue subscriber. It can be used with JetStream (Pull/Push) or Core Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)
else:
# Core Subscriber
if durable:
warnings.warn(
message="`durable` option has no effect for NATS Core Subscription. It can be used with JetStream (Pull) - only.",
category=RuntimeWarning,
stacklevel=4,
)

if config:
warnings.warn(
message="`config` option has no effect for NATS Core Subscription. It can be used with JetStream (Pull/Push) - only.",
category=RuntimeWarning,
stacklevel=4,
)

if ordered_consumer:
warnings.warn(
message="`ordered_consumer` option has no effect for NATS Core Subscription. It can be used with JetStream Push - only.",
category=RuntimeWarning,
stacklevel=4,
)

if idle_heartbeat is not None:
warnings.warn(
message="`idle_heartbeat` option has no effect for NATS Core Subscription. It can be used with JetStream (Pull/Push) - only.",
category=RuntimeWarning,
stacklevel=4,
)
elif stream:
if pull_sub:
if queue:
warnings.warn(
message="`queue` option has no effect with JetStream Pull Subscription. Probably, you wanted to use `durable` instead.",
category=RuntimeWarning,
stacklevel=4,
)

if flow_control:
warnings.warn(
message="`flow_control` option has no effect for NATS Core Subscription. It can be used with JetStream (Pull/Push) - only.",
category=RuntimeWarning,
stacklevel=4,
)
if ordered_consumer:
warnings.warn(
"`ordered_consumer` option has no effect with JetStream Pull Subscription. It can be used with JetStream Push Subscription - only.",
RuntimeWarning,
stacklevel=4,
)

if deliver_policy:
warnings.warn(
message="`deliver_policy` option has no effect for NATS Core Subscription. It can be used with JetStream (Pull/Push) - only.",
category=RuntimeWarning,
stacklevel=4,
)
if ack_first:
warnings.warn(
message="`ack_first` option has no effect with JetStream Pull Subscription. It can be used with JetStream Push Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if headers_only:
warnings.warn(
message="`headers_only` option has no effect for NATS Core Subscription. It can be used with JetStream (Pull/Push) - only.",
category=RuntimeWarning,
stacklevel=4,
)
if flow_control:
warnings.warn(
message="`flow_control` option has no effect with JetStream Pull Subscription. It can be used with JetStream Push Subscription - only.",
category=RuntimeWarning,
stacklevel=4,
)

if ack_first:
warnings.warn(
message="`ack_first` option has no effect for NATS Core Subscription. It can be used with JetStream Push - only.",
category=RuntimeWarning,
stacklevel=4,
)
else:
# JS PushSub
if durable is not None:
warnings.warn(
message="JetStream Push consumer with durable option can't be scaled horizontally by multiple instances. Probably, you are looking for `queue` option. Also, we strongly recommend to use Jetstream PullSubsriber with durable option as a default.",
category=RuntimeWarning,
stacklevel=4,
)

0 comments on commit 7a7c36f

Please sign in to comment.