Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1129 - Create a publish command for the CLI #1151

Merged
merged 22 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
ee6b2e1
Add publish cli command
MRLab12 Jan 17, 2024
341456a
Add test file for testing the publish command with different brokers
MRLab12 Jan 17, 2024
2eea9ea
Formatting changes
MRLab12 Jan 17, 2024
94e50bb
Check and connect the broker before publishing
MRLab12 Jan 17, 2024
30dc51f
Merge branch 'main' into 1129-CLI-publish-command
davorrunje Jan 18, 2024
fd0164e
Merge branch 'main' into 1129-CLI-publish-command
davorrunje Jan 22, 2024
201e590
Merge branch 'main' into 1129-CLI-publish-command
MRLab12 Jan 22, 2024
3b6fdd2
Merge branch 'main' into 1129-CLI-publish-command
MRLab12 Jan 30, 2024
a469222
Merge branch 'main' into 1129-CLI-publish-command
davorrunje Feb 2, 2024
79c0ccc
polishing
davorrunje Feb 2, 2024
cbc5ea6
polishing
davorrunje Feb 2, 2024
d85c042
polishing
davorrunje Feb 2, 2024
1213104
polishing
davorrunje Feb 2, 2024
e93868c
Merge branch 'polishing' into 1129-CLI-publish-command
davorrunje Feb 2, 2024
ca93e98
polishing
davorrunje Feb 2, 2024
e10abc1
polishing
davorrunje Feb 2, 2024
42f82d9
Merge branch 'main' into 1129-CLI-publish-command
davorrunje Feb 3, 2024
0b05039
Merge branch 'main' into 1129-CLI-publish-command
MRLab12 Mar 5, 2024
47f7e86
Adding pytest.mark.asyncio to tests since they call the async publish…
MRLab12 Mar 7, 2024
f3024d3
Merge remote-tracking branch 'origin/1129-CLI-publish-command' into 1…
MRLab12 Mar 7, 2024
6072738
Merge branch 'main' into 1129-CLI-publish-command
MRLab12 Mar 7, 2024
1cc6452
Merge branch 'main' into 1129-CLI-publish-command
davorrunje Mar 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ search:
- [serve](api/faststream/cli/docs/app/serve.md)
- main
- [main](api/faststream/cli/main/main.md)
- [publish](api/faststream/cli/main/publish.md)
- [publish_message](api/faststream/cli/main/publish_message.md)
- [run](api/faststream/cli/main/run.md)
- [version_callback](api/faststream/cli/main/version_callback.md)
- supervisors
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/cli/main/publish.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.cli.main.publish
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/cli/main/publish_message.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.cli.main.publish_message
62 changes: 61 additions & 1 deletion faststream/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
import sys
import warnings
from contextlib import suppress
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional

import anyio
import typer
from click.exceptions import MissingParameter
from pydantic import ValidationError
from typer.core import TyperOption

from faststream import FastStream
from faststream.__about__ import INSTALL_WATCHFILES, __version__
from faststream.cli.docs.app import docs_app
from faststream.cli.utils.imports import import_from_string
Expand Down Expand Up @@ -200,3 +201,62 @@ def _run(
ex.show()

sys.exit(1)


@cli.command(
context_settings={"allow_extra_args": True, "ignore_unknown_options": True}
)
def publish(
ctx: typer.Context,
app: str = typer.Argument(..., help="FastStream app instance, e.g., main:app"),
message: str = typer.Argument(..., help="Message to be published"),
rpc: bool = typer.Option(False, help="Enable RPC mode and system output"),
) -> None:
"""Publish a message using the specified broker in a FastStream application.

This command publishes a message to a broker configured in a FastStream app instance.
It supports various brokers and can handle extra arguments specific to each broker type.

Args:
ctx (typer.Context): The Typer context for the command.
app (str): The FastStream application instance path, in the format 'module:instance'.
message (str): The message to be published.
rpc (bool): If True, enables RPC mode and displays system output.

The command allows extra CLI arguments to be passed, which are broker-specific.
These are parsed and passed to the broker's publish method.
"""
app, extra = parse_cli_args(app, *ctx.args)
extra["message"] = message
extra["rpc"] = rpc

try:
if not app:
raise ValueError("App parameter is required.")
if not message:
raise ValueError("Message parameter is required.")

_, app_obj = import_from_string(app)
if not app_obj.broker:
raise ValueError("Broker instance not found in the app.")

result = anyio.run(publish_message, app_obj, extra)

if rpc:
typer.echo(result)

except Exception as e:
typer.echo(f"Publish error: {e}")
sys.exit(1)


async def publish_message(app_obj: FastStream, extra: Any) -> Any:
try:
if await app_obj.broker.connect(): # type: ignore[union-attr]
result = await app_obj.broker.publish(**extra) # type: ignore[union-attr]
return result
else:
raise ValueError("Failed to connect to the broker.")
except Exception as e:
typer.echo(f"Error when broker was publishing: {e}")
sys.exit(1)
147 changes: 147 additions & 0 deletions tests/cli/test_publish.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
from unittest.mock import AsyncMock, patch

import pytest
from typer.testing import CliRunner

from faststream import FastStream
from faststream.cli.main import cli as faststream_app
from faststream.confluent import KafkaBroker as ConfluentBroker
from faststream.confluent.producer import AsyncConfluentFastProducer
from faststream.kafka import KafkaBroker
from faststream.kafka.producer import AioKafkaFastProducer
from faststream.nats import NatsBroker
from faststream.nats.producer import NatsFastProducer
from faststream.rabbit import RabbitBroker
from faststream.rabbit.producer import AioPikaFastProducer
from faststream.redis import RedisBroker
from faststream.redis.producer import RedisFastProducer

# Initialize the CLI runner
runner = CliRunner()


@pytest.fixture()
def mock_app(request):
app = FastStream()
broker_type = request.param["broker_type"]
producer_type = request.param["producer_type"]

broker = broker_type()
broker.connect = AsyncMock()

mock_producer = AsyncMock(spec=producer_type)
mock_producer.publish = AsyncMock()
broker._producer = mock_producer

app.broker = broker
return app


@pytest.mark.asyncio
@pytest.mark.parametrize("mock_app", [{"broker_type": RedisBroker, "producer_type": RedisFastProducer}], indirect=True)
def test_publish_command_with_redis_options(mock_app):
with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)):
result = runner.invoke(faststream_app, [
"publish",
"fastream:app",
"hello world",
"--channel", "test channel",
"--reply_to", "tester",
"--list", "0.1",
"--stream", "stream url",
"--correlation_id", "someId",
])

assert result.exit_code == 0
mock_app.broker._producer.publish.assert_awaited_once_with(
message="hello world",
channel="test channel",
reply_to="tester",
list="0.1",
stream="stream url",
correlation_id="someId",
rpc=False,
)

@pytest.mark.parametrize("mock_app", [{"broker_type": ConfluentBroker, "producer_type": AsyncConfluentFastProducer}], indirect=True)
@pytest.mark.asyncio
def test_publish_command_with_confluent_options(mock_app):
with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)):
result = runner.invoke(faststream_app, [
"publish",
"fastream:app",
"hello world",
"--topic", "confluent topic",
"--correlation_id", "someId",
])

assert result.exit_code == 0
mock_app.broker._producer.publish.assert_awaited_once_with(
message="hello world",
topic="confluent topic",
correlation_id="someId",
rpc=False,
)

@pytest.mark.parametrize("mock_app", [{"broker_type": KafkaBroker, "producer_type": AioKafkaFastProducer}], indirect=True)
@pytest.mark.asyncio
def test_publish_command_with_kafka_options(mock_app):
with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)):
result = runner.invoke(faststream_app, [
"publish",
"fastream:app",
"hello world",
"--topic", "kafka topic",
"--correlation_id", "someId",
])

assert result.exit_code == 0
mock_app.broker._producer.publish.assert_awaited_once_with(
message="hello world",
topic="kafka topic",
correlation_id="someId",
rpc=False,
)

@pytest.mark.parametrize("mock_app", [{"broker_type": NatsBroker, "producer_type": NatsFastProducer}], indirect=True)
@pytest.mark.asyncio
def test_publish_command_with_nats_options(mock_app):
with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)):
result = runner.invoke(faststream_app, [
"publish",
"fastream:app",
"hello world",
"--subject", "nats subject",
"--reply_to", "tester",
"--correlation_id", "someId",
])

assert result.exit_code == 0
mock_app.broker._producer.publish.assert_awaited_once_with(
message="hello world",
subject="nats subject",
reply_to="tester",
correlation_id="someId",
rpc=False,
)


@pytest.mark.parametrize("mock_app", [{"broker_type": RabbitBroker, "producer_type": AioPikaFastProducer}], indirect=True)
@pytest.mark.asyncio
def test_publish_command_with_rabbit_options(mock_app):
with patch("faststream.cli.main.import_from_string", return_value=(None, mock_app)):
result = runner.invoke(faststream_app, [
"publish",
"fastream:app",
"hello world",
"--correlation_id", "someId",
"--raise_timeout", "True",
])

assert result.exit_code == 0
mock_app.broker._producer.publish.assert_awaited_once_with(
message="hello world",
correlation_id="someId",
raise_timeout="True",
rpc=False,
)