From ee6b2e17f4ba64e65d61569f4108ac891bc1ac6c Mon Sep 17 00:00:00 2001 From: Mario Rivera Date: Wed, 17 Jan 2024 11:28:01 -0500 Subject: [PATCH 01/10] Add publish cli command --- faststream/cli/main.py | 64 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/faststream/cli/main.py b/faststream/cli/main.py index 9f491a2ab4..8f9dfb1fdf 100644 --- a/faststream/cli/main.py +++ b/faststream/cli/main.py @@ -200,3 +200,67 @@ def _run( ex.show() sys.exit(1) + + +@cli.command(context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) +def publish( + ctx: typer.Context, + app: str = typer.Argument( + ..., + help="FastStream app instance, e.g., main:app" + ), + message: str = typer.Argument( + ..., + help="Message to be published" + ), + rpc: bool = typer.Option( + False, + help="Enable RPC mode and system output" + ), +): + """ + Publish a message using the specified broker in a FastStream application. + + This command publishes a message to a broker configured in a FastStream app instance. + It supports various brokers and can handle extra arguments specific to each broker type. + + Args: + ctx (typer.Context): The Typer context for the command. + app (str): The FastStream application instance path, in the format 'module:instance'. + message (str): The message to be published. + rpc (bool): If True, enables RPC mode and displays system output. + + The command allows extra CLI arguments to be passed, which are broker-specific. + These are parsed and passed to the broker's publish method. + """ + app, extra = parse_cli_args(app, *ctx.args) + extra['message'] = message + extra['rpc'] = rpc + + try: + if not app: + raise ValueError("App parameter is required.") + if not message: + raise ValueError("Message parameter is required.") + + _, app_obj = import_from_string(app) + if not app_obj.broker: + raise ValueError("Broker instance not found in the app.") + + result = anyio.run(publish_message, app_obj, extra) + + if rpc: + typer.echo(result) + + except Exception as e: + typer.echo(f"Publish error: {e}") + sys.exit(1) + + +async def publish_message(app_obj, extra): + try: + result = await app_obj.broker.publish(**extra) + return result + except Exception as e: + typer.echo(f"Error when broker was publishing: {e}") + sys.exit(1) From 341456a3c6d610975c7f07eca46d4dfa57b7a33a Mon Sep 17 00:00:00 2001 From: Mario Rivera Date: Wed, 17 Jan 2024 11:28:37 -0500 Subject: [PATCH 02/10] Add test file for testing the publish command with different brokers --- tests/cli/test_publish.py | 149 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 tests/cli/test_publish.py diff --git a/tests/cli/test_publish.py b/tests/cli/test_publish.py new file mode 100644 index 0000000000..76049f3975 --- /dev/null +++ b/tests/cli/test_publish.py @@ -0,0 +1,149 @@ +from unittest.mock import patch, AsyncMock +import pytest +from typer.testing import CliRunner +from faststream.cli.main import cli as faststream_app +from faststream import FastStream +from faststream.confluent.producer import AsyncConfluentFastProducer +from faststream.kafka import KafkaBroker +from faststream.confluent import KafkaBroker as ConfluentBroker +from faststream.kafka.producer import AioKafkaFastProducer +from faststream.nats import NatsBroker +from faststream.nats.producer import NatsFastProducer +from faststream.rabbit import RabbitBroker +from faststream.rabbit.producer import AioPikaFastProducer +from faststream.redis import RedisBroker +from faststream.redis.producer import RedisFastProducer + +# Initialize the CLI runner +runner = CliRunner() + + +@pytest.fixture +def mock_app(request): + app = FastStream() + broker_type = request.param['broker_type'] + producer_type = request.param['producer_type'] + + broker = broker_type() + broker.connect = AsyncMock() + + mock_producer = AsyncMock(spec=producer_type) + mock_producer.publish = AsyncMock() + broker._producer = mock_producer + + app.broker = broker + return app + + +@pytest.mark.parametrize("mock_app", [{'broker_type': RedisBroker, 'producer_type': RedisFastProducer}], indirect=True) +def test_publish_command_with_redis_options(mock_app): + with patch('faststream.cli.main.import_from_string', return_value=(None, mock_app)): + mock_app.broker.connect() + + result = runner.invoke(faststream_app, [ + "publish", + "fastream:app", + "hello world", + "--channel", "test channel", + "--reply_to", "tester", + "--list", "0.1", + "--stream", "stream url", + "--correlation_id", "someId", + ]) + + assert result.exit_code == 0 + mock_app.broker._producer.publish.assert_awaited_once_with( + message="hello world", + channel="test channel", + reply_to="tester", + list="0.1", + stream="stream url", + correlation_id="someId", + rpc=False, + ) + + +@pytest.mark.parametrize("mock_app", [{'broker_type': ConfluentBroker, 'producer_type': AsyncConfluentFastProducer}], indirect=True) +def test_publish_command_with_confluent_options(mock_app): + with patch('faststream.cli.main.import_from_string', return_value=(None, mock_app)): + mock_app.broker.connect() + result = runner.invoke(faststream_app, [ + "publish", + "fastream:app", + "hello world", + "--topic", "confluent topic", + "--correlation_id", "someId", + ]) + + assert result.exit_code == 0 + mock_app.broker._producer.publish.assert_awaited_once_with( + message="hello world", + topic="confluent topic", + correlation_id="someId", + rpc=False, + ) + + +@pytest.mark.parametrize("mock_app", [{'broker_type': KafkaBroker, 'producer_type': AioKafkaFastProducer}], indirect=True) +def test_publish_command_with_kafka_options(mock_app): + with patch('faststream.cli.main.import_from_string', return_value=(None, mock_app)): + mock_app.broker.connect() + result = runner.invoke(faststream_app, [ + "publish", + "fastream:app", + "hello world", + "--topic", "kafka topic", + "--correlation_id", "someId", + ]) + + assert result.exit_code == 0 + mock_app.broker._producer.publish.assert_awaited_once_with( + message="hello world", + topic="kafka topic", + correlation_id="someId", + rpc=False, + ) + + +@pytest.mark.parametrize("mock_app", [{'broker_type': NatsBroker, 'producer_type': NatsFastProducer}], indirect=True) +def test_publish_command_with_nats_options(mock_app): + with patch('faststream.cli.main.import_from_string', return_value=(None, mock_app)): + mock_app.broker.connect() + result = runner.invoke(faststream_app, [ + "publish", + "fastream:app", + "hello world", + "--subject", "nats subject", + "--reply_to", "tester", + "--correlation_id", "someId", + ]) + + assert result.exit_code == 0 + mock_app.broker._producer.publish.assert_awaited_once_with( + message="hello world", + subject="nats subject", + reply_to="tester", + correlation_id="someId", + rpc=False, + ) + + +@pytest.mark.parametrize("mock_app", [{'broker_type': RabbitBroker, 'producer_type': AioPikaFastProducer}], indirect=True) +def test_publish_command_with_rabbit_options(mock_app): + with patch('faststream.cli.main.import_from_string', return_value=(None, mock_app)): + mock_app.broker.connect() + result = runner.invoke(faststream_app, [ + "publish", + "fastream:app", + "hello world", + "--correlation_id", "someId", + "--raise_timeout", "True", + ]) + + assert result.exit_code == 0 + mock_app.broker._producer.publish.assert_awaited_once_with( + message="hello world", + correlation_id="someId", + raise_timeout="True", + rpc=False, + ) \ No newline at end of file From 2eea9ea8246256b79a4e2b96c23e071e8a00b9cf Mon Sep 17 00:00:00 2001 From: Mario Rivera Date: Wed, 17 Jan 2024 11:38:16 -0500 Subject: [PATCH 03/10] Formatting changes --- faststream/cli/main.py | 27 +++++++++++++-------------- tests/cli/test_publish.py | 36 +++++++++++++++++++----------------- 2 files changed, 32 insertions(+), 31 deletions(-) diff --git a/faststream/cli/main.py b/faststream/cli/main.py index 8f9dfb1fdf..d48775d9df 100644 --- a/faststream/cli/main.py +++ b/faststream/cli/main.py @@ -218,24 +218,23 @@ def publish( help="Enable RPC mode and system output" ), ): - """ - Publish a message using the specified broker in a FastStream application. + """Publish a message using the specified broker in a FastStream application. - This command publishes a message to a broker configured in a FastStream app instance. - It supports various brokers and can handle extra arguments specific to each broker type. + This command publishes a message to a broker configured in a FastStream app instance. + It supports various brokers and can handle extra arguments specific to each broker type. - Args: - ctx (typer.Context): The Typer context for the command. - app (str): The FastStream application instance path, in the format 'module:instance'. - message (str): The message to be published. - rpc (bool): If True, enables RPC mode and displays system output. + Args: + ctx (typer.Context): The Typer context for the command. + app (str): The FastStream application instance path, in the format 'module:instance'. + message (str): The message to be published. + rpc (bool): If True, enables RPC mode and displays system output. - The command allows extra CLI arguments to be passed, which are broker-specific. - These are parsed and passed to the broker's publish method. - """ + The command allows extra CLI arguments to be passed, which are broker-specific. + These are parsed and passed to the broker's publish method. + """ app, extra = parse_cli_args(app, *ctx.args) - extra['message'] = message - extra['rpc'] = rpc + extra["message"] = message + extra["rpc"] = rpc try: if not app: diff --git a/tests/cli/test_publish.py b/tests/cli/test_publish.py index 76049f3975..f518f45145 100644 --- a/tests/cli/test_publish.py +++ b/tests/cli/test_publish.py @@ -1,11 +1,13 @@ -from unittest.mock import patch, AsyncMock +from unittest.mock import AsyncMock, patch + import pytest from typer.testing import CliRunner -from faststream.cli.main import cli as faststream_app + from faststream import FastStream +from faststream.cli.main import cli as faststream_app +from faststream.confluent import KafkaBroker as ConfluentBroker from faststream.confluent.producer import AsyncConfluentFastProducer from faststream.kafka import KafkaBroker -from faststream.confluent import KafkaBroker as ConfluentBroker from faststream.kafka.producer import AioKafkaFastProducer from faststream.nats import NatsBroker from faststream.nats.producer import NatsFastProducer @@ -18,11 +20,11 @@ runner = CliRunner() -@pytest.fixture +@pytest.fixture() def mock_app(request): app = FastStream() - broker_type = request.param['broker_type'] - producer_type = request.param['producer_type'] + broker_type = request.param["broker_type"] + producer_type = request.param["producer_type"] broker = broker_type() broker.connect = AsyncMock() @@ -35,9 +37,9 @@ def mock_app(request): return app -@pytest.mark.parametrize("mock_app", [{'broker_type': RedisBroker, 'producer_type': RedisFastProducer}], indirect=True) +@pytest.mark.parametrize("mock_app", [{"broker_type": RedisBroker, "producer_type": RedisFastProducer}], indirect=True) def test_publish_command_with_redis_options(mock_app): - with patch('faststream.cli.main.import_from_string', return_value=(None, mock_app)): + with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): mock_app.broker.connect() result = runner.invoke(faststream_app, [ @@ -63,9 +65,9 @@ def test_publish_command_with_redis_options(mock_app): ) -@pytest.mark.parametrize("mock_app", [{'broker_type': ConfluentBroker, 'producer_type': AsyncConfluentFastProducer}], indirect=True) +@pytest.mark.parametrize("mock_app", [{"broker_type": ConfluentBroker, "producer_type": AsyncConfluentFastProducer}], indirect=True) def test_publish_command_with_confluent_options(mock_app): - with patch('faststream.cli.main.import_from_string', return_value=(None, mock_app)): + with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): mock_app.broker.connect() result = runner.invoke(faststream_app, [ "publish", @@ -84,9 +86,9 @@ def test_publish_command_with_confluent_options(mock_app): ) -@pytest.mark.parametrize("mock_app", [{'broker_type': KafkaBroker, 'producer_type': AioKafkaFastProducer}], indirect=True) +@pytest.mark.parametrize("mock_app", [{"broker_type": KafkaBroker, "producer_type": AioKafkaFastProducer}], indirect=True) def test_publish_command_with_kafka_options(mock_app): - with patch('faststream.cli.main.import_from_string', return_value=(None, mock_app)): + with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): mock_app.broker.connect() result = runner.invoke(faststream_app, [ "publish", @@ -105,9 +107,9 @@ def test_publish_command_with_kafka_options(mock_app): ) -@pytest.mark.parametrize("mock_app", [{'broker_type': NatsBroker, 'producer_type': NatsFastProducer}], indirect=True) +@pytest.mark.parametrize("mock_app", [{"broker_type": NatsBroker, "producer_type": NatsFastProducer}], indirect=True) def test_publish_command_with_nats_options(mock_app): - with patch('faststream.cli.main.import_from_string', return_value=(None, mock_app)): + with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): mock_app.broker.connect() result = runner.invoke(faststream_app, [ "publish", @@ -128,9 +130,9 @@ def test_publish_command_with_nats_options(mock_app): ) -@pytest.mark.parametrize("mock_app", [{'broker_type': RabbitBroker, 'producer_type': AioPikaFastProducer}], indirect=True) +@pytest.mark.parametrize("mock_app", [{"broker_type": RabbitBroker, "producer_type": AioPikaFastProducer}], indirect=True) def test_publish_command_with_rabbit_options(mock_app): - with patch('faststream.cli.main.import_from_string', return_value=(None, mock_app)): + with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): mock_app.broker.connect() result = runner.invoke(faststream_app, [ "publish", @@ -146,4 +148,4 @@ def test_publish_command_with_rabbit_options(mock_app): correlation_id="someId", raise_timeout="True", rpc=False, - ) \ No newline at end of file + ) From 94e50bbbd09d4587ba6f9d9efa607c89fa96bd5d Mon Sep 17 00:00:00 2001 From: Mario Rivera Date: Wed, 17 Jan 2024 13:41:31 -0500 Subject: [PATCH 04/10] Check and connect the broker before publishing --- faststream/cli/main.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/faststream/cli/main.py b/faststream/cli/main.py index d48775d9df..304dd88669 100644 --- a/faststream/cli/main.py +++ b/faststream/cli/main.py @@ -258,8 +258,11 @@ def publish( async def publish_message(app_obj, extra): try: - result = await app_obj.broker.publish(**extra) - return result + if await app_obj.broker.connect(): + result = await app_obj.broker.publish(**extra) + return result + else: + raise ValueError("Failed to connect to the broker.") except Exception as e: typer.echo(f"Error when broker was publishing: {e}") sys.exit(1) From 79c0ccc32a63b36d7cd8111809a2d68d475fb065 Mon Sep 17 00:00:00 2001 From: Davor Runje Date: Fri, 2 Feb 2024 20:41:47 +0000 Subject: [PATCH 05/10] polishing --- .secrets.baseline | 4 +- docs/docs/SUMMARY.md | 2 + .../en/api/faststream/cli/main/publish.md | 11 ++ .../faststream/cli/main/publish_message.md | 11 ++ faststream/asyncapi/message.py | 2 +- faststream/cli/main.py | 25 ++-- faststream/rabbit/test.py | 4 +- tests/asyncapi/base/arguments.py | 7 +- tests/brokers/rabbit/test_test_client.py | 5 +- tests/cli/test_publish.py | 137 ++++++++++++------ 10 files changed, 141 insertions(+), 67 deletions(-) create mode 100644 docs/docs/en/api/faststream/cli/main/publish.md create mode 100644 docs/docs/en/api/faststream/cli/main/publish_message.md diff --git a/.secrets.baseline b/.secrets.baseline index 36575e6b42..4c75590b33 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -128,7 +128,7 @@ "filename": "docs/docs/en/release.md", "hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450", "is_verified": false, - "line_number": 529, + "line_number": 625, "is_secret": false } ], @@ -163,5 +163,5 @@ } ] }, - "generated_at": "2024-01-30T11:15:38Z" + "generated_at": "2024-02-02T20:33:17Z" } diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index ab1da47b47..005b0728f9 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -297,6 +297,8 @@ search: - [serve](api/faststream/cli/docs/app/serve.md) - main - [main](api/faststream/cli/main/main.md) + - [publish](api/faststream/cli/main/publish.md) + - [publish_message](api/faststream/cli/main/publish_message.md) - [run](api/faststream/cli/main/run.md) - [version_callback](api/faststream/cli/main/version_callback.md) - supervisors diff --git a/docs/docs/en/api/faststream/cli/main/publish.md b/docs/docs/en/api/faststream/cli/main/publish.md new file mode 100644 index 0000000000..84b490cde8 --- /dev/null +++ b/docs/docs/en/api/faststream/cli/main/publish.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.cli.main.publish diff --git a/docs/docs/en/api/faststream/cli/main/publish_message.md b/docs/docs/en/api/faststream/cli/main/publish_message.md new file mode 100644 index 0000000000..a8bb7b8efa --- /dev/null +++ b/docs/docs/en/api/faststream/cli/main/publish_message.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.cli.main.publish_message diff --git a/faststream/asyncapi/message.py b/faststream/asyncapi/message.py index a3819d59d1..3d43d4d325 100644 --- a/faststream/asyncapi/message.py +++ b/faststream/asyncapi/message.py @@ -23,7 +23,7 @@ def parse_handler_params(call: CallModel[Any, Any], prefix: str = "") -> Dict[st body = get_model_schema( create_model( # type: ignore[call-overload] model.__name__, - **call.flat_params, # type: ignore[arg-type] + **call.flat_params, # type: ignore[arg-type] ), prefix=prefix, exclude=tuple(call.custom_fields.keys()), diff --git a/faststream/cli/main.py b/faststream/cli/main.py index 304dd88669..d1bfcfdbaf 100644 --- a/faststream/cli/main.py +++ b/faststream/cli/main.py @@ -2,7 +2,7 @@ import sys import warnings from contextlib import suppress -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional import anyio import typer @@ -202,22 +202,15 @@ def _run( sys.exit(1) -@cli.command(context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) +@cli.command( + context_settings={"allow_extra_args": True, "ignore_unknown_options": True} +) def publish( ctx: typer.Context, - app: str = typer.Argument( - ..., - help="FastStream app instance, e.g., main:app" - ), - message: str = typer.Argument( - ..., - help="Message to be published" - ), - rpc: bool = typer.Option( - False, - help="Enable RPC mode and system output" - ), -): + app: str = typer.Argument(..., help="FastStream app instance, e.g., main:app"), + message: str = typer.Argument(..., help="Message to be published"), + rpc: bool = typer.Option(False, help="Enable RPC mode and system output"), +) -> None: """Publish a message using the specified broker in a FastStream application. This command publishes a message to a broker configured in a FastStream app instance. @@ -256,7 +249,7 @@ def publish( sys.exit(1) -async def publish_message(app_obj, extra): +async def publish_message(app_obj, extra: Any) -> Any: try: if await app_obj.broker.connect(): result = await app_obj.broker.publish(**extra) diff --git a/faststream/rabbit/test.py b/faststream/rabbit/test.py index 8206386614..5bf2b8b828 100644 --- a/faststream/rabbit/test.py +++ b/faststream/rabbit/test.py @@ -239,7 +239,9 @@ async def publish( call = True elif handler.exchange.type == ExchangeType.TOPIC: - call = apply_pattern(handler.queue.routing, incoming.routing_key or "") + call = apply_pattern( + handler.queue.routing, incoming.routing_key or "" + ) elif handler.exchange.type == ExchangeType.HEADERS: # pramga: no branch queue_headers = (handler.queue.bind_arguments or {}).copy() diff --git a/tests/asyncapi/base/arguments.py b/tests/asyncapi/base/arguments.py index 6a099cd425..a70b460938 100644 --- a/tests/asyncapi/base/arguments.py +++ b/tests/asyncapi/base/arguments.py @@ -404,8 +404,11 @@ def dep(name: str = ""): def dep2(name2: str): return name2 - @broker.subscriber("test", dependencies=(self.dependency_builder(dep2),)) - async def handle(id: int, message=self.dependency_builder(dep)): + dependencies = self.dependency_builder(dep2) + message = self.dependency_builder(dep) + + @broker.subscriber("test", dependencies=dependencies) + async def handle(id: int, message=message): ... schema = get_app_schema(self.build_app(broker)).to_jsonable() diff --git a/tests/brokers/rabbit/test_test_client.py b/tests/brokers/rabbit/test_test_client.py index 4881543cb1..8e3902e1a2 100644 --- a/tests/brokers/rabbit/test_test_client.py +++ b/tests/brokers/rabbit/test_test_client.py @@ -272,6 +272,7 @@ async def h2(): assert len(routes) == 2 + @pytest.mark.parametrize( ("pattern", "current", "result"), [ @@ -284,7 +285,9 @@ async def h2(): pytest.param("#.test.*", "1.2.test.1", True, id="#.test.*"), pytest.param("#.test.*.*", "1.2.test.1.2", True, id="#.test.*."), pytest.param("#.test.*.*.*", "1.2.test.1.2", False, id="#.test.*.*.* - broken"), - pytest.param("#.test.*.test.#", "1.2.test.1.test.1.2", True, id="#.test.*.test.#"), + pytest.param( + "#.test.*.test.#", "1.2.test.1.test.1.2", True, id="#.test.*.test.#" + ), pytest.param("#.*.test", "1.2.2.test", True, id="#.*.test"), pytest.param("#.2.*.test", "1.2.2.test", True, id="#.2.*.test"), pytest.param("#.*.*.test", "1.2.2.test", True, id="#.*.*.test"), diff --git a/tests/cli/test_publish.py b/tests/cli/test_publish.py index f518f45145..6250ea7869 100644 --- a/tests/cli/test_publish.py +++ b/tests/cli/test_publish.py @@ -37,21 +37,33 @@ def mock_app(request): return app -@pytest.mark.parametrize("mock_app", [{"broker_type": RedisBroker, "producer_type": RedisFastProducer}], indirect=True) +@pytest.mark.parametrize( + "mock_app", + [{"broker_type": RedisBroker, "producer_type": RedisFastProducer}], + indirect=True, +) def test_publish_command_with_redis_options(mock_app): with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): mock_app.broker.connect() - result = runner.invoke(faststream_app, [ - "publish", - "fastream:app", - "hello world", - "--channel", "test channel", - "--reply_to", "tester", - "--list", "0.1", - "--stream", "stream url", - "--correlation_id", "someId", - ]) + result = runner.invoke( + faststream_app, + [ + "publish", + "fastream:app", + "hello world", + "--channel", + "test channel", + "--reply_to", + "tester", + "--list", + "0.1", + "--stream", + "stream url", + "--correlation_id", + "someId", + ], + ) assert result.exit_code == 0 mock_app.broker._producer.publish.assert_awaited_once_with( @@ -65,17 +77,26 @@ def test_publish_command_with_redis_options(mock_app): ) -@pytest.mark.parametrize("mock_app", [{"broker_type": ConfluentBroker, "producer_type": AsyncConfluentFastProducer}], indirect=True) +@pytest.mark.parametrize( + "mock_app", + [{"broker_type": ConfluentBroker, "producer_type": AsyncConfluentFastProducer}], + indirect=True, +) def test_publish_command_with_confluent_options(mock_app): with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): mock_app.broker.connect() - result = runner.invoke(faststream_app, [ - "publish", - "fastream:app", - "hello world", - "--topic", "confluent topic", - "--correlation_id", "someId", - ]) + result = runner.invoke( + faststream_app, + [ + "publish", + "fastream:app", + "hello world", + "--topic", + "confluent topic", + "--correlation_id", + "someId", + ], + ) assert result.exit_code == 0 mock_app.broker._producer.publish.assert_awaited_once_with( @@ -86,17 +107,26 @@ def test_publish_command_with_confluent_options(mock_app): ) -@pytest.mark.parametrize("mock_app", [{"broker_type": KafkaBroker, "producer_type": AioKafkaFastProducer}], indirect=True) +@pytest.mark.parametrize( + "mock_app", + [{"broker_type": KafkaBroker, "producer_type": AioKafkaFastProducer}], + indirect=True, +) def test_publish_command_with_kafka_options(mock_app): with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): mock_app.broker.connect() - result = runner.invoke(faststream_app, [ - "publish", - "fastream:app", - "hello world", - "--topic", "kafka topic", - "--correlation_id", "someId", - ]) + result = runner.invoke( + faststream_app, + [ + "publish", + "fastream:app", + "hello world", + "--topic", + "kafka topic", + "--correlation_id", + "someId", + ], + ) assert result.exit_code == 0 mock_app.broker._producer.publish.assert_awaited_once_with( @@ -107,18 +137,28 @@ def test_publish_command_with_kafka_options(mock_app): ) -@pytest.mark.parametrize("mock_app", [{"broker_type": NatsBroker, "producer_type": NatsFastProducer}], indirect=True) +@pytest.mark.parametrize( + "mock_app", + [{"broker_type": NatsBroker, "producer_type": NatsFastProducer}], + indirect=True, +) def test_publish_command_with_nats_options(mock_app): with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): mock_app.broker.connect() - result = runner.invoke(faststream_app, [ - "publish", - "fastream:app", - "hello world", - "--subject", "nats subject", - "--reply_to", "tester", - "--correlation_id", "someId", - ]) + result = runner.invoke( + faststream_app, + [ + "publish", + "fastream:app", + "hello world", + "--subject", + "nats subject", + "--reply_to", + "tester", + "--correlation_id", + "someId", + ], + ) assert result.exit_code == 0 mock_app.broker._producer.publish.assert_awaited_once_with( @@ -130,17 +170,26 @@ def test_publish_command_with_nats_options(mock_app): ) -@pytest.mark.parametrize("mock_app", [{"broker_type": RabbitBroker, "producer_type": AioPikaFastProducer}], indirect=True) +@pytest.mark.parametrize( + "mock_app", + [{"broker_type": RabbitBroker, "producer_type": AioPikaFastProducer}], + indirect=True, +) def test_publish_command_with_rabbit_options(mock_app): with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): mock_app.broker.connect() - result = runner.invoke(faststream_app, [ - "publish", - "fastream:app", - "hello world", - "--correlation_id", "someId", - "--raise_timeout", "True", - ]) + result = runner.invoke( + faststream_app, + [ + "publish", + "fastream:app", + "hello world", + "--correlation_id", + "someId", + "--raise_timeout", + "True", + ], + ) assert result.exit_code == 0 mock_app.broker._producer.publish.assert_awaited_once_with( From cbc5ea67a88bbce32a7cbbbd3634841c04392375 Mon Sep 17 00:00:00 2001 From: Davor Runje Date: Fri, 2 Feb 2024 21:09:41 +0000 Subject: [PATCH 06/10] polishing --- tests/asyncapi/base/arguments.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/asyncapi/base/arguments.py b/tests/asyncapi/base/arguments.py index a70b460938..5856a995f8 100644 --- a/tests/asyncapi/base/arguments.py +++ b/tests/asyncapi/base/arguments.py @@ -404,7 +404,7 @@ def dep(name: str = ""): def dep2(name2: str): return name2 - dependencies = self.dependency_builder(dep2) + dependencies = (self.dependency_builder(dep2), ) message = self.dependency_builder(dep) @broker.subscriber("test", dependencies=dependencies) From d85c042e0fe5480676ada64d7c3f891b5286b12f Mon Sep 17 00:00:00 2001 From: Davor Runje Date: Fri, 2 Feb 2024 22:13:33 +0100 Subject: [PATCH 07/10] polishing --- .secrets.baseline | 4 ++-- faststream/asyncapi/message.py | 2 +- faststream/rabbit/test.py | 4 +++- tests/asyncapi/base/arguments.py | 7 +++++-- tests/brokers/rabbit/test_test_client.py | 5 ++++- 5 files changed, 15 insertions(+), 7 deletions(-) diff --git a/.secrets.baseline b/.secrets.baseline index 36575e6b42..e485f1d3ad 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -128,7 +128,7 @@ "filename": "docs/docs/en/release.md", "hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450", "is_verified": false, - "line_number": 529, + "line_number": 625, "is_secret": false } ], @@ -163,5 +163,5 @@ } ] }, - "generated_at": "2024-01-30T11:15:38Z" + "generated_at": "2024-02-02T20:57:18Z" } diff --git a/faststream/asyncapi/message.py b/faststream/asyncapi/message.py index a3819d59d1..3d43d4d325 100644 --- a/faststream/asyncapi/message.py +++ b/faststream/asyncapi/message.py @@ -23,7 +23,7 @@ def parse_handler_params(call: CallModel[Any, Any], prefix: str = "") -> Dict[st body = get_model_schema( create_model( # type: ignore[call-overload] model.__name__, - **call.flat_params, # type: ignore[arg-type] + **call.flat_params, # type: ignore[arg-type] ), prefix=prefix, exclude=tuple(call.custom_fields.keys()), diff --git a/faststream/rabbit/test.py b/faststream/rabbit/test.py index 8206386614..5bf2b8b828 100644 --- a/faststream/rabbit/test.py +++ b/faststream/rabbit/test.py @@ -239,7 +239,9 @@ async def publish( call = True elif handler.exchange.type == ExchangeType.TOPIC: - call = apply_pattern(handler.queue.routing, incoming.routing_key or "") + call = apply_pattern( + handler.queue.routing, incoming.routing_key or "" + ) elif handler.exchange.type == ExchangeType.HEADERS: # pramga: no branch queue_headers = (handler.queue.bind_arguments or {}).copy() diff --git a/tests/asyncapi/base/arguments.py b/tests/asyncapi/base/arguments.py index 6a099cd425..72dbd61608 100644 --- a/tests/asyncapi/base/arguments.py +++ b/tests/asyncapi/base/arguments.py @@ -404,8 +404,11 @@ def dep(name: str = ""): def dep2(name2: str): return name2 - @broker.subscriber("test", dependencies=(self.dependency_builder(dep2),)) - async def handle(id: int, message=self.dependency_builder(dep)): + dependencies = (self.dependency_builder(dep2),) + message = self.dependency_builder(dep) + + @broker.subscriber("test", dependencies=dependencies) + async def handle(id: int, message=message): ... schema = get_app_schema(self.build_app(broker)).to_jsonable() diff --git a/tests/brokers/rabbit/test_test_client.py b/tests/brokers/rabbit/test_test_client.py index 4881543cb1..8e3902e1a2 100644 --- a/tests/brokers/rabbit/test_test_client.py +++ b/tests/brokers/rabbit/test_test_client.py @@ -272,6 +272,7 @@ async def h2(): assert len(routes) == 2 + @pytest.mark.parametrize( ("pattern", "current", "result"), [ @@ -284,7 +285,9 @@ async def h2(): pytest.param("#.test.*", "1.2.test.1", True, id="#.test.*"), pytest.param("#.test.*.*", "1.2.test.1.2", True, id="#.test.*."), pytest.param("#.test.*.*.*", "1.2.test.1.2", False, id="#.test.*.*.* - broken"), - pytest.param("#.test.*.test.#", "1.2.test.1.test.1.2", True, id="#.test.*.test.#"), + pytest.param( + "#.test.*.test.#", "1.2.test.1.test.1.2", True, id="#.test.*.test.#" + ), pytest.param("#.*.test", "1.2.2.test", True, id="#.*.test"), pytest.param("#.2.*.test", "1.2.2.test", True, id="#.2.*.test"), pytest.param("#.*.*.test", "1.2.2.test", True, id="#.*.*.test"), From 121310422b1e9ba34118d820b290f5ee380404a6 Mon Sep 17 00:00:00 2001 From: Davor Runje Date: Fri, 2 Feb 2024 21:28:50 +0000 Subject: [PATCH 08/10] polishing --- faststream/cli/main.py | 7 ++++--- tests/asyncapi/base/arguments.py | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/faststream/cli/main.py b/faststream/cli/main.py index d1bfcfdbaf..6900331a84 100644 --- a/faststream/cli/main.py +++ b/faststream/cli/main.py @@ -11,6 +11,7 @@ from typer.core import TyperOption from faststream.__about__ import INSTALL_WATCHFILES, __version__ +from faststream.app import FastStream from faststream.cli.docs.app import docs_app from faststream.cli.utils.imports import import_from_string from faststream.cli.utils.logs import LogLevels, get_log_level, set_log_level @@ -249,10 +250,10 @@ def publish( sys.exit(1) -async def publish_message(app_obj, extra: Any) -> Any: +async def publish_message(app_obj: FastStream, extra: Any) -> Any: try: - if await app_obj.broker.connect(): - result = await app_obj.broker.publish(**extra) + if await app_obj.broker.connect(): # type: ignore[union-attr] + result = await app_obj.broker.publish(**extra) # type: ignore[union-attr] return result else: raise ValueError("Failed to connect to the broker.") diff --git a/tests/asyncapi/base/arguments.py b/tests/asyncapi/base/arguments.py index 5856a995f8..72dbd61608 100644 --- a/tests/asyncapi/base/arguments.py +++ b/tests/asyncapi/base/arguments.py @@ -404,7 +404,7 @@ def dep(name: str = ""): def dep2(name2: str): return name2 - dependencies = (self.dependency_builder(dep2), ) + dependencies = (self.dependency_builder(dep2),) message = self.dependency_builder(dep) @broker.subscriber("test", dependencies=dependencies) From ca93e9872bf170e731f111f6fa9354976ab38579 Mon Sep 17 00:00:00 2001 From: Davor Runje Date: Fri, 2 Feb 2024 23:56:48 +0100 Subject: [PATCH 09/10] polishing --- docs/docs/SUMMARY.md | 2 + .../en/api/faststream/cli/main/publish.md | 11 ++ .../faststream/cli/main/publish_message.md | 11 ++ faststream/cli/main.py | 30 ++-- tests/cli/test_publish.py | 137 ++++++++++++------ 5 files changed, 129 insertions(+), 62 deletions(-) create mode 100644 docs/docs/en/api/faststream/cli/main/publish.md create mode 100644 docs/docs/en/api/faststream/cli/main/publish_message.md diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index ab1da47b47..005b0728f9 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -297,6 +297,8 @@ search: - [serve](api/faststream/cli/docs/app/serve.md) - main - [main](api/faststream/cli/main/main.md) + - [publish](api/faststream/cli/main/publish.md) + - [publish_message](api/faststream/cli/main/publish_message.md) - [run](api/faststream/cli/main/run.md) - [version_callback](api/faststream/cli/main/version_callback.md) - supervisors diff --git a/docs/docs/en/api/faststream/cli/main/publish.md b/docs/docs/en/api/faststream/cli/main/publish.md new file mode 100644 index 0000000000..84b490cde8 --- /dev/null +++ b/docs/docs/en/api/faststream/cli/main/publish.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.cli.main.publish diff --git a/docs/docs/en/api/faststream/cli/main/publish_message.md b/docs/docs/en/api/faststream/cli/main/publish_message.md new file mode 100644 index 0000000000..a8bb7b8efa --- /dev/null +++ b/docs/docs/en/api/faststream/cli/main/publish_message.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.cli.main.publish_message diff --git a/faststream/cli/main.py b/faststream/cli/main.py index 304dd88669..946c598993 100644 --- a/faststream/cli/main.py +++ b/faststream/cli/main.py @@ -2,7 +2,7 @@ import sys import warnings from contextlib import suppress -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional import anyio import typer @@ -10,6 +10,7 @@ from pydantic import ValidationError from typer.core import TyperOption +from faststream import FastStream from faststream.__about__ import INSTALL_WATCHFILES, __version__ from faststream.cli.docs.app import docs_app from faststream.cli.utils.imports import import_from_string @@ -202,22 +203,15 @@ def _run( sys.exit(1) -@cli.command(context_settings={"allow_extra_args": True, "ignore_unknown_options": True}) +@cli.command( + context_settings={"allow_extra_args": True, "ignore_unknown_options": True} +) def publish( ctx: typer.Context, - app: str = typer.Argument( - ..., - help="FastStream app instance, e.g., main:app" - ), - message: str = typer.Argument( - ..., - help="Message to be published" - ), - rpc: bool = typer.Option( - False, - help="Enable RPC mode and system output" - ), -): + app: str = typer.Argument(..., help="FastStream app instance, e.g., main:app"), + message: str = typer.Argument(..., help="Message to be published"), + rpc: bool = typer.Option(False, help="Enable RPC mode and system output"), +) -> None: """Publish a message using the specified broker in a FastStream application. This command publishes a message to a broker configured in a FastStream app instance. @@ -256,10 +250,10 @@ def publish( sys.exit(1) -async def publish_message(app_obj, extra): +async def publish_message(app_obj: FastStream, extra: Any) -> Any: try: - if await app_obj.broker.connect(): - result = await app_obj.broker.publish(**extra) + if await app_obj.broker.connect(): # type: ignore[union-attr] + result = await app_obj.broker.publish(**extra) # type: ignore[union-attr] return result else: raise ValueError("Failed to connect to the broker.") diff --git a/tests/cli/test_publish.py b/tests/cli/test_publish.py index f518f45145..6250ea7869 100644 --- a/tests/cli/test_publish.py +++ b/tests/cli/test_publish.py @@ -37,21 +37,33 @@ def mock_app(request): return app -@pytest.mark.parametrize("mock_app", [{"broker_type": RedisBroker, "producer_type": RedisFastProducer}], indirect=True) +@pytest.mark.parametrize( + "mock_app", + [{"broker_type": RedisBroker, "producer_type": RedisFastProducer}], + indirect=True, +) def test_publish_command_with_redis_options(mock_app): with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): mock_app.broker.connect() - result = runner.invoke(faststream_app, [ - "publish", - "fastream:app", - "hello world", - "--channel", "test channel", - "--reply_to", "tester", - "--list", "0.1", - "--stream", "stream url", - "--correlation_id", "someId", - ]) + result = runner.invoke( + faststream_app, + [ + "publish", + "fastream:app", + "hello world", + "--channel", + "test channel", + "--reply_to", + "tester", + "--list", + "0.1", + "--stream", + "stream url", + "--correlation_id", + "someId", + ], + ) assert result.exit_code == 0 mock_app.broker._producer.publish.assert_awaited_once_with( @@ -65,17 +77,26 @@ def test_publish_command_with_redis_options(mock_app): ) -@pytest.mark.parametrize("mock_app", [{"broker_type": ConfluentBroker, "producer_type": AsyncConfluentFastProducer}], indirect=True) +@pytest.mark.parametrize( + "mock_app", + [{"broker_type": ConfluentBroker, "producer_type": AsyncConfluentFastProducer}], + indirect=True, +) def test_publish_command_with_confluent_options(mock_app): with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): mock_app.broker.connect() - result = runner.invoke(faststream_app, [ - "publish", - "fastream:app", - "hello world", - "--topic", "confluent topic", - "--correlation_id", "someId", - ]) + result = runner.invoke( + faststream_app, + [ + "publish", + "fastream:app", + "hello world", + "--topic", + "confluent topic", + "--correlation_id", + "someId", + ], + ) assert result.exit_code == 0 mock_app.broker._producer.publish.assert_awaited_once_with( @@ -86,17 +107,26 @@ def test_publish_command_with_confluent_options(mock_app): ) -@pytest.mark.parametrize("mock_app", [{"broker_type": KafkaBroker, "producer_type": AioKafkaFastProducer}], indirect=True) +@pytest.mark.parametrize( + "mock_app", + [{"broker_type": KafkaBroker, "producer_type": AioKafkaFastProducer}], + indirect=True, +) def test_publish_command_with_kafka_options(mock_app): with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): mock_app.broker.connect() - result = runner.invoke(faststream_app, [ - "publish", - "fastream:app", - "hello world", - "--topic", "kafka topic", - "--correlation_id", "someId", - ]) + result = runner.invoke( + faststream_app, + [ + "publish", + "fastream:app", + "hello world", + "--topic", + "kafka topic", + "--correlation_id", + "someId", + ], + ) assert result.exit_code == 0 mock_app.broker._producer.publish.assert_awaited_once_with( @@ -107,18 +137,28 @@ def test_publish_command_with_kafka_options(mock_app): ) -@pytest.mark.parametrize("mock_app", [{"broker_type": NatsBroker, "producer_type": NatsFastProducer}], indirect=True) +@pytest.mark.parametrize( + "mock_app", + [{"broker_type": NatsBroker, "producer_type": NatsFastProducer}], + indirect=True, +) def test_publish_command_with_nats_options(mock_app): with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): mock_app.broker.connect() - result = runner.invoke(faststream_app, [ - "publish", - "fastream:app", - "hello world", - "--subject", "nats subject", - "--reply_to", "tester", - "--correlation_id", "someId", - ]) + result = runner.invoke( + faststream_app, + [ + "publish", + "fastream:app", + "hello world", + "--subject", + "nats subject", + "--reply_to", + "tester", + "--correlation_id", + "someId", + ], + ) assert result.exit_code == 0 mock_app.broker._producer.publish.assert_awaited_once_with( @@ -130,17 +170,26 @@ def test_publish_command_with_nats_options(mock_app): ) -@pytest.mark.parametrize("mock_app", [{"broker_type": RabbitBroker, "producer_type": AioPikaFastProducer}], indirect=True) +@pytest.mark.parametrize( + "mock_app", + [{"broker_type": RabbitBroker, "producer_type": AioPikaFastProducer}], + indirect=True, +) def test_publish_command_with_rabbit_options(mock_app): with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): mock_app.broker.connect() - result = runner.invoke(faststream_app, [ - "publish", - "fastream:app", - "hello world", - "--correlation_id", "someId", - "--raise_timeout", "True", - ]) + result = runner.invoke( + faststream_app, + [ + "publish", + "fastream:app", + "hello world", + "--correlation_id", + "someId", + "--raise_timeout", + "True", + ], + ) assert result.exit_code == 0 mock_app.broker._producer.publish.assert_awaited_once_with( From 47f7e86be286bcf79c987d9044d1677555b5f1ba Mon Sep 17 00:00:00 2001 From: Mario Rivera Date: Thu, 7 Mar 2024 16:01:57 -0500 Subject: [PATCH 10/10] Adding pytest.mark.asyncio to tests since they call the async publish function. --- tests/cli/test_publish.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/tests/cli/test_publish.py b/tests/cli/test_publish.py index f518f45145..34d41df64b 100644 --- a/tests/cli/test_publish.py +++ b/tests/cli/test_publish.py @@ -37,11 +37,10 @@ def mock_app(request): return app +@pytest.mark.asyncio @pytest.mark.parametrize("mock_app", [{"broker_type": RedisBroker, "producer_type": RedisFastProducer}], indirect=True) def test_publish_command_with_redis_options(mock_app): with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): - mock_app.broker.connect() - result = runner.invoke(faststream_app, [ "publish", "fastream:app", @@ -64,11 +63,10 @@ def test_publish_command_with_redis_options(mock_app): rpc=False, ) - @pytest.mark.parametrize("mock_app", [{"broker_type": ConfluentBroker, "producer_type": AsyncConfluentFastProducer}], indirect=True) +@pytest.mark.asyncio def test_publish_command_with_confluent_options(mock_app): with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): - mock_app.broker.connect() result = runner.invoke(faststream_app, [ "publish", "fastream:app", @@ -85,11 +83,10 @@ def test_publish_command_with_confluent_options(mock_app): rpc=False, ) - @pytest.mark.parametrize("mock_app", [{"broker_type": KafkaBroker, "producer_type": AioKafkaFastProducer}], indirect=True) +@pytest.mark.asyncio def test_publish_command_with_kafka_options(mock_app): with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): - mock_app.broker.connect() result = runner.invoke(faststream_app, [ "publish", "fastream:app", @@ -106,11 +103,10 @@ def test_publish_command_with_kafka_options(mock_app): rpc=False, ) - @pytest.mark.parametrize("mock_app", [{"broker_type": NatsBroker, "producer_type": NatsFastProducer}], indirect=True) +@pytest.mark.asyncio def test_publish_command_with_nats_options(mock_app): with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): - mock_app.broker.connect() result = runner.invoke(faststream_app, [ "publish", "fastream:app", @@ -131,9 +127,9 @@ def test_publish_command_with_nats_options(mock_app): @pytest.mark.parametrize("mock_app", [{"broker_type": RabbitBroker, "producer_type": AioPikaFastProducer}], indirect=True) +@pytest.mark.asyncio def test_publish_command_with_rabbit_options(mock_app): with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)): - mock_app.broker.connect() result = runner.invoke(faststream_app, [ "publish", "fastream:app",