Skip to content

Commit

Permalink
Merge branch 'main' into opentelemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik authored May 9, 2024
2 parents b131a0c + 48c35f4 commit dd4e0bb
Show file tree
Hide file tree
Showing 14 changed files with 283 additions and 39 deletions.
2 changes: 2 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ search:
- [KafkaRouter](public_api/faststream/kafka/KafkaRouter.md)
- [TestApp](public_api/faststream/kafka/TestApp.md)
- [TestKafkaBroker](public_api/faststream/kafka/TestKafkaBroker.md)
- [TopicPartition](public_api/faststream/kafka/TopicPartition.md)
- nats
- [AckPolicy](public_api/faststream/nats/AckPolicy.md)
- [ConsumerConfig](public_api/faststream/nats/ConsumerConfig.md)
Expand Down Expand Up @@ -504,6 +505,7 @@ search:
- [KafkaRouter](api/faststream/kafka/KafkaRouter.md)
- [TestApp](api/faststream/kafka/TestApp.md)
- [TestKafkaBroker](api/faststream/kafka/TestKafkaBroker.md)
- [TopicPartition](api/faststream/kafka/TopicPartition.md)
- broker
- [KafkaBroker](api/faststream/kafka/broker/KafkaBroker.md)
- broker
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/kafka/TopicPartition.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: aiokafka.structs.TopicPartition
2 changes: 1 addition & 1 deletion docs/docs/en/release.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ hide:
* fix (#1415): raise SetupError if rpc and reply_to are using in TestCL… by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1419](https://github.com/airtai/faststream/pull/1419){.external-link target="_blank"}
* Chore/update deps2 by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1418](https://github.com/airtai/faststream/pull/1418){.external-link target="_blank"}
* refactor: correct security with kwarg params merging by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1417](https://github.com/airtai/faststream/pull/1417){.external-link target="_blank"}
* fix (#1414): correct Messag.ack error processing by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1420](https://github.com/airtai/faststream/pull/1420){.external-link target="_blank"}
* fix (#1414): correct Message.ack error processing by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1420](https://github.com/airtai/faststream/pull/1420){.external-link target="_blank"}

**Full Changelog**: [#0.5.3...0.5.4](https://github.com/airtai/faststream/compare/0.5.3...0.5.4){.external-link target="_blank"}

Expand Down
18 changes: 3 additions & 15 deletions faststream/broker/acknowledgement_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,7 @@ async def __ack(self) -> None:
await self.message.ack(**self.extra_options)
except Exception as er:
if self.logger is not None:
self.logger.log(
logging.ERROR,
er,
exc_info=er
)
self.logger.log(logging.ERROR, er, exc_info=er)
else:
self.watcher.remove(self.message.message_id)

Expand All @@ -191,22 +187,14 @@ async def __nack(self) -> None:
await self.message.nack(**self.extra_options)
except Exception as er:
if self.logger is not None:
self.logger.log(
logging.ERROR,
er,
exc_info=er
)
self.logger.log(logging.ERROR, er, exc_info=er)

async def __reject(self) -> None:
try:
await self.message.reject(**self.extra_options)
except Exception as er:
if self.logger is not None:
self.logger.log(
logging.ERROR,
er,
exc_info=er
)
self.logger.log(logging.ERROR, er, exc_info=er)
else:
self.watcher.remove(self.message.message_id)

Expand Down
3 changes: 3 additions & 0 deletions faststream/kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from aiokafka import TopicPartition

from faststream.kafka.annotations import KafkaMessage
from faststream.kafka.broker import KafkaBroker
from faststream.kafka.router import KafkaPublisher, KafkaRoute, KafkaRouter
Expand All @@ -12,4 +14,5 @@
"KafkaPublisher",
"TestKafkaBroker",
"TestApp",
"TopicPartition",
)
31 changes: 30 additions & 1 deletion faststream/kafka/broker/registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from faststream.kafka.subscriber.asyncapi import AsyncAPISubscriber

if TYPE_CHECKING:
from aiokafka import ConsumerRecord
from aiokafka import ConsumerRecord, TopicPartition
from aiokafka.abc import ConsumerRebalanceListener
from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from fast_depends.dependencies import Depends
Expand Down Expand Up @@ -336,6 +336,13 @@ def subscriber(
Pattern to match available topics. You must provide either topics or pattern, but not both.
"""),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Doc("""
An explicit partitions list to assign.
You can't use 'topics' and 'partitions' in the same time.
"""),
] = (),
# broker args
dependencies: Annotated[
Iterable["Depends"],
Expand Down Expand Up @@ -660,6 +667,13 @@ def subscriber(
Pattern to match available topics. You must provide either topics or pattern, but not both.
"""),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Doc("""
An explicit partitions list to assign.
You can't use 'topics' and 'partitions' in the same time.
"""),
] = (),
# broker args
dependencies: Annotated[
Iterable["Depends"],
Expand Down Expand Up @@ -984,6 +998,13 @@ def subscriber(
Pattern to match available topics. You must provide either topics or pattern, but not both.
"""),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Doc("""
An explicit partitions list to assign.
You can't use 'topics' and 'partitions' in the same time.
"""),
] = (),
# broker args
dependencies: Annotated[
Iterable["Depends"],
Expand Down Expand Up @@ -1311,6 +1332,13 @@ def subscriber(
Pattern to match available topics. You must provide either topics or pattern, but not both.
"""),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Doc("""
An explicit partitions list to assign.
You can't use 'topics' and 'partitions' in the same time.
"""),
] = (),
# broker args
dependencies: Annotated[
Iterable["Depends"],
Expand Down Expand Up @@ -1402,6 +1430,7 @@ def subscriber(
group_id=group_id,
listener=listener,
pattern=pattern,
partitions=partitions,
builder=builder,
is_manual=not auto_commit,
# subscriber args
Expand Down
30 changes: 30 additions & 0 deletions faststream/kafka/fastapi/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from asyncio import AbstractEventLoop
from enum import Enum

from aiokafka import TopicPartition
from aiokafka.abc import AbstractTokenProvider, ConsumerRebalanceListener
from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from fastapi import params
Expand Down Expand Up @@ -919,6 +920,13 @@ def subscriber(
Pattern to match available topics. You must provide either topics or pattern, but not both.
"""),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Doc("""
An explicit partitions list to assign.
You can't use 'topics' and 'partitions' in the same time.
"""),
] = (),
# broker args
dependencies: Annotated[
Iterable["params.Depends"],
Expand Down Expand Up @@ -1401,6 +1409,13 @@ def subscriber(
Pattern to match available topics. You must provide either topics or pattern, but not both.
"""),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Doc("""
An explicit partitions list to assign.
You can't use 'topics' and 'partitions' in the same time.
"""),
] = (),
# broker args
dependencies: Annotated[
Iterable["params.Depends"],
Expand Down Expand Up @@ -1883,6 +1898,13 @@ def subscriber(
Pattern to match available topics. You must provide either topics or pattern, but not both.
"""),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Doc("""
An explicit partitions list to assign.
You can't use 'topics' and 'partitions' in the same time.
"""),
] = (),
# broker args
dependencies: Annotated[
Iterable["params.Depends"],
Expand Down Expand Up @@ -2368,6 +2390,13 @@ def subscriber(
Pattern to match available topics. You must provide either topics or pattern, but not both.
"""),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Doc("""
An explicit partitions list to assign.
You can't use 'topics' and 'partitions' in the same time.
"""),
] = (),
# broker args
dependencies: Annotated[
Iterable["params.Depends"],
Expand Down Expand Up @@ -2575,6 +2604,7 @@ def subscriber(
batch_timeout_ms=batch_timeout_ms,
listener=listener,
pattern=pattern,
partitions=partitions,
# broker args
dependencies=dependencies,
parser=parser,
Expand Down
44 changes: 43 additions & 1 deletion faststream/kafka/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
from faststream.kafka.broker.registrator import KafkaRegistrator

if TYPE_CHECKING:
from aiokafka import ConsumerRecord
from aiokafka import ConsumerRecord, TopicPartition
from aiokafka.abc import ConsumerRebalanceListener
from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from fast_depends.dependencies import Depends

Expand Down Expand Up @@ -376,6 +377,44 @@ def __init__(
Optional[int],
Doc("Number of messages to consume as one batch."),
] = None,
listener: Annotated[
Optional["ConsumerRebalanceListener"],
Doc("""
Optionally include listener
callback, which will be called before and after each rebalance
operation.
As part of group management, the consumer will keep track of
the list of consumers that belong to a particular group and
will trigger a rebalance operation if one of the following
events trigger:
* Number of partitions change for any of the subscribed topics
* Topic is created or deleted
* An existing member of the consumer group dies
* A new member is added to the consumer group
When any of these events are triggered, the provided listener
will be invoked first to indicate that the consumer's
assignment has been revoked, and then again when the new
assignment has been received. Note that this listener will
immediately override any listener set in a previous call
to subscribe. It is guaranteed, however, that the partitions
revoked/assigned
through this interface are from topics subscribed in this call.
"""),
] = None,
pattern: Annotated[
Optional[str],
Doc("""
Pattern to match available topics. You must provide either topics or pattern, but not both.
"""),
] = None,
partitions: Annotated[
Optional[Iterable["TopicPartition"]],
Doc("""
A topic and partition tuple. You can't use 'topics' and 'partitions' in the same time.
"""),
] = (),
# broker args
dependencies: Annotated[
Iterable["Depends"],
Expand Down Expand Up @@ -456,6 +495,9 @@ def __init__(
max_records=max_records,
batch_timeout_ms=batch_timeout_ms,
batch=batch,
listener=listener,
pattern=pattern,
partitions=partitions,
# basic args
dependencies=dependencies,
parser=parser,
Expand Down
20 changes: 19 additions & 1 deletion faststream/kafka/subscriber/asyncapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
from faststream.asyncapi.schema.bindings import kafka
from faststream.asyncapi.utils import resolve_payloads
from faststream.broker.types import MsgType
from faststream.exceptions import SetupError
from faststream.kafka.subscriber.usecase import (
BatchSubscriber,
DefaultSubscriber,
LogicSubscriber,
)

if TYPE_CHECKING:
from aiokafka import AIOKafkaConsumer, ConsumerRecord
from aiokafka import AIOKafkaConsumer, ConsumerRecord, TopicPartition
from aiokafka.abc import ConsumerRebalanceListener
from fast_depends.dependencies import Depends

Expand Down Expand Up @@ -79,6 +80,7 @@ def create(
group_id: Optional[str],
listener: Optional["ConsumerRebalanceListener"],
pattern: Optional[str],
partitions: Iterable["TopicPartition"],
builder: Callable[..., "AIOKafkaConsumer"],
is_manual: bool,
# Subscriber args
Expand All @@ -103,6 +105,7 @@ def create(
group_id: Optional[str],
listener: Optional["ConsumerRebalanceListener"],
pattern: Optional[str],
partitions: Iterable["TopicPartition"],
builder: Callable[..., "AIOKafkaConsumer"],
is_manual: bool,
# Subscriber args
Expand All @@ -127,6 +130,7 @@ def create(
group_id: Optional[str],
listener: Optional["ConsumerRebalanceListener"],
pattern: Optional[str],
partitions: Iterable["TopicPartition"],
builder: Callable[..., "AIOKafkaConsumer"],
is_manual: bool,
# Subscriber args
Expand Down Expand Up @@ -156,6 +160,7 @@ def create(
group_id: Optional[str],
listener: Optional["ConsumerRebalanceListener"],
pattern: Optional[str],
partitions: Iterable["TopicPartition"],
builder: Callable[..., "AIOKafkaConsumer"],
is_manual: bool,
# Subscriber args
Expand All @@ -173,6 +178,17 @@ def create(
"AsyncAPIDefaultSubscriber",
"AsyncAPIBatchSubscriber",
]:
if not topics and not partitions and not pattern:
raise SetupError(
"You should provide either `topics` or `partitions` or `pattern`."
)
elif topics and partitions:
raise SetupError("You can't provide both `topics` and `partitions`.")
elif topics and pattern:
raise SetupError("You can't provide both `topics` and `pattern`.")
elif pattern and partitions:
raise SetupError("You can't provide both `pattern` and `partitions`.")

if batch:
return AsyncAPIBatchSubscriber(
*topics,
Expand All @@ -181,6 +197,7 @@ def create(
group_id=group_id,
listener=listener,
pattern=pattern,
partitions=partitions,
builder=builder,
is_manual=is_manual,
no_ack=no_ack,
Expand All @@ -197,6 +214,7 @@ def create(
group_id=group_id,
listener=listener,
pattern=pattern,
partitions=partitions,
builder=builder,
is_manual=is_manual,
no_ack=no_ack,
Expand Down
Loading

0 comments on commit dd4e0bb

Please sign in to comment.