Skip to content

Commit

Permalink
chore: merge main
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed May 21, 2024
1 parent fd3c333 commit 536026b
Show file tree
Hide file tree
Showing 70 changed files with 1,352 additions and 512 deletions.
2 changes: 1 addition & 1 deletion docs/docs/en/release.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ broker = NatsBroker(
app = FastStream(broker)
```

To find detailt information just visit our documentation aboout [telemetry](https://faststream.airt.ai/latest/getting-started/opentelemetry/)
To find detailt information just visit our documentation about [telemetry](https://faststream.airt.ai/latest/getting-started/opentelemetry/)

P.S. The release includes basic OpenTelemetry support - messages tracing & basic metrics. Baggage support and correct spans linking in batch processing case will be added soon.

Expand Down
4 changes: 1 addition & 3 deletions faststream/confluent/subscriber/asyncapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@

if TYPE_CHECKING:
from confluent_kafka import Message as ConfluentMsg
from fast_depends.dependencies import Depends
from faststream.broker.types import BrokerMiddleware
from faststream.types import AnyDict


class AsyncAPISubscriber(LogicSubscriber[MsgType]):
Expand Down Expand Up @@ -59,6 +56,7 @@ def get_schema(self) -> Dict[str, Channel]:

return channels


class AsyncAPIDefaultSubscriber(
DefaultSubscriber,
AsyncAPISubscriber["ConfluentMsg"],
Expand Down
15 changes: 7 additions & 8 deletions faststream/confluent/subscriber/factory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import (
TYPE_CHECKING,
Callable,
Iterable,
Literal,
Optional,
Expand All @@ -19,7 +18,7 @@
from fast_depends.dependencies import Depends

from faststream.broker.types import BrokerMiddleware
from faststream.confluent.client import AsyncConfluentConsumer
from faststream.types import AnyDict


@overload
Expand All @@ -30,7 +29,7 @@ def create_subscriber(
max_records: Optional[int],
# Kafka information
group_id: Optional[str],
builder: Callable[..., "AsyncConfluentConsumer"],
connection_data: "AnyDict",
is_manual: bool,
# Subscriber args
no_ack: bool,
Expand All @@ -52,7 +51,7 @@ def create_subscriber(
max_records: Optional[int],
# Kafka information
group_id: Optional[str],
builder: Callable[..., "AsyncConfluentConsumer"],
connection_data: "AnyDict",
is_manual: bool,
# Subscriber args
no_ack: bool,
Expand All @@ -74,7 +73,7 @@ def create_subscriber(
max_records: Optional[int],
# Kafka information
group_id: Optional[str],
builder: Callable[..., "AsyncConfluentConsumer"],
connection_data: "AnyDict",
is_manual: bool,
# Subscriber args
no_ack: bool,
Expand All @@ -100,7 +99,7 @@ def create_subscriber(
max_records: Optional[int],
# Kafka information
group_id: Optional[str],
builder: Callable[..., "AsyncConfluentConsumer"],
connection_data: "AnyDict",
is_manual: bool,
# Subscriber args
no_ack: bool,
Expand All @@ -123,7 +122,7 @@ def create_subscriber(
batch_timeout_ms=batch_timeout_ms,
max_records=max_records,
group_id=group_id,
builder=builder,
connection_data=connection_data,
is_manual=is_manual,
no_ack=no_ack,
retry=retry,
Expand All @@ -137,7 +136,7 @@ def create_subscriber(
return AsyncAPIDefaultSubscriber(
*topics,
group_id=group_id,
builder=builder,
connection_data=connection_data,
is_manual=is_manual,
no_ack=no_ack,
retry=retry,
Expand Down
8 changes: 1 addition & 7 deletions faststream/kafka/subscriber/asyncapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from faststream.asyncapi.schema.bindings import kafka
from faststream.asyncapi.utils import resolve_payloads
from faststream.broker.types import MsgType
from faststream.exceptions import SetupError
from faststream.kafka.subscriber.usecase import (
BatchSubscriber,
DefaultSubscriber,
Expand All @@ -23,12 +22,6 @@

if TYPE_CHECKING:
from aiokafka import ConsumerRecord
from aiokafka import ConsumerRecord, TopicPartition
from aiokafka.abc import ConsumerRebalanceListener
from fast_depends.dependencies import Depends

from faststream.broker.types import BrokerMiddleware
from faststream.types import AnyDict


class AsyncAPISubscriber(LogicSubscriber[MsgType]):
Expand Down Expand Up @@ -63,6 +56,7 @@ def get_schema(self) -> Dict[str, Channel]:

return channels


class AsyncAPIDefaultSubscriber(
DefaultSubscriber,
AsyncAPISubscriber["ConsumerRecord"],
Expand Down
38 changes: 30 additions & 8 deletions faststream/kafka/subscriber/factory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import (
TYPE_CHECKING,
Callable,
Iterable,
Literal,
Optional,
Expand All @@ -9,17 +8,19 @@
overload,
)

from faststream.exceptions import SetupError
from faststream.kafka.subscriber.asyncapi import (
AsyncAPIBatchSubscriber,
AsyncAPIDefaultSubscriber,
)

if TYPE_CHECKING:
from aiokafka import AIOKafkaConsumer, ConsumerRecord
from aiokafka import ConsumerRecord, TopicPartition
from aiokafka.abc import ConsumerRebalanceListener
from fast_depends.dependencies import Depends

from faststream.broker.types import BrokerMiddleware
from faststream.types import AnyDict


@overload
Expand All @@ -32,7 +33,8 @@ def create_subscriber(
group_id: Optional[str],
listener: Optional["ConsumerRebalanceListener"],
pattern: Optional[str],
builder: Callable[..., "AIOKafkaConsumer"],
connection_args: "AnyDict",
partitions: Iterable["TopicPartition"],
is_manual: bool,
# Subscriber args
no_ack: bool,
Expand All @@ -56,7 +58,8 @@ def create_subscriber(
group_id: Optional[str],
listener: Optional["ConsumerRebalanceListener"],
pattern: Optional[str],
builder: Callable[..., "AIOKafkaConsumer"],
connection_args: "AnyDict",
partitions: Iterable["TopicPartition"],
is_manual: bool,
# Subscriber args
no_ack: bool,
Expand All @@ -80,7 +83,8 @@ def create_subscriber(
group_id: Optional[str],
listener: Optional["ConsumerRebalanceListener"],
pattern: Optional[str],
builder: Callable[..., "AIOKafkaConsumer"],
connection_args: "AnyDict",
partitions: Iterable["TopicPartition"],
is_manual: bool,
# Subscriber args
no_ack: bool,
Expand Down Expand Up @@ -108,7 +112,8 @@ def create_subscriber(
group_id: Optional[str],
listener: Optional["ConsumerRebalanceListener"],
pattern: Optional[str],
builder: Callable[..., "AIOKafkaConsumer"],
connection_args: "AnyDict",
partitions: Iterable["TopicPartition"],
is_manual: bool,
# Subscriber args
no_ack: bool,
Expand All @@ -125,6 +130,20 @@ def create_subscriber(
"AsyncAPIDefaultSubscriber",
"AsyncAPIBatchSubscriber",
]:
if is_manual and not group_id:
raise SetupError("You must use `group_id` with manual commit mode.")

if not topics and not partitions and not pattern:
raise SetupError(
"You should provide either `topics` or `partitions` or `pattern`."
)
elif topics and partitions:
raise SetupError("You can't provide both `topics` and `partitions`.")
elif topics and pattern:
raise SetupError("You can't provide both `topics` and `pattern`.")
elif partitions and pattern:
raise SetupError("You can't provide both `partitions` and `pattern`.")

if batch:
return AsyncAPIBatchSubscriber(
*topics,
Expand All @@ -133,7 +152,8 @@ def create_subscriber(
group_id=group_id,
listener=listener,
pattern=pattern,
builder=builder,
connection_args=connection_args,
partitions=partitions,
is_manual=is_manual,
no_ack=no_ack,
retry=retry,
Expand All @@ -143,13 +163,15 @@ def create_subscriber(
description_=description_,
include_in_schema=include_in_schema,
)

else:
return AsyncAPIDefaultSubscriber(
*topics,
group_id=group_id,
listener=listener,
pattern=pattern,
builder=builder,
connection_args=connection_args,
partitions=partitions,
is_manual=is_manual,
no_ack=no_ack,
retry=retry,
Expand Down
28 changes: 5 additions & 23 deletions faststream/nats/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,8 +631,6 @@ async def start(self) -> None:
)

except BadRequestError as e: # noqa: PERF203
old_config = (await self.stream.stream_info(stream.name)).config

log_context = AsyncAPISubscriber.build_log_context(
message=None,
subject="",
Expand All @@ -644,6 +642,8 @@ async def start(self) -> None:
e.description
== "stream name already in use with a different configuration"
):
old_config = (await self.stream.stream_info(stream.name)).config

self._log(str(e), logging.WARNING, log_context)
await self.stream.update_stream(
config=stream.config,
Expand All @@ -655,27 +655,9 @@ async def start(self) -> None:
else: # pragma: no cover
self._log(str(e), logging.ERROR, log_context, exc_info=e)

except BadRequestError as e:
if (
e.description
== "stream name already in use with a different configuration"
):
old_config = (await self.stream.stream_info(stream.name)).config

self._log(str(e), logging.WARNING, log_context)
await self.stream.update_stream(
config=stream.config,
subjects=tuple(
set(old_config.subjects or ()).union(stream.subjects)
),
)

else: # pragma: no cover
self._log(str(e), logging.ERROR, log_context, exc_info=e)

finally:
# prevent from double declaration
stream.declare = False
finally:
# prevent from double declaration
stream.declare = False

# TODO: filter by already running handlers after TestClient refactor
for handler in self._subscribers.values():
Expand Down
2 changes: 1 addition & 1 deletion faststream/nats/fastapi/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
SubscriberMiddleware,
)
from faststream.nats.message import NatsBatchMessage, NatsMessage
from faststream.nats.schemas import JStream, PullSub, KvWatch, ObjWatch
from faststream.nats.schemas import JStream, KvWatch, ObjWatch, PullSub
from faststream.security import BaseSecurity
from faststream.types import AnyDict, LoggerProto

Expand Down
9 changes: 6 additions & 3 deletions faststream/nats/opentelemetry/provider.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import TYPE_CHECKING, List, Optional, Sequence, Union, overload

from nats.aio.msg import Msg
from opentelemetry.semconv.trace import SpanAttributes

from faststream.__about__ import SERVICE_NAME
Expand All @@ -8,8 +9,6 @@
from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME

if TYPE_CHECKING:
from nats.aio.msg import Msg

from faststream.broker.message import StreamMessage
from faststream.types import AnyDict

Expand Down Expand Up @@ -107,8 +106,12 @@ def telemetry_attributes_provider_factory(
) -> Union[
NatsTelemetrySettingsProvider,
NatsBatchTelemetrySettingsProvider,
None,
]:
if isinstance(msg, Sequence):
return NatsBatchTelemetrySettingsProvider()
else:
elif isinstance(msg, Msg):
return NatsTelemetrySettingsProvider()
else:
# KeyValue and Object Storage watch cases
return None
3 changes: 1 addition & 2 deletions faststream/nats/parser.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from typing import TYPE_CHECKING, Any, List, Optional
from typing import TYPE_CHECKING, Dict, List, Optional
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from faststream.broker.message import StreamMessage, decode_message, gen_cor_id
from faststream.nats.message import (
Expand Down
2 changes: 1 addition & 1 deletion faststream/nats/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
SubscriberMiddleware,
)
from faststream.nats.message import NatsBatchMessage, NatsMessage
from faststream.nats.schemas import JStream, PullSub, KvWatch, ObjWatch
from faststream.nats.schemas import JStream, KvWatch, ObjWatch, PullSub
from faststream.types import SendableMessage


Expand Down
8 changes: 1 addition & 7 deletions faststream/nats/subscriber/asyncapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,10 @@ def get_schema(self) -> Dict[str, Channel]:
class AsyncAPIObjStoreWatchSubscriber(AsyncAPISubscriber, ObjStoreWatchSubscriber):
"""ObjStoreWatch consumer with AsyncAPI methods."""

class AsyncAPIDefaultSubscriber(DefaultHandler, AsyncAPISubscriber):
"""One-message consumer with AsyncAPI methods."""

@override
def get_name(self) -> str:
return ""

@override
def get_schema(self) -> Dict[str, Channel]:
return {}

class AsyncAPIBatchSubscriber(BatchHandler, AsyncAPISubscriber):
"""Batch-message consumer with AsyncAPI methods."""

14 changes: 8 additions & 6 deletions faststream/nats/subscriber/subscription.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
from typing import Generic, Protocol, TypeVar
from typing import Any, Generic, Optional, Protocol, TypeVar


class Unsubscriptable(Protocol):
async def unsubscribe(self) -> None: ...


class Stopable(Protocol):
class Watchable(Protocol):
async def stop(self) -> None: ...

async def updates(self, timeout: float) -> Optional[Any]: ...

StopableT = TypeVar("StopableT", bound=Stopable)

WatchableT = TypeVar("WatchableT", bound=Watchable)

class UnsubscribeAdapter(Unsubscriptable, Generic[StopableT]):

class UnsubscribeAdapter(Unsubscriptable, Generic[WatchableT]):
__slots__ = ("obj",)

obj: StopableT
obj: WatchableT

def __init__(self, subscription: StopableT):
def __init__(self, subscription: WatchableT) -> None:
self.obj = subscription

async def unsubscribe(self) -> None:
Expand Down
Loading

0 comments on commit 536026b

Please sign in to comment.