Skip to content

Commit

Permalink
Merge branch '0.6.0' of github.com:airtai/faststream into 0.6.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Dec 20, 2024
2 parents bc87327 + 0b49d76 commit f2bfb7c
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 8 deletions.
32 changes: 26 additions & 6 deletions faststream/_internal/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
fake_context,
to_async,
)
from faststream.exceptions import SetupError

if TYPE_CHECKING:
from fast_depends.library.serializer import SerializerProto
Expand Down Expand Up @@ -78,7 +79,7 @@ async def catch_startup_validation_error() -> AsyncIterator[None]:
class StartAbleApplication:
def __init__(
self,
broker: "BrokerUsecase[Any, Any]",
broker: Optional["BrokerUsecase[Any, Any]"] = None,
/,
provider: Optional["Provider"] = None,
serializer: Optional["SerializerProto"] = EMPTY,
Expand All @@ -91,7 +92,7 @@ def __init__(

def _init_setupable_( # noqa: PLW3201
self,
broker: "BrokerUsecase[Any, Any]",
broker: Optional["BrokerUsecase[Any, Any]"] = None,
/,
provider: Optional["Provider"] = None,
serializer: Optional["SerializerProto"] = EMPTY,
Expand All @@ -115,21 +116,39 @@ def _init_setupable_( # noqa: PLW3201
)
)

self.broker = broker
self.brokers = [broker] if broker else []

self._setup()

def _setup(self) -> None:
self.broker._setup(OuterBrokerState(di_state=self._state.di_state))
for broker in self.brokers:
broker._setup(OuterBrokerState(di_state=self._state.di_state))

async def _start_broker(self) -> None:
assert self.broker, "You should setup a broker"
await self.broker.start()

@property
def broker(self) -> Optional["BrokerUsecase[Any, Any]"]:
return self.brokers[0] if self.brokers else None

def set_broker(self, broker: "BrokerUsecase[Any, Any]") -> None:
"""Set already existed App object broker.
Useful then you create/init broker in `on_startup` hook.
"""
if self.brokers:
msg = f"`{self}` already has a broker. You can't use multiple brokers until 1.0.0 release."
raise SetupError(msg)

self.brokers.append(broker)
self._setup()


class Application(StartAbleApplication):
def __init__(
self,
broker: "BrokerUsecase[Any, Any]",
broker: Optional["BrokerUsecase[Any, Any]"] = None,
/,
logger: Optional["LoggerProto"] = logger,
provider: Optional["Provider"] = None,
Expand Down Expand Up @@ -253,7 +272,8 @@ async def _shutdown(self, log_level: int = logging.INFO) -> None:
async def stop(self) -> None:
"""Executes shutdown hooks and stop broker."""
async with self._shutdown_hooks_context():
await self.broker.close()
for broker in self.brokers:
await broker.close()

@asynccontextmanager
async def _shutdown_hooks_context(self) -> AsyncIterator[None]:
Expand Down
3 changes: 2 additions & 1 deletion faststream/_internal/cli/utils/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,5 @@ def set_log_level(level: int, app: "Application") -> None:
if app.logger and getattr(app.logger, "setLevel", None):
app.logger.setLevel(level) # type: ignore[attr-defined]

app.broker._state.get().logger_state.set_level(level)
for broker in app.brokers:
broker._state.get().logger_state.set_level(level)
2 changes: 1 addition & 1 deletion faststream/asgi/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class AsgiFastStream(Application):

def __init__(
self,
broker: "BrokerUsecase[Any, Any]",
broker: Optional["BrokerUsecase[Any, Any]"] = None,
/,
asgi_routes: Sequence[tuple[str, "ASGIApp"]] = (),
# regular broker args
Expand Down
Empty file added tests/application/__init__.py
Empty file.
38 changes: 38 additions & 0 deletions tests/application/test_delayed_broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import pytest

from faststream._internal.application import StartAbleApplication
from faststream.exceptions import SetupError
from faststream.rabbit import RabbitBroker


def test_set_broker() -> None:
app = StartAbleApplication()

assert app.broker is None

broker = RabbitBroker()
app.set_broker(broker)

assert app.broker is broker


def test_set_more_than_once_broker() -> None:
app = StartAbleApplication()
broker_1 = RabbitBroker()
broker_2 = RabbitBroker()

app.set_broker(broker_1)

with pytest.raises(
SetupError,
match=f"`{app}` already has a broker. You can't use multiple brokers until 1.0.0 release.",
):
app.set_broker(broker_2)


@pytest.mark.asyncio()
async def test_start_not_setup_broker() -> None:
app = StartAbleApplication()

with pytest.raises(AssertionError, match="You should setup a broker"):
await app._start_broker()

0 comments on commit f2bfb7c

Please sign in to comment.