Skip to content

Commit

Permalink
refactor: mv process_msg broker.utils
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Sep 8, 2024
1 parent be854e2 commit fd952c8
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 77 deletions.
3 changes: 1 addition & 2 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -414,14 +414,13 @@ search:
- [SubscriberProto](api/faststream/broker/subscriber/proto/SubscriberProto.md)
- usecase
- [SubscriberUsecase](api/faststream/broker/subscriber/usecase/SubscriberUsecase.md)
- utils
- [process_msg](api/faststream/broker/subscriber/utils/process_msg.md)
- types
- [PublisherMiddleware](api/faststream/broker/types/PublisherMiddleware.md)
- utils
- [MultiLock](api/faststream/broker/utils/MultiLock.md)
- [default_filter](api/faststream/broker/utils/default_filter.md)
- [get_watcher_context](api/faststream/broker/utils/get_watcher_context.md)
- [process_msg](api/faststream/broker/utils/process_msg.md)
- [resolve_custom_func](api/faststream/broker/utils/resolve_custom_func.md)
- wrapper
- call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ search:
boost: 0.5
---

::: faststream.broker.subscriber.utils.process_msg
::: faststream.broker.utils.process_msg
66 changes: 0 additions & 66 deletions faststream/broker/subscriber/utils.py

This file was deleted.

55 changes: 52 additions & 3 deletions faststream/broker/utils.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,85 @@
import asyncio
import inspect
from contextlib import suppress
from contextlib import AsyncExitStack, suppress
from functools import partial
from typing import (
TYPE_CHECKING,
Any,
AsyncContextManager,
Awaitable,
Callable,
Iterable,
Optional,
Type,
Union,
cast,
)

import anyio
from typing_extensions import Self
from typing_extensions import Literal, Self, overload

from faststream.broker.acknowledgement_watcher import WatcherContext, get_watcher
from faststream.utils.functions import fake_context, to_async
from faststream.broker.types import MsgType
from faststream.utils.functions import fake_context, return_input, to_async

if TYPE_CHECKING:
from types import TracebackType

from faststream.broker.message import StreamMessage
from faststream.broker.types import (
AsyncCallable,
BrokerMiddleware,
CustomCallable,
SyncCallable,
)
from faststream.types import LoggerProto


@overload
async def process_msg(
msg: Literal[None],
middlewares: Iterable["BrokerMiddleware[MsgType]"],
parser: Callable[[MsgType], Awaitable["StreamMessage[MsgType]"]],
decoder: Callable[["StreamMessage[MsgType]"], "Any"],
) -> None: ...


@overload
async def process_msg(
msg: MsgType,
middlewares: Iterable["BrokerMiddleware[MsgType]"],
parser: Callable[[MsgType], Awaitable["StreamMessage[MsgType]"]],
decoder: Callable[["StreamMessage[MsgType]"], "Any"],
) -> "StreamMessage[MsgType]": ...


async def process_msg(
msg: Optional[MsgType],
middlewares: Iterable["BrokerMiddleware[MsgType]"],
parser: Callable[[MsgType], Awaitable["StreamMessage[MsgType]"]],
decoder: Callable[["StreamMessage[MsgType]"], "Any"],
) -> Optional["StreamMessage[MsgType]"]:
if msg is None:
return None

async with AsyncExitStack() as stack:
return_msg: Callable[
[StreamMessage[MsgType]],
Awaitable[StreamMessage[MsgType]],
] = return_input

for m in middlewares:
mid = m(msg)
await stack.enter_async_context(mid)
return_msg = partial(mid.consume_scope, return_msg)

parsed_msg = await parser(msg)
parsed_msg._decoded_body = await decoder(parsed_msg)
return await return_msg(parsed_msg)

raise AssertionError("unreachable")


async def default_filter(msg: "StreamMessage[Any]") -> bool:
"""A function to filter stream messages."""
return not msg.processed
Expand Down
2 changes: 1 addition & 1 deletion faststream/confluent/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

from faststream.broker.publisher.fake import FakePublisher
from faststream.broker.subscriber.usecase import SubscriberUsecase
from faststream.broker.subscriber.utils import process_msg
from faststream.broker.types import MsgType
from faststream.broker.utils import process_msg
from faststream.confluent.parser import AsyncConfluentParser
from faststream.confluent.schemas import TopicPartition

Expand Down
2 changes: 1 addition & 1 deletion faststream/kafka/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

from faststream.broker.publisher.fake import FakePublisher
from faststream.broker.subscriber.usecase import SubscriberUsecase
from faststream.broker.subscriber.utils import process_msg
from faststream.broker.types import (
AsyncCallable,
BrokerMiddleware,
CustomCallable,
MsgType,
)
from faststream.broker.utils import process_msg
from faststream.kafka.message import KafkaAckableMessage, KafkaMessage
from faststream.kafka.parser import AioKafkaBatchParser, AioKafkaParser
from faststream.utils.path import compile_path
Expand Down
2 changes: 1 addition & 1 deletion faststream/nats/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

from faststream.broker.publisher.fake import FakePublisher
from faststream.broker.subscriber.usecase import SubscriberUsecase
from faststream.broker.subscriber.utils import process_msg
from faststream.broker.types import MsgType
from faststream.broker.utils import process_msg
from faststream.exceptions import NOT_CONNECTED_YET
from faststream.nats.parser import (
BatchParser,
Expand Down
2 changes: 1 addition & 1 deletion faststream/rabbit/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from faststream.broker.publisher.fake import FakePublisher
from faststream.broker.subscriber.usecase import SubscriberUsecase
from faststream.broker.subscriber.utils import process_msg
from faststream.broker.utils import process_msg
from faststream.exceptions import SetupError
from faststream.rabbit.helpers.declarer import RabbitDeclarer
from faststream.rabbit.parser import AioPikaParser
Expand Down
2 changes: 1 addition & 1 deletion faststream/redis/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from faststream.broker.publisher.fake import FakePublisher
from faststream.broker.subscriber.usecase import SubscriberUsecase
from faststream.broker.subscriber.utils import process_msg
from faststream.broker.utils import process_msg
from faststream.redis.message import (
BatchListMessage,
BatchStreamMessage,
Expand Down

0 comments on commit fd952c8

Please sign in to comment.