Skip to content

Commit

Permalink
1129 - Create a publish command for the CLI (#1151)
Browse files Browse the repository at this point in the history
* Add publish cli command

* Add test file for testing the publish command with different brokers

* Formatting changes

* Check and connect the broker before publishing

* polishing

* polishing

* polishing

* polishing

* polishing

* Adding pytest.mark.asyncio to tests since they call the async publish function.

---------

Co-authored-by: Davor Runje <[email protected]>
  • Loading branch information
MRLab12 and davorrunje authored Mar 11, 2024
1 parent 396bcb0 commit d7fef0c
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 1 deletion.
2 changes: 2 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ search:
- [serve](api/faststream/cli/docs/app/serve.md)
- main
- [main](api/faststream/cli/main/main.md)
- [publish](api/faststream/cli/main/publish.md)
- [publish_message](api/faststream/cli/main/publish_message.md)
- [run](api/faststream/cli/main/run.md)
- [version_callback](api/faststream/cli/main/version_callback.md)
- supervisors
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/cli/main/publish.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.cli.main.publish
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/cli/main/publish_message.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.cli.main.publish_message
62 changes: 61 additions & 1 deletion faststream/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
import sys
import warnings
from contextlib import suppress
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional

import anyio
import typer
from click.exceptions import MissingParameter
from pydantic import ValidationError
from typer.core import TyperOption

from faststream import FastStream
from faststream.__about__ import INSTALL_WATCHFILES, __version__
from faststream.cli.docs.app import docs_app
from faststream.cli.utils.imports import import_from_string
Expand Down Expand Up @@ -200,3 +201,62 @@ def _run(
ex.show()

sys.exit(1)


@cli.command(
context_settings={"allow_extra_args": True, "ignore_unknown_options": True}
)
def publish(
ctx: typer.Context,
app: str = typer.Argument(..., help="FastStream app instance, e.g., main:app"),
message: str = typer.Argument(..., help="Message to be published"),
rpc: bool = typer.Option(False, help="Enable RPC mode and system output"),
) -> None:
"""Publish a message using the specified broker in a FastStream application.
This command publishes a message to a broker configured in a FastStream app instance.
It supports various brokers and can handle extra arguments specific to each broker type.
Args:
ctx (typer.Context): The Typer context for the command.
app (str): The FastStream application instance path, in the format 'module:instance'.
message (str): The message to be published.
rpc (bool): If True, enables RPC mode and displays system output.
The command allows extra CLI arguments to be passed, which are broker-specific.
These are parsed and passed to the broker's publish method.
"""
app, extra = parse_cli_args(app, *ctx.args)
extra["message"] = message
extra["rpc"] = rpc

try:
if not app:
raise ValueError("App parameter is required.")
if not message:
raise ValueError("Message parameter is required.")

_, app_obj = import_from_string(app)
if not app_obj.broker:
raise ValueError("Broker instance not found in the app.")

result = anyio.run(publish_message, app_obj, extra)

if rpc:
typer.echo(result)

except Exception as e:
typer.echo(f"Publish error: {e}")
sys.exit(1)


async def publish_message(app_obj: FastStream, extra: Any) -> Any:
try:
if await app_obj.broker.connect(): # type: ignore[union-attr]
result = await app_obj.broker.publish(**extra) # type: ignore[union-attr]
return result
else:
raise ValueError("Failed to connect to the broker.")
except Exception as e:
typer.echo(f"Error when broker was publishing: {e}")
sys.exit(1)
147 changes: 147 additions & 0 deletions tests/cli/test_publish.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
from unittest.mock import AsyncMock, patch

import pytest
from typer.testing import CliRunner

from faststream import FastStream
from faststream.cli.main import cli as faststream_app
from faststream.confluent import KafkaBroker as ConfluentBroker
from faststream.confluent.producer import AsyncConfluentFastProducer
from faststream.kafka import KafkaBroker
from faststream.kafka.producer import AioKafkaFastProducer
from faststream.nats import NatsBroker
from faststream.nats.producer import NatsFastProducer
from faststream.rabbit import RabbitBroker
from faststream.rabbit.producer import AioPikaFastProducer
from faststream.redis import RedisBroker
from faststream.redis.producer import RedisFastProducer

# Initialize the CLI runner
runner = CliRunner()


@pytest.fixture()
def mock_app(request):
app = FastStream()
broker_type = request.param["broker_type"]
producer_type = request.param["producer_type"]

broker = broker_type()
broker.connect = AsyncMock()

mock_producer = AsyncMock(spec=producer_type)
mock_producer.publish = AsyncMock()
broker._producer = mock_producer

app.broker = broker
return app


@pytest.mark.asyncio
@pytest.mark.parametrize("mock_app", [{"broker_type": RedisBroker, "producer_type": RedisFastProducer}], indirect=True)
def test_publish_command_with_redis_options(mock_app):
with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)):
result = runner.invoke(faststream_app, [
"publish",
"fastream:app",
"hello world",
"--channel", "test channel",
"--reply_to", "tester",
"--list", "0.1",
"--stream", "stream url",
"--correlation_id", "someId",
])

assert result.exit_code == 0
mock_app.broker._producer.publish.assert_awaited_once_with(
message="hello world",
channel="test channel",
reply_to="tester",
list="0.1",
stream="stream url",
correlation_id="someId",
rpc=False,
)

@pytest.mark.parametrize("mock_app", [{"broker_type": ConfluentBroker, "producer_type": AsyncConfluentFastProducer}], indirect=True)
@pytest.mark.asyncio
def test_publish_command_with_confluent_options(mock_app):
with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)):
result = runner.invoke(faststream_app, [
"publish",
"fastream:app",
"hello world",
"--topic", "confluent topic",
"--correlation_id", "someId",
])

assert result.exit_code == 0
mock_app.broker._producer.publish.assert_awaited_once_with(
message="hello world",
topic="confluent topic",
correlation_id="someId",
rpc=False,
)

@pytest.mark.parametrize("mock_app", [{"broker_type": KafkaBroker, "producer_type": AioKafkaFastProducer}], indirect=True)
@pytest.mark.asyncio
def test_publish_command_with_kafka_options(mock_app):
with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)):
result = runner.invoke(faststream_app, [
"publish",
"fastream:app",
"hello world",
"--topic", "kafka topic",
"--correlation_id", "someId",
])

assert result.exit_code == 0
mock_app.broker._producer.publish.assert_awaited_once_with(
message="hello world",
topic="kafka topic",
correlation_id="someId",
rpc=False,
)

@pytest.mark.parametrize("mock_app", [{"broker_type": NatsBroker, "producer_type": NatsFastProducer}], indirect=True)
@pytest.mark.asyncio
def test_publish_command_with_nats_options(mock_app):
with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)):
result = runner.invoke(faststream_app, [
"publish",
"fastream:app",
"hello world",
"--subject", "nats subject",
"--reply_to", "tester",
"--correlation_id", "someId",
])

assert result.exit_code == 0
mock_app.broker._producer.publish.assert_awaited_once_with(
message="hello world",
subject="nats subject",
reply_to="tester",
correlation_id="someId",
rpc=False,
)


@pytest.mark.parametrize("mock_app", [{"broker_type": RabbitBroker, "producer_type": AioPikaFastProducer}], indirect=True)
@pytest.mark.asyncio
def test_publish_command_with_rabbit_options(mock_app):
with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)):
result = runner.invoke(faststream_app, [
"publish",
"fastream:app",
"hello world",
"--correlation_id", "someId",
"--raise_timeout", "True",
])

assert result.exit_code == 0
mock_app.broker._producer.publish.assert_awaited_once_with(
message="hello world",
correlation_id="someId",
raise_timeout="True",
rpc=False,
)

0 comments on commit d7fef0c

Please sign in to comment.