Skip to content

Commit

Permalink
Merge branch 'main' into pass-config-to-confluent
Browse files Browse the repository at this point in the history
  • Loading branch information
kumaranvpl authored Jun 10, 2024
2 parents 607bb2e + 4405abe commit 08e386f
Show file tree
Hide file tree
Showing 24 changed files with 157 additions and 57 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/docs_update-references.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ on:
paths:
- faststream/**

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

permissions:
contents: write

Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/pr_codeql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ on:
schedule:
- cron: '39 20 * * 0'

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
analyze:
if: github.event.pull_request.draft == false
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/pr_dependency-review.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ on:
paths:
- pyproject.toml

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

permissions:
contents: read

Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/pr_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ on:
types:
- checks_requested

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
static_analysis:
if: github.event.pull_request.draft == false
Expand Down
2 changes: 2 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ search:
- message
- [ConsumerProtocol](api/faststream/kafka/message/ConsumerProtocol.md)
- [FakeConsumer](api/faststream/kafka/message/FakeConsumer.md)
- [KafkaAckableMessage](api/faststream/kafka/message/KafkaAckableMessage.md)
- [KafkaMessage](api/faststream/kafka/message/KafkaMessage.md)
- opentelemetry
- [KafkaTelemetryMiddleware](api/faststream/kafka/opentelemetry/KafkaTelemetryMiddleware.md)
Expand All @@ -551,6 +552,7 @@ search:
- [KafkaTelemetrySettingsProvider](api/faststream/kafka/opentelemetry/provider/KafkaTelemetrySettingsProvider.md)
- [telemetry_attributes_provider_factory](api/faststream/kafka/opentelemetry/provider/telemetry_attributes_provider_factory.md)
- parser
- [AioKafkaBatchParser](api/faststream/kafka/parser/AioKafkaBatchParser.md)
- [AioKafkaParser](api/faststream/kafka/parser/AioKafkaParser.md)
- publisher
- asyncapi
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/kafka/message/KafkaAckableMessage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.kafka.message.KafkaAckableMessage
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/kafka/parser/AioKafkaBatchParser.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.kafka.parser.AioKafkaBatchParser
17 changes: 17 additions & 0 deletions docs/docs/en/release.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,23 @@ hide:
---

# Release Notes
## 0.5.11

### What's Changed
* Update Release Notes for 0.5.10 by @faststream-release-notes-updater in [#1482](https://github.com/airtai/faststream/pull/1482){.external-link target="_blank"}
* feat: provide with an ability to create default RMQ Exchange by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1485](https://github.com/airtai/faststream/pull/1485){.external-link target="_blank"}
* docs: fix typos by [@crazymidnight](https://github.com/crazymidnight){.external-link target="_blank"} in [#1489](https://github.com/airtai/faststream/pull/1489){.external-link target="_blank"}
* chore: update CI triggers to minify useless runs by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1483](https://github.com/airtai/faststream/pull/1483){.external-link target="_blank"}
* Update link to badges by [@kumaranvpl](https://github.com/kumaranvpl){.external-link target="_blank"} in [#1496](https://github.com/airtai/faststream/pull/1496){.external-link target="_blank"}
* Run tests every day at 12:00 AM by [@kumaranvpl](https://github.com/kumaranvpl){.external-link target="_blank"} in [#1497](https://github.com/airtai/faststream/pull/1497){.external-link target="_blank"}
* Chore: update deps by [@kumaranvpl](https://github.com/kumaranvpl){.external-link target="_blank"} in [#1503](https://github.com/airtai/faststream/pull/1503){.external-link target="_blank"}
* fix: include NatsRouter streams to original broker by [@Lancetnik](https://github.com/Lancetnik){.external-link target="_blank"} in [#1509](https://github.com/airtai/faststream/pull/1509){.external-link target="_blank"}

### New Contributors
* [@crazymidnight](https://github.com/crazymidnight){.external-link target="_blank"} made their first contribution in [#1489](https://github.com/airtai/faststream/pull/1489){.external-link target="_blank"}

**Full Changelog**: [#0.5.10...0.5.11](https://github.com/airtai/faststream/compare/0.5.10...0.5.11){.external-link target="_blank"}

## 0.5.10

### What's Changed
Expand Down
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Simple and fast framework to create message brokers based microservices."""

__version__ = "0.5.10"
__version__ = "0.5.11"

SERVICE_NAME = f"faststream-{__version__}"

Expand Down
6 changes: 3 additions & 3 deletions faststream/confluent/opentelemetry/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def get_publish_attrs_from_kwargs(

return attrs

@staticmethod
def get_publish_destination_name(
self,
kwargs: "AnyDict",
) -> str:
return cast(str, kwargs["topic"])
Expand Down Expand Up @@ -66,8 +66,8 @@ def get_consume_attrs_from_message(

return attrs

@staticmethod
def get_consume_destination_name(
self,
msg: "StreamMessage[Message]",
) -> str:
return cast(str, msg.raw_message.topic())
Expand Down Expand Up @@ -95,8 +95,8 @@ def get_consume_attrs_from_message(

return attrs

@staticmethod
def get_consume_destination_name(
self,
msg: "StreamMessage[Tuple[Message, ...]]",
) -> str:
return cast(str, msg.raw_message[0].topic())
Expand Down
2 changes: 2 additions & 0 deletions faststream/kafka/annotations.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from aiokafka import AIOKafkaConsumer
from typing_extensions import Annotated

from faststream.annotations import ContextRepo, Logger, NoCast
Expand All @@ -15,6 +16,7 @@
"KafkaProducer",
)

Consumer = Annotated[AIOKafkaConsumer, Context("handler_.consumer")]
KafkaMessage = Annotated[KM, Context("message")]
KafkaBroker = Annotated[KB, Context("broker")]
KafkaProducer = Annotated[AioKafkaFastProducer, Context("broker._producer")]
6 changes: 3 additions & 3 deletions faststream/kafka/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ def __init__(
self,
*args: Any,
consumer: ConsumerProtocol,
is_manual: bool = False,
**kwargs: Any,
) -> None:
super().__init__(*args, **kwargs)

self.is_manual = is_manual
self.consumer = consumer


class KafkaAckableMessage(KafkaMessage):
async def ack(self) -> None:
"""Acknowledge the Kafka message."""
if self.is_manual and not self.committed:
if not self.committed:
await self.consumer.commit()
await super().ack()
6 changes: 3 additions & 3 deletions faststream/kafka/opentelemetry/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def get_publish_attrs_from_kwargs(

return attrs

@staticmethod
def get_publish_destination_name(
self,
kwargs: "AnyDict",
) -> str:
return cast(str, kwargs["topic"])
Expand Down Expand Up @@ -66,8 +66,8 @@ def get_consume_attrs_from_message(

return attrs

@staticmethod
def get_consume_destination_name(
self,
msg: "StreamMessage[ConsumerRecord]",
) -> str:
return cast(str, msg.raw_message.topic)
Expand Down Expand Up @@ -96,8 +96,8 @@ def get_consume_attrs_from_message(

return attrs

@staticmethod
def get_consume_destination_name(
self,
msg: "StreamMessage[Tuple[ConsumerRecord, ...]]",
) -> str:
return cast(str, msg.raw_message[0].topic)
Expand Down
43 changes: 25 additions & 18 deletions faststream/kafka/parser.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type

from faststream.broker.message import decode_message, gen_cor_id
from faststream.kafka.message import FAKE_CONSUMER, KafkaMessage
Expand All @@ -15,14 +15,17 @@
class AioKafkaParser:
"""A class to parse Kafka messages."""

@staticmethod
def __init__(self, msg_class: Type[KafkaMessage]) -> None:
self.msg_class = msg_class

async def parse_message(
self,
message: "ConsumerRecord",
) -> "StreamMessage[ConsumerRecord]":
"""Parses a Kafka message."""
headers = {i: j.decode() for i, j in message.headers}
handler: Optional[LogicSubscriber[Any]] = context.get_local("handler_")
return KafkaMessage(
handler: Optional["LogicSubscriber[Any]"] = context.get_local("handler_")
return self.msg_class(
body=message.value,
headers=headers,
reply_to=headers.get("reply_to", ""),
Expand All @@ -31,11 +34,19 @@ async def parse_message(
correlation_id=headers.get("correlation_id", gen_cor_id()),
raw_message=message,
consumer=getattr(handler, "consumer", None) or FAKE_CONSUMER,
is_manual=getattr(handler, "is_manual", True),
)

@staticmethod
async def parse_message_batch(
async def decode_message(
self,
msg: "StreamMessage[ConsumerRecord]",
) -> "DecodedMessage":
"""Decodes a message."""
return decode_message(msg)


class AioKafkaBatchParser(AioKafkaParser):
async def parse_message(
self,
message: Tuple["ConsumerRecord", ...],
) -> "StreamMessage[Tuple[ConsumerRecord, ...]]":
"""Parses a batch of messages from a Kafka consumer."""
Expand All @@ -53,7 +64,7 @@ async def parse_message_batch(

handler: Optional[LogicSubscriber[Any]] = context.get_local("handler_")

return KafkaMessage(
return self.msg_class(
body=body,
headers=headers,
batch_headers=batch_headers,
Expand All @@ -63,18 +74,14 @@ async def parse_message_batch(
correlation_id=headers.get("correlation_id", gen_cor_id()),
raw_message=message,
consumer=getattr(handler, "consumer", None) or FAKE_CONSUMER,
is_manual=getattr(handler, "is_manual", True),
)

@staticmethod
async def decode_message(msg: "StreamMessage[ConsumerRecord]") -> "DecodedMessage":
"""Decodes a message."""
return decode_message(msg)

@classmethod
async def decode_message_batch(
cls,
async def decode_message(
self,
msg: "StreamMessage[Tuple[ConsumerRecord, ...]]",
) -> "DecodedMessage":
"""Decode a batch of messages."""
return [decode_message(await cls.parse_message(m)) for m in msg.raw_message]
return [
decode_message(await super(AioKafkaBatchParser, self).parse_message(m))
for m in msg.raw_message
]
23 changes: 14 additions & 9 deletions faststream/kafka/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
CustomCallable,
MsgType,
)
from faststream.kafka.parser import AioKafkaParser
from faststream.kafka.message import KafkaAckableMessage, KafkaMessage
from faststream.kafka.parser import AioKafkaBatchParser, AioKafkaParser

if TYPE_CHECKING:
from aiokafka import AIOKafkaConsumer, ConsumerRecord
Expand Down Expand Up @@ -60,7 +61,6 @@ def __init__(
listener: Optional["ConsumerRebalanceListener"],
pattern: Optional[str],
partitions: Iterable["TopicPartition"],
is_manual: bool,
# Subscriber args
default_parser: "AsyncCallable",
default_decoder: "AsyncCallable",
Expand Down Expand Up @@ -93,7 +93,6 @@ def __init__(
self.partitions = partitions
self.group_id = group_id

self.is_manual = is_manual
self.builder = None
self.consumer = None
self.task = None
Expand Down Expand Up @@ -306,17 +305,20 @@ def __init__(
description_: Optional[str],
include_in_schema: bool,
) -> None:
parser = AioKafkaParser(
msg_class=KafkaAckableMessage if is_manual else KafkaMessage
)

super().__init__(
*topics,
group_id=group_id,
listener=listener,
pattern=pattern,
connection_args=connection_args,
partitions=partitions,
is_manual=is_manual,
# subscriber args
default_parser=AioKafkaParser.parse_message,
default_decoder=AioKafkaParser.decode_message,
default_parser=parser.parse_message,
default_decoder=parser.decode_message,
# Propagated args
no_ack=no_ack,
no_reply=no_reply,
Expand Down Expand Up @@ -363,17 +365,20 @@ def __init__(
self.batch_timeout_ms = batch_timeout_ms
self.max_records = max_records

parser = AioKafkaBatchParser(
msg_class=KafkaAckableMessage if is_manual else KafkaMessage
)

super().__init__(
*topics,
group_id=group_id,
listener=listener,
pattern=pattern,
connection_args=connection_args,
partitions=partitions,
is_manual=is_manual,
# subscriber args
default_parser=AioKafkaParser.parse_message_batch,
default_decoder=AioKafkaParser.decode_message_batch,
default_parser=parser.parse_message,
default_decoder=parser.decode_message,
# Propagated args
no_ack=no_ack,
no_reply=no_reply,
Expand Down
Loading

0 comments on commit 08e386f

Please sign in to comment.