
Commit 617df43
chore: merge 0.6
Lancetnik committed Nov 26, 2024
1 parent 157151b commit 617df43
Showing 310 changed files with 4,385 additions and 4,669 deletions.
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.kafka.subscriber.asyncapi.AsyncAPIConcurrentDefaultSubscriber
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.kafka.subscriber.usecase.ConcurrentDefaultSubscriber
24 changes: 24 additions & 0 deletions docs/docs/en/getting-started/acknowlegment.md
@@ -0,0 +1,24 @@
# Acknowledgment

Since unexpected errors may occur during message processing, **FastStream** has an `ack_policy` parameter.

`AckPolicy` has four variants:

- `ACK` means that the message will be acknowledged regardless of whether processing succeeds.

- `NACK_ON_ERROR` means that the message will be nacked if an error occurs during processing, and the consumer will receive this message again.

- `REJECT_ON_ERROR` means that the message will be rejected if an error occurs during processing, and the consumer will not receive this message again.

- `DO_NOTHING` means that *FastStream* will do nothing with the message: you must ack/nack/reject it manually.

You must provide this parameter when initializing the subscriber.

```python linenums="1" hl_lines="5" title="main.py"
from faststream import AckPolicy
from faststream.nats import NatsBroker

broker = NatsBroker()
@broker.subscriber("test", ack_policy=AckPolicy.REJECT_ON_ERROR)
async def handler(body: str):
    ...
```
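To make the four policies concrete, here is a small, self-contained sketch that simulates the broker action each policy implies. This is an illustrative model only, not **FastStream**'s internal implementation; in particular, the assumption that the `*_ON_ERROR` policies ack on success is ours:

```python
import enum
from typing import Optional


class AckPolicy(enum.Enum):
    ACK = "ack"
    NACK_ON_ERROR = "nack_on_error"
    REJECT_ON_ERROR = "reject_on_error"
    DO_NOTHING = "do_nothing"


def broker_action(policy: AckPolicy, failed: bool) -> Optional[str]:
    """Return the action taken after the handler finishes, or None for manual mode."""
    if policy is AckPolicy.ACK:
        return "ack"  # acknowledged no matter what
    if policy is AckPolicy.NACK_ON_ERROR:
        return "nack" if failed else "ack"  # nack -> message is redelivered
    if policy is AckPolicy.REJECT_ON_ERROR:
        return "reject" if failed else "ack"  # reject -> message is dropped
    return None  # DO_NOTHING: you ack/nack/reject manually
```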
8 changes: 4 additions & 4 deletions docs/docs/en/getting-started/logging.md
@@ -213,9 +213,9 @@ app = FastStream(broker, logger=logger)
And the job is done! Now you have perfectly structured logs using **Structlog**.

```{.shell .no-copy}
TIMESPAMP [info ] FastStream app starting... extra={}
TIMESPAMP [debug ] `Handler` waiting for messages extra={'topic': 'topic', 'group_id': 'group', 'message_id': ''}
TIMESPAMP [debug ] `Handler` waiting for messages extra={'topic': 'topic', 'group_id': 'group2', 'message_id': ''}
TIMESPAMP [info ] FastStream app started successfully! To exit, press CTRL+C extra={'topic': '', 'group_id': '', 'message_id': ''}
TIMESTAMP [info ] FastStream app starting... extra={}
TIMESTAMP [debug ] `Handler` waiting for messages extra={'topic': 'topic', 'group_id': 'group', 'message_id': ''}
TIMESTAMP [debug ] `Handler` waiting for messages extra={'topic': 'topic', 'group_id': 'group2', 'message_id': ''}
TIMESTAMP [info ] FastStream app started successfully! To exit, press CTRL+C extra={'topic': '', 'group_id': '', 'message_id': ''}
```
{ data-search-exclude }
23 changes: 0 additions & 23 deletions docs/docs/en/nats/jetstream/ack.md
@@ -16,29 +16,6 @@ In most cases, **FastStream** automatically acknowledges (*acks*) messages on yo

However, there are situations where you might want to use different acknowledgement logic.

## Retries

If you prefer to use a *nack* instead of a *reject* when there's an error in message processing, you can specify the `retry` flag in the `#!python @broker.subscriber(...)` method, which is responsible for error handling logic.

By default, this flag is set to `False`, indicating that if an error occurs during message processing, the message can still be retrieved from the queue:

```python
@broker.subscriber("test", retry=False) # don't handle exceptions
async def base_handler(body: str):
...
```

If this flag is set to `True`, the message will be *nack*ed and placed back in the queue each time an error occurs. In this scenario, the message can be processed by another consumer (if there are several of them) or by the same one:

```python
@broker.subscriber("test", retry=True) # try again indefinitely
async def base_handler(body: str):
...
```

!!! tip
For more complex error handling cases, you can use [tenacity](https://tenacity.readthedocs.io/en/latest/){.external-link target="_blank"}

## Manual Acknowledgement

If you want to acknowledge a message manually, you can get access directly to the message object via the [Context](../../getting-started/context/existed.md){.internal-link} and call the method.
38 changes: 0 additions & 38 deletions docs/docs/en/rabbit/ack.md
@@ -16,44 +16,6 @@ In most cases, **FastStream** automatically acknowledges (*acks*) messages on yo

However, there are situations where you might want to use a different acknowledgement logic.

## Retries

If you prefer to use a *nack* instead of a *reject* when there's an error in message processing, you can specify the `retry` flag in the `#!python @broker.subscriber(...)` method, which is responsible for error handling logic.

By default, this flag is set to `False`, indicating that if an error occurs during message processing, the message can still be retrieved from the queue:

```python
@broker.subscriber("test", retry=False) # don't handle exceptions
async def base_handler(body: str):
...
```

If this flag is set to `True`, the message will be *nack*ed and placed back in the queue each time an error occurs. In this scenario, the message can be processed by another consumer (if there are several of them) or by the same one:

```python
@broker.subscriber("test", retry=True) # try again indefinitely
async def base_handler(body: str):
...
```

If the `retry` flag is set to an `int`, the message will be placed back in the queue, and the number of retries will be limited to this number:

```python
@broker.subscriber("test", retry=3) # make up to 3 attempts
async def base_handler(body: str):
...
```

!!! tip
**FastStream** identifies the message by its `message_id`. To make this option work, you should manually set this field on the producer side (if your library doesn't set it automatically).

!!! bug
At the moment, attempts are counted only by the current consumer. If the message goes to another consumer, it will have its own counter.
Subsequently, this logic will be reworked.

!!! tip
For more complex error handling cases, you can use [tenacity](https://tenacity.readthedocs.io/en/latest/){.external-link target="_blank"}
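The bounded `retry=3` behavior described above can be sketched as a per-`message_id` attempt counter. This is a hypothetical illustration of the counting logic, not **FastStream**'s actual implementation:

```python
from collections import defaultdict


class RetryCounter:
    """Decide whether to nack (retry) or reject (give up), keyed by message id."""

    def __init__(self, max_attempts: int) -> None:
        self.max_attempts = max_attempts
        self._attempts: defaultdict[str, int] = defaultdict(int)

    def on_error(self, message_id: str) -> str:
        self._attempts[message_id] += 1
        if self._attempts[message_id] < self.max_attempts:
            return "nack"  # back to the queue for another attempt
        return "reject"  # attempts exhausted, drop the message
```

Note that, as the bug admonition above says, such a counter lives in a single consumer: another consumer would start its own count.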

## Manual acknowledgement

If you want to acknowledge a message manually, you can get access directly to the message object via the [Context](../getting-started/context/existed.md){.internal-link} and call the method.
4 changes: 2 additions & 2 deletions docs/docs_src/confluent/ack/errors.py
@@ -1,4 +1,4 @@
from faststream import FastStream
from faststream import FastStream, AckPolicy
from faststream.exceptions import AckMessage
from faststream.confluent import KafkaBroker

@@ -7,7 +7,7 @@


@broker.subscriber(
"test-error-topic", group_id="test-error-group", auto_commit=False, auto_offset_reset="earliest"
"test-error-topic", group_id="test-error-group", ack_policy=AckPolicy.REJECT_ON_ERROR, auto_offset_reset="earliest"
)
async def handle(body):
smth_processing(body)
@@ -1,7 +1,6 @@
from faststream import FastStream
from faststream.specification.asyncapi import AsyncAPI
from faststream.specification.schema.license import License
from faststream.specification.schema.contact import Contact
from faststream.specification import License, Contact
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
4 changes: 2 additions & 2 deletions docs/docs_src/kafka/ack/errors.py
@@ -1,4 +1,4 @@
from faststream import FastStream
from faststream import FastStream, AckPolicy
from faststream.exceptions import AckMessage
from faststream.kafka import KafkaBroker

@@ -7,7 +7,7 @@


@broker.subscriber(
"test-topic", group_id="test-group", auto_commit=False
"test-topic", group_id="test-group", ack_policy=AckPolicy.REJECT_ON_ERROR,
)
async def handle(body):
smth_processing(body)
5 changes: 2 additions & 3 deletions examples/kafka/ack_after_process.py
@@ -1,14 +1,13 @@
from faststream import FastStream, Logger
from faststream import FastStream, Logger, AckPolicy
from faststream.kafka import KafkaBroker

broker = KafkaBroker()
app = FastStream(broker)


@broker.subscriber(
"test",
group_id="group",
auto_commit=False,
ack_policy=AckPolicy.REJECT_ON_ERROR,
)
async def handler(msg: str, logger: Logger):
logger.info(msg)
38 changes: 18 additions & 20 deletions faststream/_internal/_compat.py
@@ -1,6 +1,7 @@
import json
import sys
import warnings
from collections import UserString
from collections.abc import Iterable, Mapping
from importlib.metadata import version as get_version
from importlib.util import find_spec
@@ -17,8 +18,6 @@

from faststream._internal.basic_types import AnyDict

PYDANTIC_V2 = PYDANTIC_VERSION.startswith("2.")

IS_WINDOWS = (
sys.platform == "win32" or sys.platform == "cygwin" or sys.platform == "msys"
)
@@ -76,24 +75,23 @@ def json_dumps(*a: Any, **kw: Any) -> bytes:


JsonSchemaValue = Mapping[str, Any]
major, minor, *_ = PYDANTIC_VERSION.split(".")
_PYDANTIC_MAJOR, _PYDANTIC_MINOR = int(major), int(minor)

PYDANTIC_V2 = _PYDANTIC_MAJOR >= 2

if PYDANTIC_V2:
if PYDANTIC_VERSION >= "2.4.0":
if _PYDANTIC_MINOR >= 4:
from pydantic.annotated_handlers import (
GetJsonSchemaHandler,
)
from pydantic_core.core_schema import (
with_info_plain_validator_function,
)
else:
if PYDANTIC_VERSION >= "2.10":
from pydantic.annotated_handlers import (
GetJsonSchemaHandler,
)
else:
from pydantic._internal._annotated_handlers import ( # type: ignore[no-redef]
GetJsonSchemaHandler,
)
from pydantic._internal._annotated_handlers import ( # type: ignore[no-redef]
GetJsonSchemaHandler,
)
from pydantic_core.core_schema import (
general_plain_validator_function as with_info_plain_validator_function,
)
Expand All @@ -112,7 +110,7 @@ def model_to_jsonable(
def dump_json(data: Any) -> bytes:
return json_dumps(model_to_jsonable(data))

def get_model_fields(model: type[BaseModel]) -> dict[str, Any]:
def get_model_fields(model: type[BaseModel]) -> AnyDict:
return model.model_fields

def model_to_json(model: BaseModel, **kwargs: Any) -> str:
@@ -142,7 +140,7 @@ def model_schema(model: type[BaseModel], **kwargs: Any) -> AnyDict:
def dump_json(data: Any) -> bytes:
return json_dumps(data, default=pydantic_encoder)

def get_model_fields(model: type[BaseModel]) -> dict[str, Any]:
def get_model_fields(model: type[BaseModel]) -> AnyDict:
return model.__fields__ # type: ignore[return-value]

def model_to_json(model: BaseModel, **kwargs: Any) -> str:
@@ -175,19 +173,19 @@ def with_info_plain_validator_function(  # type: ignore[misc]
return {}


anyio_major = int(get_version("anyio").split(".")[0])
ANYIO_V3 = anyio_major == 3
major, *_ = get_version("anyio").split(".")
_ANYIO_MAJOR = int(major)
ANYIO_V3 = _ANYIO_MAJOR == 3


if ANYIO_V3:
from anyio import ExceptionGroup # type: ignore[attr-defined]
elif sys.version_info < (3, 11):
elif sys.version_info >= (3, 11):
ExceptionGroup = ExceptionGroup # noqa: PLW0127
else:
from exceptiongroup import (
ExceptionGroup,
)
else:
ExceptionGroup = ExceptionGroup # noqa: PLW0127


try:
import email_validator
Expand All @@ -198,7 +196,7 @@ def with_info_plain_validator_function( # type: ignore[misc]
except ImportError: # pragma: no cover
# NOTE: EmailStr mock was copied from the FastAPI
# https://github.com/tiangolo/fastapi/blob/master/fastapi/openapi/models.py#24
class EmailStr(str): # type: ignore[no-redef]
class EmailStr(UserString): # type: ignore[no-redef]
"""EmailStr is a string that should be an email.
Note: EmailStr mock was copied from the FastAPI:
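One motivation for the parsed-integer comparisons this diff introduces in `_compat.py` is that comparing raw version strings lexicographically (as in the removed `PYDANTIC_VERSION >= "2.10"` check) mis-orders multi-digit components. A quick stand-alone sketch:

```python
# Lexicographic string comparison gets multi-digit version parts wrong:
# "2.9.0" compares greater than "2.10.0" because "9" > "1" character by character.
assert "2.9.0" >= "2.10.0"  # misleadingly True

# Parsing into an integer tuple compares versions correctly:
major, minor, *_ = "2.9.0".split(".")
assert (int(major), int(minor)) < (2, 10)  # 2.9 really is older than 2.10
```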
2 changes: 1 addition & 1 deletion faststream/_internal/basic_types.py
@@ -58,7 +58,7 @@
class StandardDataclass(Protocol):
"""Protocol to check type is dataclass."""

__dataclass_fields__: ClassVar[dict[str, Any]]
__dataclass_fields__: ClassVar[AnyDict]


BaseSendableMessage: TypeAlias = Union[
2 changes: 0 additions & 2 deletions faststream/_internal/broker/abc_broker.py
@@ -76,10 +76,8 @@ def publisher(
is_running: bool = False,
) -> "PublisherProto[MsgType]":
publisher.add_prefix(self.prefix)

if not is_running:
self._publishers.append(publisher)

return publisher

def setup_publisher(
6 changes: 4 additions & 2 deletions faststream/_internal/broker/broker.py
@@ -35,6 +35,7 @@
MsgType,
)
from faststream._internal.utils.functions import to_async
from faststream.specification.proto import ServerSpecification

from .abc_broker import ABCBroker
from .pub_base import BrokerPublishMixin
@@ -51,12 +52,13 @@
PublisherProto,
)
from faststream.security import BaseSecurity
from faststream.specification.schema.tag import Tag, TagDict
from faststream.specification.schema.extra import Tag, TagDict


class BrokerUsecase(
ABCBroker[MsgType],
SetupAble,
ServerSpecification,
BrokerPublishMixin[MsgType],
Generic[MsgType, ConnectionType],
):
@@ -121,7 +123,7 @@ def __init__(
Doc("AsyncAPI server description."),
],
tags: Annotated[
Optional[Iterable[Union["Tag", "TagDict"]]],
Iterable[Union["Tag", "TagDict"]],
Doc("AsyncAPI server tags."),
],
specification_url: Annotated[
6 changes: 3 additions & 3 deletions faststream/_internal/broker/pub_base.py
@@ -37,7 +37,7 @@ async def _basic_publish(
publish = producer.publish
context = self.context # caches property

for m in self.middlewares:
for m in self.middlewares[::-1]:
publish = partial(m(None, context=context).publish_scope, publish)

return await publish(cmd)
@@ -58,7 +58,7 @@ async def _basic_publish_batch(
publish = producer.publish_batch
context = self.context # caches property

for m in self.middlewares:
for m in self.middlewares[::-1]:
publish = partial(m(None, context=context).publish_scope, publish)

return await publish(cmd)
@@ -82,7 +82,7 @@ async def _basic_request(
request = producer.request
context = self.context # caches property

for m in self.middlewares:
for m in self.middlewares[::-1]:
request = partial(m(None, context=context).publish_scope, request)

published_msg = await request(cmd)
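The switch to `self.middlewares[::-1]` in `pub_base.py` matters because wrapping with `partial` builds the call chain inside-out: iterating in reverse makes the first-registered middleware the outermost layer. A minimal stand-alone sketch (illustrative only, not FastStream's actual classes):

```python
from functools import partial

log: list = []


def make_middleware(name):
    def middleware(next_call, payload):
        log.append(f"enter {name}")
        result = next_call(payload)  # call the next layer (or publish itself)
        log.append(f"exit {name}")
        return result

    return middleware


def publish(payload):
    log.append("publish")
    return payload


middlewares = [make_middleware("first"), make_middleware("second")]

call = publish
for m in middlewares[::-1]:  # reversed: first-registered ends up outermost
    call = partial(m, call)

call("msg")
# log is now: enter first, enter second, publish, exit second, exit first
```

Without the reversal, the last-registered middleware would wrap everything, inverting the order users registered them in.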
8 changes: 6 additions & 2 deletions faststream/_internal/cli/docs/app.py
@@ -12,8 +12,12 @@
from faststream._internal.cli.utils.imports import import_from_string
from faststream.exceptions import INSTALL_WATCHFILES, INSTALL_YAML, SCHEMA_NOT_SUPPORTED
from faststream.specification.asyncapi.site import serve_app
from faststream.specification.asyncapi.v2_6_0.schema import Schema as SchemaV2_6
from faststream.specification.asyncapi.v3_0_0.schema import Schema as SchemaV3
from faststream.specification.asyncapi.v2_6_0.schema import (
ApplicationSchema as SchemaV2_6,
)
from faststream.specification.asyncapi.v3_0_0.schema import (
ApplicationSchema as SchemaV3,
)
from faststream.specification.base.specification import Specification

if TYPE_CHECKING: