Skip to content

Commit

Permalink
Feat: add concurrent subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniil Dumchenko committed Dec 28, 2024
1 parent 3624405 commit a5e9b02
Show file tree
Hide file tree
Showing 45 changed files with 449 additions and 173 deletions.
4 changes: 2 additions & 2 deletions faststream/_internal/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,14 @@ def validate(cls, v: Any) -> str:
return str(v)

@classmethod
def _validate(cls, __input_value: Any, _: Any) -> str:
def _validate(cls, input_value: Any, _: Any) -> str:
warnings.warn(
"email-validator bot installed, email fields will be treated as str.\n"
"To install, run: pip install email-validator",
category=RuntimeWarning,
stacklevel=1,
)
return str(__input_value)
return str(input_value)

@classmethod
def __get_pydantic_json_schema__(
Expand Down
8 changes: 4 additions & 4 deletions faststream/_internal/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ def __init__(
Doc("Whether to use FastDepends or not."),
],
serializer: Optional["SerializerProto"] = EMPTY,
_get_dependant: Annotated[
get_dependant: Annotated[
Optional[Callable[..., Any]],
Doc("Custom library dependant generator callback."),
],
_call_decorators: Annotated[
call_decorators: Annotated[
Sequence["Decorator"],
Doc("Any custom decorator to apply to wrapped functions."),
],
Expand Down Expand Up @@ -142,8 +142,8 @@ def __init__(
state = InitialBrokerState(
di_state=DIState(
use_fastdepends=apply_types,
get_dependent=_get_dependant,
call_decorators=_call_decorators,
get_dependent=get_dependant,
call_decorators=call_decorators,
serializer=serializer,
provider=Provider(),
context=ContextRepo(),
Expand Down
2 changes: 1 addition & 1 deletion faststream/_internal/fastapi/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async def solve_faststream_dependency(
**kwargs,
)
values, errors, background = (
solved_result.values, # noqa: PD011
solved_result.values,
solved_result.errors,
solved_result.background_tasks,
)
Expand Down
6 changes: 3 additions & 3 deletions faststream/_internal/fastapi/route.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ def wrap_callable_to_fastapi_compatible(
response_model_exclude_none: bool,
state: "DIState",
) -> Callable[["NativeMessage[Any]"], Awaitable[Any]]:
__magic_attr = "__faststream_consumer__"
magic_attr = "__faststream_consumer__"

if getattr(user_callable, __magic_attr, False):
if getattr(user_callable, magic_attr, False):
return user_callable # type: ignore[return-value]

if response_model:
Expand All @@ -105,7 +105,7 @@ def wrap_callable_to_fastapi_compatible(
state=state,
)

setattr(parsed_callable, __magic_attr, True)
setattr(parsed_callable, magic_attr, True)
return wraps(user_callable)(parsed_callable)


Expand Down
4 changes: 2 additions & 2 deletions faststream/_internal/publisher/fake.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ async def _publish(
self,
cmd: "PublishCommand",
*,
_extra_middlewares: Iterable["PublisherMiddleware"],
extra_middlewares: Iterable["PublisherMiddleware"],
) -> Any:
"""This method should be called in subscriber flow only."""
cmd = self.patch_command(cmd)

call: AsyncFunc = self._producer.publish
for m in _extra_middlewares:
for m in extra_middlewares:
call = partial(m, call)

return await call(cmd)
Expand Down
8 changes: 4 additions & 4 deletions faststream/_internal/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async def _basic_publish(
self,
cmd: "PublishCommand",
*,
_extra_middlewares: Iterable["PublisherMiddleware"],
extra_middlewares: Iterable["PublisherMiddleware"],
) -> Any:
pub: Callable[..., Awaitable[Any]] = self._producer.publish

Expand All @@ -113,7 +113,7 @@ async def _basic_publish(
for pub_m in chain(
self.middlewares[::-1],
(
_extra_middlewares
extra_middlewares
or (
m(None, context=context).publish_scope
for m in self._broker_middlewares[::-1]
Expand Down Expand Up @@ -159,7 +159,7 @@ async def _basic_publish_batch(
self,
cmd: "PublishCommand",
*,
_extra_middlewares: Iterable["PublisherMiddleware"],
extra_middlewares: Iterable["PublisherMiddleware"],
) -> Any:
pub = self._producer.publish_batch

Expand All @@ -168,7 +168,7 @@ async def _basic_publish_batch(
for pub_m in chain(
self.middlewares[::-1],
(
_extra_middlewares
extra_middlewares
or (
m(None, context=context).publish_scope
for m in self._broker_middlewares[::-1]
Expand Down
8 changes: 4 additions & 4 deletions faststream/_internal/subscriber/call_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def _setup( # type: ignore[override]
decoder: "AsyncCallable",
state: "Pointer[BrokerState]",
broker_dependencies: Iterable["Dependant"],
_call_decorators: Iterable["Decorator"],
call_decorators: Iterable["Decorator"],
) -> None:
if self.dependant is None:
di_state = state.get().di_state
Expand All @@ -89,7 +89,7 @@ def _setup( # type: ignore[override]

dependant = self.handler.set_wrapped(
dependencies=dependencies,
_call_decorators=(*_call_decorators, *di_state.call_decorators),
_call_decorators=(*call_decorators, *di_state.call_decorators),
state=di_state,
)

Expand Down Expand Up @@ -148,12 +148,12 @@ async def call(
self,
/,
message: "StreamMessage[MsgType]",
_extra_middlewares: Iterable["SubscriberMiddleware[Any]"],
extra_middlewares: Iterable["SubscriberMiddleware[Any]"],
) -> Any:
"""Execute wrapped handler with consume middlewares."""
call: AsyncFuncAny = self.handler.call_wrapped

for middleware in chain(self.item_middlewares[::-1], _extra_middlewares):
for middleware in chain(self.item_middlewares[::-1], extra_middlewares):
call = partial(middleware, call)

try:
Expand Down
4 changes: 2 additions & 2 deletions faststream/_internal/subscriber/call_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ def set_wrapped(
self,
*,
dependencies: Sequence["Dependant"],
_call_decorators: Iterable["Decorator"],
call_decorators: Iterable["Decorator"],
state: "DIState",
) -> Optional["CallModel"]:
call = self._original_call
for decor in _call_decorators:
for decor in call_decorators:
call = decor(call)
self._original_call = call

Expand Down
8 changes: 4 additions & 4 deletions faststream/confluent/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,11 +331,11 @@ def __init__(
Doc("Whether to use FastDepends or not."),
] = True,
serializer: Optional["SerializerProto"] = EMPTY,
_get_dependant: Annotated[
get_dependant: Annotated[
Optional[Callable[..., Any]],
Doc("Custom library dependant generator callback."),
] = None,
_call_decorators: Annotated[
call_decorators: Annotated[
Iterable["Decorator"],
Doc("Any custom decorator to apply to wrapped functions."),
] = (),
Expand Down Expand Up @@ -399,8 +399,8 @@ def __init__(
log_fmt=log_fmt,
),
# FastDepends args
_get_dependant=_get_dependant,
_call_decorators=_call_decorators,
_get_dependant=get_dependant,
_call_decorators=call_decorators,
apply_types=apply_types,
serializer=serializer,
)
Expand Down
8 changes: 4 additions & 4 deletions faststream/confluent/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ async def _publish(
self,
cmd: Union["PublishCommand", "KafkaPublishCommand"],
*,
_extra_middlewares: Iterable["PublisherMiddleware"],
extra_middlewares: Iterable["PublisherMiddleware"],
) -> None:
"""This method should be called in subscriber flow only."""
cmd = KafkaPublishCommand.from_cmd(cmd)
Expand All @@ -148,7 +148,7 @@ async def _publish(
cmd.partition = cmd.partition or self.partition
cmd.key = cmd.key or self.key

await self._basic_publish(cmd, _extra_middlewares=_extra_middlewares)
await self._basic_publish(cmd, _extra_middlewares=extra_middlewares)

@override
async def request(
Expand Down Expand Up @@ -208,7 +208,7 @@ async def _publish(
self,
cmd: Union["PublishCommand", "KafkaPublishCommand"],
*,
_extra_middlewares: Iterable["PublisherMiddleware"],
extra_middlewares: Iterable["PublisherMiddleware"],
) -> None:
"""This method should be called in subscriber flow only."""
cmd = KafkaPublishCommand.from_cmd(cmd, batch=True)
Expand All @@ -219,4 +219,4 @@ async def _publish(

cmd.partition = cmd.partition or self.partition

await self._basic_publish_batch(cmd, _extra_middlewares=_extra_middlewares)
await self._basic_publish_batch(cmd, _extra_middlewares=extra_middlewares)
4 changes: 2 additions & 2 deletions faststream/confluent/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def __init__(
/,
*messages: "SendableMessage",
topic: str,
_publish_type: PublishType,
publish_type: PublishType,
key: Union[bytes, str, None] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
Expand All @@ -75,7 +75,7 @@ def __init__(
reply_to=reply_to,
correlation_id=correlation_id,
headers=headers,
_publish_type=_publish_type,
_publish_type=publish_type,
)
self.extra_bodies = messages

Expand Down
8 changes: 4 additions & 4 deletions faststream/kafka/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,11 +506,11 @@ def __init__(
Doc("Whether to use FastDepends or not."),
] = True,
serializer: Optional["SerializerProto"] = EMPTY,
_get_dependant: Annotated[
get_dependant: Annotated[
Optional[Callable[..., Any]],
Doc("Custom library dependant generator callback."),
] = None,
_call_decorators: Annotated[
call_decorators: Annotated[
Iterable["Decorator"],
Doc("Any custom decorator to apply to wrapped functions."),
] = (),
Expand Down Expand Up @@ -581,8 +581,8 @@ def __init__(
log_fmt=log_fmt,
),
# FastDepends args
_get_dependant=_get_dependant,
_call_decorators=_call_decorators,
_get_dependant=get_dependant,
_call_decorators=call_decorators,
apply_types=apply_types,
serializer=serializer,
)
Expand Down
8 changes: 4 additions & 4 deletions faststream/kafka/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ async def _publish(
self,
cmd: Union["PublishCommand", "KafkaPublishCommand"],
*,
_extra_middlewares: Iterable["PublisherMiddleware"],
extra_middlewares: Iterable["PublisherMiddleware"],
) -> None:
"""This method should be called in subscriber flow only."""
cmd = KafkaPublishCommand.from_cmd(cmd)
Expand All @@ -263,7 +263,7 @@ async def _publish(
cmd.partition = cmd.partition or self.partition
cmd.key = cmd.key or self.key

await self._basic_publish(cmd, _extra_middlewares=_extra_middlewares)
await self._basic_publish(cmd, _extra_middlewares=extra_middlewares)

@override
async def request(
Expand Down Expand Up @@ -423,7 +423,7 @@ async def _publish(
self,
cmd: Union["PublishCommand", "KafkaPublishCommand"],
*,
_extra_middlewares: Iterable["PublisherMiddleware"],
extra_middlewares: Iterable["PublisherMiddleware"],
) -> None:
"""This method should be called in subscriber flow only."""
cmd = KafkaPublishCommand.from_cmd(cmd, batch=True)
Expand All @@ -434,4 +434,4 @@ async def _publish(

cmd.partition = cmd.partition or self.partition

await self._basic_publish_batch(cmd, _extra_middlewares=_extra_middlewares)
await self._basic_publish_batch(cmd, _extra_middlewares=extra_middlewares)
4 changes: 2 additions & 2 deletions faststream/kafka/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(
/,
*messages: "SendableMessage",
topic: str,
_publish_type: PublishType,
publish_type: PublishType,
key: Union[bytes, Any, None] = None,
partition: Optional[int] = None,
timestamp_ms: Optional[int] = None,
Expand All @@ -67,7 +67,7 @@ def __init__(
reply_to=reply_to,
correlation_id=correlation_id,
headers=headers,
_publish_type=_publish_type,
_publish_type=publish_type,
)
self.extra_bodies = messages

Expand Down
4 changes: 2 additions & 2 deletions faststream/middlewares/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
Callable[..., None],
Callable[..., Awaitable[None]],
]
PublishingExceptionHandler: TypeAlias = Callable[..., "Any"]
PublishingExceptionHandler: TypeAlias = Callable[..., Any]

CastedGeneralExceptionHandler: TypeAlias = Callable[..., Awaitable[None]]
CastedPublishingExceptionHandler: TypeAlias = Callable[..., Awaitable["Any"]]
CastedPublishingExceptionHandler: TypeAlias = Callable[..., Awaitable[Any]]
CastedHandlers: TypeAlias = list[
tuple[
type[Exception],
Expand Down
8 changes: 4 additions & 4 deletions faststream/nats/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,11 +427,11 @@ def __init__(
Doc("Whether to use FastDepends or not."),
] = True,
serializer: Optional["SerializerProto"] = EMPTY,
_get_dependant: Annotated[
get_dependant: Annotated[
Optional[Callable[..., Any]],
Doc("Custom library dependant generator callback."),
] = None,
_call_decorators: Annotated[
call_decorators: Annotated[
Iterable["Decorator"],
Doc("Any custom decorator to apply to wrapped functions."),
] = (),
Expand Down Expand Up @@ -507,8 +507,8 @@ def __init__(
# FastDepends args
apply_types=apply_types,
serializer=serializer,
_get_dependant=_get_dependant,
_call_decorators=_call_decorators,
_get_dependant=get_dependant,
_call_decorators=call_decorators,
)

self._state.patch_value(
Expand Down
4 changes: 2 additions & 2 deletions faststream/nats/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ async def _publish(
self,
cmd: Union["PublishCommand", "NatsPublishCommand"],
*,
_extra_middlewares: Iterable["PublisherMiddleware"],
extra_middlewares: Iterable["PublisherMiddleware"],
) -> None:
"""This method should be called in subscriber flow only."""
cmd = NatsPublishCommand.from_cmd(cmd)
Expand All @@ -138,7 +138,7 @@ async def _publish(
cmd.stream = self.stream.name
cmd.timeout = self.timeout

return await self._basic_publish(cmd, _extra_middlewares=_extra_middlewares)
return await self._basic_publish(cmd, _extra_middlewares=extra_middlewares)

@override
async def request(
Expand Down
4 changes: 2 additions & 2 deletions faststream/nats/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ def __init__(
reply_to: str = "",
stream: Optional[str] = None,
timeout: Optional[float] = None,
_publish_type: PublishType,
publish_type: PublishType,
) -> None:
super().__init__(
body=message,
destination=subject,
correlation_id=correlation_id,
headers=headers,
reply_to=reply_to,
_publish_type=_publish_type,
_publish_type=publish_type,
)

self.stream = stream
Expand Down
Loading

0 comments on commit a5e9b02

Please sign in to comment.