Separate thread for Confluent Kafka consumer #2003

3 changes: 3 additions & 0 deletions docs/docs/SUMMARY.md
@@ -618,6 +618,8 @@ search:
- [KafkaLoggingBroker](api/faststream/kafka/broker/logging/KafkaLoggingBroker.md)
- registrator
- [KafkaRegistrator](api/faststream/kafka/broker/registrator/KafkaRegistrator.md)
- exceptions
- [BatchBufferOverflowException](api/faststream/kafka/exceptions/BatchBufferOverflowException.md)
- fastapi
- [Context](api/faststream/kafka/fastapi/Context.md)
- [KafkaRouter](api/faststream/kafka/fastapi/KafkaRouter.md)
@@ -960,6 +962,7 @@ search:
- [build_message](api/faststream/rabbit/testing/build_message.md)
- utils
- [build_url](api/faststream/rabbit/utils/build_url.md)
- [build_virtual_host](api/faststream/rabbit/utils/build_virtual_host.md)
- [is_routing_exchange](api/faststream/rabbit/utils/is_routing_exchange.md)
- redis
- [ListSub](api/faststream/redis/ListSub.md)
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/kafka/exceptions/BatchBufferOverflowException.md
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.kafka.exceptions.BatchBufferOverflowException
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/rabbit/utils/build_virtual_host.md
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.rabbit.utils.build_virtual_host
21 changes: 0 additions & 21 deletions docs/docs/en/getting-started/logging.md
@@ -10,11 +10,6 @@ search:

# Application and Access Logging

**FastStream** uses two already configured loggers:

* `faststream` - used by `FastStream` app
* `faststream.access` - used by the broker

## Logging Requests

To log requests, it is strongly recommended to use the `access_logger` of your broker, as it is available from the [Context](../getting-started/context/existed.md){.internal-link} of your application.
@@ -77,22 +72,6 @@ from faststream.rabbit import RabbitBroker
broker = RabbitBroker(log_fmt="%(asctime)s %(levelname)s - %(message)s")
```

## Logger Access

If you want to override default logger's behavior, you can access them directly via `logging`.

```python
import logging
logger = logging.getLogger("faststream")
access_logger = logging.getLogger("faststream.access")
```

Or you can import them from **FastStream**.

```python
from faststream.log import access_logger, logger
```

## Using Your Own Loggers

Since **FastStream** works with the standard `logging.Logger` object, you can initialize an application and a broker
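The section kept just above still tells readers to pass their own `logging.Logger`. A minimal sketch of that pattern, assuming the documented `logger` parameter on both the broker and the app (the logger name and handler setup are illustrative):

```python
import logging

from faststream import FastStream
from faststream.rabbit import RabbitBroker

# Standard logging setup -- FastStream accepts any logging.Logger
logger = logging.getLogger("my_service")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

# Pass the same logger to both the broker and the application
broker = RabbitBroker("amqp://guest:guest@localhost:5672/", logger=logger)
app = FastStream(broker, logger=logger)
```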
2 changes: 1 addition & 1 deletion docs/docs/en/public_api
2 changes: 1 addition & 1 deletion docs/docs/en/rabbit/index.md
@@ -11,7 +11,7 @@ search:
# Rabbit Routing

!!! note ""
**FastStream** *RabbitMQ* support is implemented on top of [**aio-pika**](https://aio-pika.readthedocs.io/en/latest/){.external-link target="_blank"}. You can always access its objects directly if you need low-level methods that are not exposed in **FastStream**.
**FastStream** *RabbitMQ* support is implemented on top of [**aio-pika**](https://docs.aio-pika.com/){.external-link target="_blank"}. You can always access its objects directly if you need low-level methods that are not exposed in **FastStream**.

## Advantages

69 changes: 48 additions & 21 deletions faststream/confluent/client.py
@@ -1,5 +1,6 @@
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from contextlib import suppress
from time import time
from typing import (
@@ -314,19 +315,24 @@ def __init__(
self.config = final_config
self.consumer = Consumer(final_config, logger=self.logger) # type: ignore[call-arg]

# We shouldn't read messages and close consumer concurrently
# https://github.com/airtai/faststream/issues/1904#issuecomment-2506990895
self._lock = anyio.Lock()
# A single-thread pool is used so that the consumer's commands execute sequentially:
self._thread_pool = ThreadPoolExecutor(max_workers=1)

@property
def topics_to_create(self) -> List[str]:
return list({*self.topics, *(p.topic for p in self.partitions)})

async def start(self) -> None:
"""Starts the Kafka consumer and subscribes to the specified topics."""
loop = asyncio.get_running_loop()

if self.allow_auto_create_topics:
await call_or_await(
create_topics, self.topics_to_create, self.config, self.logger
await loop.run_in_executor(
self._thread_pool,
create_topics,
self.topics_to_create,
self.config,
self.logger,
)

elif self.logger:
@@ -336,22 +342,39 @@ async def start(self) -> None:
)

if self.topics:
await call_or_await(self.consumer.subscribe, self.topics)
await loop.run_in_executor(
self._thread_pool, self.consumer.subscribe, self.topics
)

elif self.partitions:
await call_or_await(
self.consumer.assign, [p.to_confluent() for p in self.partitions]
await loop.run_in_executor(
self._thread_pool,
self.consumer.assign,
[p.to_confluent() for p in self.partitions],
)

else:
raise SetupError("You must provide either `topics` or `partitions` option.")

async def commit(self, asynchronous: bool = True) -> None:
"""Commits the offsets of all messages returned by the last poll operation."""
await call_or_await(self.consumer.commit, asynchronous=asynchronous)
loop = asyncio.get_running_loop()
if asynchronous:
# Asynchronous commit is non-blocking:
self.consumer.commit(asynchronous=True)
else:
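# run_in_executor forwards positional arguments only, hence the
# explicit commit(message=None, offsets=None, asynchronous=False):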
await loop.run_in_executor(
self._thread_pool,
self.consumer.commit,
None,
None,
False,
)

async def stop(self) -> None:
"""Stops the Kafka consumer and releases all resources."""
loop = asyncio.get_running_loop()

# NOTE: if we close the consumer without an explicit commit first, the confluent consumer gets stuck.
# We commit explicitly here to avoid that issue.
enable_auto_commit = self.config["enable.auto.commit"]
@@ -371,13 +394,14 @@ async def stop(self) -> None:
)

# Wrap the blocking call in the executor so the method remains cancelable by timeout
async with self._lock:
await call_or_await(self.consumer.close)
await loop.run_in_executor(self._thread_pool, self.consumer.close)

self._thread_pool.shutdown(wait=False)

async def getone(self, timeout: float = 0.1) -> Optional[Message]:
"""Consumes a single message from Kafka."""
async with self._lock:
msg = await call_or_await(self.consumer.poll, timeout)
loop = asyncio.get_running_loop()
msg = await loop.run_in_executor(self._thread_pool, self.consumer.poll, timeout)
return check_msg_error(msg)

async def getmany(
@@ -386,21 +410,24 @@ async def getmany(
max_records: Optional[int] = 10,
) -> Tuple[Message, ...]:
"""Consumes a batch of messages from Kafka and groups them by topic and partition."""
async with self._lock:
raw_messages: List[Optional[Message]] = await call_or_await(
self.consumer.consume, # type: ignore[arg-type]
num_messages=max_records or 10,
timeout=timeout,
)

loop = asyncio.get_running_loop()
raw_messages: List[Optional[Message]] = await loop.run_in_executor(
self._thread_pool,
self.consumer.consume, # type: ignore[arg-type]
max_records or 10,
timeout,
)
return tuple(x for x in map(check_msg_error, raw_messages) if x is not None)

async def seek(self, topic: str, partition: int, offset: int) -> None:
"""Seeks to the specified offset in the specified topic and partition."""
loop = asyncio.get_running_loop()
topic_partition = TopicPartition(
topic=topic, partition=partition, offset=offset
)
await call_or_await(self.consumer.seek, topic_partition.to_confluent())
await loop.run_in_executor(
self._thread_pool, self.consumer.seek, topic_partition.to_confluent()
)


def check_msg_error(msg: Optional[Message]) -> Optional[Message]:
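The essence of this diff is replacing the `anyio.Lock` around every blocking `confluent-kafka` call with a single-thread `ThreadPoolExecutor`: with exactly one worker, submitted calls are queued and run strictly in order, so `poll()` and `close()` can never overlap, and the event loop never holds an async lock across a blocking call. A standalone sketch of the pattern (the `SerializedClient` wrapper is hypothetical, not part of this PR):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


class SerializedClient:
    """Serializes blocking client calls on one dedicated thread."""

    def __init__(self, blocking_client) -> None:
        self._client = blocking_client
        # max_workers=1 guarantees strictly sequential execution,
        # giving the same mutual exclusion the removed lock provided
        self._pool = ThreadPoolExecutor(max_workers=1)

    async def call(self, fn, *args):
        # Offload the blocking call; the coroutine stays cancelable
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._pool, fn, *args)

    async def close(self) -> None:
        await self.call(self._client.close)
        # wait=False: don't block the event loop while the worker drains
        self._pool.shutdown(wait=False)
```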
16 changes: 6 additions & 10 deletions faststream/confluent/subscriber/usecase.py
@@ -378,18 +378,14 @@ def __init__(

async def get_msg(self) -> Optional[Tuple["Message", ...]]:
assert self.consumer, "You should setup subscriber at first." # nosec B101

messages = await self.consumer.getmany(
timeout=self.polling_interval,
max_records=self.max_records,
return (
await self.consumer.getmany(
timeout=self.polling_interval,
max_records=self.max_records,
)
or None
)

if not messages: # TODO: why we are sleeping here?
await anyio.sleep(self.polling_interval)
return None

return messages

def get_log_context(
self,
message: Optional["StreamMessage[Tuple[Message, ...]]"],
14 changes: 14 additions & 0 deletions faststream/kafka/exceptions.py
@@ -0,0 +1,14 @@
from faststream.exceptions import FastStreamException


class BatchBufferOverflowException(FastStreamException):
"""Exception raised when a buffer overflow occurs when adding a new message to the batches."""

def __init__(self, message_position: int) -> None:
self.message_position = message_position

def __str__(self) -> str:
return (
"The batch buffer is full. The overflow occurred at position"
f" {self.message_position} of the provided message collection."
)
7 changes: 5 additions & 2 deletions faststream/kafka/publisher/producer.py
Expand Up @@ -6,6 +6,7 @@
from faststream.broker.publisher.proto import ProducerProto
from faststream.broker.utils import resolve_custom_func
from faststream.exceptions import OperationForbiddenError
from faststream.kafka.exceptions import BatchBufferOverflowException
from faststream.kafka.message import KafkaMessage
from faststream.kafka.parser import AioKafkaParser

@@ -100,7 +101,7 @@ async def publish_batch(
reply_to,
)

for msg in msgs:
for message_position, msg in enumerate(msgs):
message, content_type = encode_message(msg)

if content_type:
@@ -111,12 +112,14 @@
else:
final_headers = headers_to_send.copy()

batch.append(
metadata = batch.append(
key=None,
value=message,
timestamp=timestamp_ms,
headers=[(i, j.encode()) for i, j in final_headers.items()],
)
if metadata is None:
raise BatchBufferOverflowException(message_position=message_position)

send_future = await self._producer.send_batch(batch, topic, partition=partition)
if not no_confirm:
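From the caller's side, the buffer overflow is now an explicit, recoverable error. A hedged usage sketch (the topic name and retry strategy are illustrative, not prescribed by this PR):

```python
from faststream.kafka import KafkaBroker
from faststream.kafka.exceptions import BatchBufferOverflowException

broker = KafkaBroker("localhost:9092")


async def send_reports(reports: list[str]) -> None:
    try:
        # Each message is appended to a single producer batch; if the
        # buffer fills up, message_position says which one did not fit
        await broker.publish_batch(*reports, topic="reports")
    except BatchBufferOverflowException as e:
        # Retry the tail of the collection, starting at the failed message
        await broker.publish_batch(*reports[e.message_position :], topic="reports")
```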
5 changes: 5 additions & 0 deletions faststream/opentelemetry/consts.py
@@ -1,3 +1,6 @@
from faststream.__about__ import __version__


class MessageAction:
CREATE = "create"
PUBLISH = "publish"
@@ -9,3 +12,5 @@ class MessageAction:
ERROR_TYPE = "error.type"
MESSAGING_DESTINATION_PUBLISH_NAME = "messaging.destination_publish.name"
WITH_BATCH = "with_batch"
INSTRUMENTING_MODULE_NAME = "opentelemetry.instrumentation.faststream"
INSTRUMENTING_LIBRARY_VERSION = __version__
5 changes: 4 additions & 1 deletion faststream/opentelemetry/middleware.py
@@ -15,6 +15,8 @@
from faststream.opentelemetry.baggage import Baggage
from faststream.opentelemetry.consts import (
ERROR_TYPE,
INSTRUMENTING_LIBRARY_VERSION,
INSTRUMENTING_MODULE_NAME,
MESSAGING_DESTINATION_PUBLISH_NAME,
OTEL_SCHEMA,
WITH_BATCH,
@@ -330,7 +332,8 @@ def _get_meter(

def _get_tracer(tracer_provider: Optional["TracerProvider"] = None) -> "Tracer":
return trace.get_tracer(
__name__,
instrumenting_module_name=INSTRUMENTING_MODULE_NAME,
instrumenting_library_version=INSTRUMENTING_LIBRARY_VERSION,
tracer_provider=tracer_provider,
schema_url=OTEL_SCHEMA,
)
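With this change, exported spans carry a stable instrumentation scope (`opentelemetry.instrumentation.faststream` plus FastStream's version) instead of the middleware module's `__name__`. A small sketch of the same `get_tracer` call shape with the standard SDK console exporter (the version string is a placeholder for `__version__`):

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor

provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))

tracer = trace.get_tracer(
    instrumenting_module_name="opentelemetry.instrumentation.faststream",
    instrumenting_library_version="0.5.x",  # placeholder for __version__
    tracer_provider=provider,
)

with tracer.start_as_current_span("demo"):
    pass  # the printed span's instrumentation scope shows name and version
```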
13 changes: 12 additions & 1 deletion faststream/rabbit/utils.py
@@ -12,6 +12,17 @@
from faststream.rabbit.schemas import RabbitExchange


def build_virtual_host(
url: Union[str, "URL", None], virtualhost: Optional[str], path: str
) -> str:
if (not url and not virtualhost) or virtualhost == "/":
return ""
elif virtualhost:
return virtualhost.replace("/", "", 1)
else:
return path.replace("/", "", 1)


def build_url(
url: Union[str, "URL", None] = None,
*,
@@ -36,7 +47,7 @@ def build_url(
port=port or original_url.port or default_port,
login=login or original_url.user or "guest",
password=password or original_url.password or "guest",
virtualhost=virtualhost or original_url.path.lstrip("/"),
virtualhost=build_virtual_host(url, virtualhost, original_url.path),
ssl=use_ssl,
ssl_options=ssl_options,
client_properties=client_properties,
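The helper's behavior is easiest to read off case by case; a few illustrative calls (the expected values follow directly from the definition above):

```python
from faststream.rabbit.utils import build_virtual_host

# No URL and no explicit vhost, or an explicit "/" -> default vhost ""
assert build_virtual_host(None, None, "") == ""
assert build_virtual_host("amqp://localhost/", "/", "/") == ""

# An explicit vhost always wins; only its first "/" is stripped
assert build_virtual_host("amqp://localhost/", "/prod", "/") == "prod"

# Otherwise fall back to the path parsed from the URL
assert build_virtual_host("amqp://localhost/staging", None, "/staging") == "staging"
```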