Skip to content

Commit

Permalink
feat: add assign TopicPartitions
Browse files Browse the repository at this point in the history
  • Loading branch information
spataphore1337 committed May 5, 2024
1 parent a5d8d4f commit 87e0662
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 9 deletions.
3 changes: 3 additions & 0 deletions faststream/kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from aiokafka import TopicPartition

from faststream.kafka.annotations import KafkaMessage
from faststream.kafka.broker import KafkaBroker
from faststream.kafka.router import KafkaPublisher, KafkaRoute, KafkaRouter
Expand All @@ -12,4 +14,5 @@
"KafkaPublisher",
"TestKafkaBroker",
"TestApp",
"TopicPartition",
)
27 changes: 26 additions & 1 deletion faststream/kafka/broker/registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from faststream.kafka.subscriber.asyncapi import AsyncAPISubscriber

if TYPE_CHECKING:
from aiokafka import ConsumerRecord
from aiokafka import ConsumerRecord, TopicPartition
from aiokafka.abc import ConsumerRebalanceListener
from aiokafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from fast_depends.dependencies import Depends
Expand Down Expand Up @@ -336,6 +336,12 @@ def subscriber(
Pattern to match available topics. You must provide either topics or pattern, but not both.
"""),
] = None,
partitions: Annotated[
Optional[Iterable["TopicPartition"]],
Doc("""
A topic and partition tuple. You can't use 'topics' and 'partitions' in the same time.
"""),
] = (),
# broker args
dependencies: Annotated[
Iterable["Depends"],
Expand Down Expand Up @@ -660,6 +666,12 @@ def subscriber(
Pattern to match available topics. You must provide either topics or pattern, but not both.
"""),
] = None,
partitions: Annotated[
Optional[Iterable["TopicPartition"]],
Doc("""
A topic and partition tuple. You can't use 'topics' and 'partitions' in the same time.
"""),
] = (),
# broker args
dependencies: Annotated[
Iterable["Depends"],
Expand Down Expand Up @@ -984,6 +996,12 @@ def subscriber(
Pattern to match available topics. You must provide either topics or pattern, but not both.
"""),
] = None,
partitions: Annotated[
Optional[Iterable["TopicPartition"]],
Doc("""
A topic and partition tuple. You can't use 'topics' and 'partitions' in the same time.
"""),
] = (),
# broker args
dependencies: Annotated[
Iterable["Depends"],
Expand Down Expand Up @@ -1311,6 +1329,12 @@ def subscriber(
Pattern to match available topics. You must provide either topics or pattern, but not both.
"""),
] = None,
partitions: Annotated[
Optional[Iterable["TopicPartition"]],
Doc("""
A topic and partition tuple. You can't use 'topics' and 'partitions' in the same time.
"""),
] = (),
# broker args
dependencies: Annotated[
Iterable["Depends"],
Expand Down Expand Up @@ -1402,6 +1426,7 @@ def subscriber(
group_id=group_id,
listener=listener,
pattern=pattern,
partitions=partitions,
builder=builder,
is_manual=not auto_commit,
# subscriber args
Expand Down
6 changes: 5 additions & 1 deletion faststream/kafka/subscriber/asyncapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
)

if TYPE_CHECKING:
from aiokafka import AIOKafkaConsumer, ConsumerRecord
from aiokafka import AIOKafkaConsumer, ConsumerRecord, TopicPartition
from aiokafka.abc import ConsumerRebalanceListener
from fast_depends.dependencies import Depends

Expand Down Expand Up @@ -79,6 +79,7 @@ def create(
group_id: Optional[str],
listener: Optional["ConsumerRebalanceListener"],
pattern: Optional[str],
partitions: Optional[Iterable["TopicPartition"]],
builder: Callable[..., "AIOKafkaConsumer"],
is_manual: bool,
# Subscriber args
Expand All @@ -103,6 +104,7 @@ def create(
group_id: Optional[str],
listener: Optional["ConsumerRebalanceListener"],
pattern: Optional[str],
partitions: Optional[Iterable["TopicPartition"]],
builder: Callable[..., "AIOKafkaConsumer"],
is_manual: bool,
# Subscriber args
Expand All @@ -127,6 +129,7 @@ def create(
group_id: Optional[str],
listener: Optional["ConsumerRebalanceListener"],
pattern: Optional[str],
partitions: Optional[Iterable["TopicPartition"]],
builder: Callable[..., "AIOKafkaConsumer"],
is_manual: bool,
# Subscriber args
Expand Down Expand Up @@ -156,6 +159,7 @@ def create(
group_id: Optional[str],
listener: Optional["ConsumerRebalanceListener"],
pattern: Optional[str],
partitions: Optional[Iterable["TopicPartition"]],
builder: Callable[..., "AIOKafkaConsumer"],
is_manual: bool,
# Subscriber args
Expand Down
21 changes: 14 additions & 7 deletions faststream/kafka/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
CustomCallable,
MsgType,
)
from faststream.exceptions import SetupError
from faststream.kafka.parser import AioKafkaParser

if TYPE_CHECKING:
from aiokafka import AIOKafkaConsumer, ConsumerRecord
from aiokafka import AIOKafkaConsumer, ConsumerRecord, TopicPartition
from aiokafka.abc import ConsumerRebalanceListener
from fast_depends.dependencies import Depends

Expand Down Expand Up @@ -56,6 +57,7 @@ def __init__(
builder: Callable[..., "AIOKafkaConsumer"],
listener: Optional["ConsumerRebalanceListener"],
pattern: Optional[str],
partitions: Optional[Iterable["TopicPartition"]],
is_manual: bool,
# Subscriber args
default_parser: "AsyncCallable",
Expand Down Expand Up @@ -94,6 +96,7 @@ def __init__(
self.client_id = ""
self.__pattern = pattern
self.__listener = listener
self.__partitions = partitions
self.__connection_args: "ConsumerConnectionParams" = {}

@override
Expand Down Expand Up @@ -139,13 +142,17 @@ async def start(self) -> None:
client_id=self.client_id,
**self.__connection_args,
)
consumer.subscribe(
topics=self.topics,
pattern=self.__pattern,
listener=self.__listener,
)
if self.topics and self.__partitions:
raise SetupError("You can't use 'topics' and 'partitions' in the same time")
if self.topics:
consumer.subscribe(
topics=self.topics,
pattern=self.__pattern,
listener=self.__listener,
)
elif self.__partitions:
consumer.assign(partitions=self.__partitions)
await consumer.start()

await super().start()

self.task = asyncio.create_task(self._consume())
Expand Down

0 comments on commit 87e0662

Please sign in to comment.