Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use typing_extensions.TypedDict import #1575

Merged
merged 16 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
File renamed without changes.
1 change: 1 addition & 0 deletions .github/workflows/docs_update-references.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
types:
- opened
- synchronize
- ready_for_review
paths:
- faststream/**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ on:
types:
- opened
- synchronize
- ready_for_review
branches:
- main
paths:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pr_dependency-review.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ on:
types:
- opened
- synchronize
- ready_for_review
branches:
- main
paths:
Expand Down
5 changes: 3 additions & 2 deletions .github/workflows/pr_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ on:
types:
- opened
- synchronize
- ready_for_review
# https://docs.github.com/en/repositories/configuring-branches-and-merges-in-your-repository/configuring-pull-request-merges/managing-a-merge-queue#triggering-merge-group-checks-with-github-actions
merge_group:
types:
Expand All @@ -29,8 +30,8 @@ jobs:
shell: bash
run: |
set -ux
python -m pip install --upgrade pip
pip install -e ".[lint]"
python -m pip install uv
uv pip install --system -e ".[lint]"

- name: Run ruff
shell: bash
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/setup-python@v4
- uses: actions/setup-python@v5
with:
python-version: "3.9"

Expand Down
File renamed without changes.
14 changes: 0 additions & 14 deletions docs/docs/en/confluent/security.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,6 @@ This chapter discusses the security options available in **FastStream** and how
{! docs_src/confluent/security/plaintext.py !}
```

**Using any SASL authentication without SSL:**

The following example will log a **RuntimeWarning**:

```python linenums="1"
{! docs_src/confluent/security/ssl_warning.py [ln:8.16] !}
```

If the user does not want to use SSL encryption without the warning getting logged, they must explicitly set the `use_ssl` parameter to `False` when creating a SASL object.

```python linenums="1"
{! docs_src/confluent/security/ssl_warning.py [ln:12.5-12.72] !}
```

### 3. SASLScram256/512 Object with SSL/TLS

**Purpose:** The `SASLScram256` and `SASLScram512` objects are used for authentication using the Salted Challenge Response Authentication Mechanism (SCRAM).
Expand Down
14 changes: 0 additions & 14 deletions docs/docs/en/kafka/security.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,6 @@ This chapter discusses the security options available in **FastStream** and how
{! docs_src/kafka/security/plaintext.py [ln:1-10.25,11-] !}
```

**Using any SASL authentication without SSL:**

The following example will log a **RuntimeWarning**:

```python linenums="1"
{! docs_src/kafka/security/ssl_warning.py [ln:8.16] !}
```

If the user does not want to use SSL encryption without the warning getting logged, they must explicitly set the `use_ssl` parameter to `False` when creating a SASL object.

```python linenums="1"
{! docs_src/kafka/security/ssl_warning.py [ln:12.5-12.72] !}
```

### 3. SASLScram256/512 Object with SSL/TLS

**Purpose:** The `SASLScram256` and `SASLScram512` objects are used for authentication using the Salted Challenge Response Authentication Mechanism (SCRAM).
Expand Down
12 changes: 0 additions & 12 deletions docs/docs_src/confluent/security/ssl_warning.py

This file was deleted.

12 changes: 0 additions & 12 deletions docs/docs_src/kafka/security/ssl_warning.py

This file was deleted.

5 changes: 3 additions & 2 deletions faststream/broker/acknowledgement_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from typing import TYPE_CHECKING, Any, Optional, Type, Union
from typing import Counter as CounterType

from faststream._compat import is_test_env
from faststream.exceptions import (
AckMessage,
HandlerException,
Expand Down Expand Up @@ -163,6 +162,7 @@ async def __aexit__(
elif isinstance(exc_val, RejectMessage): # pragma: no branch
await self.__reject()

# Exception was processed and suppressed
return True

elif self.watcher.is_max(self.message.message_id):
Expand All @@ -171,7 +171,8 @@ async def __aexit__(
else:
await self.__nack()

return not is_test_env()
# Exception was not processed
return False

async def __ack(self) -> None:
try:
Expand Down
1 change: 1 addition & 0 deletions faststream/broker/middlewares/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,5 @@ async def after_processed(

await super().after_processed(exc_type, exc_val, exc_tb)

# Exception was not processed
return False
3 changes: 3 additions & 0 deletions faststream/broker/subscriber/proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ async def close(self) -> None: ...
@abstractmethod
async def consume(self, msg: MsgType) -> Any: ...

@abstractmethod
async def process_message(self, msg: MsgType) -> Any: ...

@abstractmethod
def add_call(
self,
Expand Down
36 changes: 22 additions & 14 deletions faststream/broker/subscriber/usecase.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from abc import abstractmethod
from contextlib import AsyncExitStack, asynccontextmanager
from contextlib import AsyncExitStack
from itertools import chain
from typing import (
TYPE_CHECKING,
Any,
AsyncIterator,
Callable,
ContextManager,
Dict,
Expand Down Expand Up @@ -296,6 +295,26 @@ async def consume(self, msg: MsgType) -> Any:
if not self.running:
return None

try:
return await self.process_message(msg)

except StopConsume:
# Stop handler at StopConsume exception
await self.close()

except SystemExit:
# Stop handler at `exit()` call
await self.close()

if app := context.get("app"):
app.exit()

except Exception: # nosec B110
# All other exceptions were logged by CriticalLogMiddleware
pass

async def process_message(self, msg: MsgType) -> Any:
"""Execute all message processing stages."""
async with AsyncExitStack() as stack:
stack.enter_context(self.lock)

Expand All @@ -304,8 +323,6 @@ async def consume(self, msg: MsgType) -> Any:
stack.enter_context(context.scope(k, v))

stack.enter_context(context.scope("handler_", self))
# Stop handler at StopConsume exception
await stack.enter_async_context(self._stop_scope())

# enter all middlewares
middlewares: List[BaseMiddleware] = []
Expand Down Expand Up @@ -385,21 +402,12 @@ def get_log_context(
"message_id": getattr(message, "message_id", ""),
}

@asynccontextmanager
async def _stop_scope(self) -> AsyncIterator[None]:
try:
yield
except StopConsume:
await self.close()
except SystemExit:
await self.close()
context.get("app").exit()

# AsyncAPI methods

@property
def call_name(self) -> str:
"""Returns the name of the handler call."""
# TODO: default call_name
return to_camelcase(self.calls[0].call_name)

def get_description(self) -> Optional[str]:
Expand Down
4 changes: 3 additions & 1 deletion faststream/confluent/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from enum import Enum
from typing import Any, Callable, TypedDict
from typing import Any, Callable

from typing_extensions import TypedDict


class BuiltinFeatures(Enum):
Expand Down
9 changes: 0 additions & 9 deletions faststream/confluent/security.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import ssl
import warnings
from typing import TYPE_CHECKING, Optional

from faststream.exceptions import SetupError
Expand All @@ -8,7 +7,6 @@
SASLPlaintext,
SASLScram256,
SASLScram512,
ssl_not_set_error_msg,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -42,13 +40,6 @@ def _parse_base_security(security: BaseSecurity) -> "AnyDict":


def _parse_sasl_plaintext(security: SASLPlaintext) -> "AnyDict":
if not security.use_ssl:
warnings.warn(
message=ssl_not_set_error_msg,
category=RuntimeWarning,
stacklevel=1,
)

return {
"security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT",
"sasl_mechanism": "PLAIN",
Expand Down
3 changes: 0 additions & 3 deletions faststream/kafka/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ class KafkaInitKwargs(TypedDict, total=False):
"""
),
]
send_backoff_ms: int
enable_idempotence: Annotated[
bool,
Doc(
Expand Down Expand Up @@ -405,7 +404,6 @@ def __init__(
"""
),
] = 0,
send_backoff_ms: int = 100,
enable_idempotence: Annotated[
bool,
Doc(
Expand Down Expand Up @@ -549,7 +547,6 @@ def __init__(
partitioner=partitioner,
max_request_size=max_request_size,
linger_ms=linger_ms,
send_backoff_ms=send_backoff_ms,
enable_idempotence=enable_idempotence,
transactional_id=transactional_id,
transaction_timeout_ms=transaction_timeout_ms,
Expand Down
2 changes: 0 additions & 2 deletions faststream/kafka/fastapi/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ def __init__(
"""
),
] = 0,
send_backoff_ms: int = 100,
enable_idempotence: Annotated[
bool,
Doc(
Expand Down Expand Up @@ -573,7 +572,6 @@ def __init__(
partitioner=partitioner,
max_request_size=max_request_size,
linger_ms=linger_ms,
send_backoff_ms=send_backoff_ms,
enable_idempotence=enable_idempotence,
transactional_id=transactional_id,
transaction_timeout_ms=transaction_timeout_ms,
Expand Down
9 changes: 0 additions & 9 deletions faststream/kafka/security.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import warnings
from typing import TYPE_CHECKING, Optional

from faststream.security import (
BaseSecurity,
SASLPlaintext,
SASLScram256,
SASLScram512,
ssl_not_set_error_msg,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -36,13 +34,6 @@ def _parse_base_security(security: BaseSecurity) -> "AnyDict":


def _parse_sasl_plaintext(security: SASLPlaintext) -> "AnyDict":
if security.ssl_context is None:
warnings.warn(
message=ssl_not_set_error_msg,
category=RuntimeWarning,
stacklevel=1,
)

return {
"security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT",
"ssl_context": security.ssl_context,
Expand Down
31 changes: 16 additions & 15 deletions faststream/redis/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,27 +156,28 @@ async def start( # type: ignore[override]
start_signal = anyio.Event()
self.task = asyncio.create_task(self._consume(*args, start_signal=start_signal))

await start_signal.wait()
with anyio.fail_after(3.0):
await start_signal.wait()

async def _consume(self, *args: Any, start_signal: anyio.Event) -> None:
connected = True

while self.running:
with suppress(Exception):
try:
await self._get_msgs(*args)
try:
await self._get_msgs(*args)

except Exception:
if connected:
connected = False
await anyio.sleep(5)
except Exception: # noqa: PERF203
if connected:
connected = False
await anyio.sleep(5)

else:
if not connected:
connected = True
else:
if not connected:
connected = True

finally:
if not start_signal.is_set():
finally:
if not start_signal.is_set():
with suppress(Exception):
start_signal.set()

@abstractmethod
Expand Down Expand Up @@ -397,13 +398,13 @@ async def _get_msgs(self, client: "Redis[bytes]") -> None:
raw_msg = await client.lpop(name=self.list_sub.name)

if raw_msg:
message = DefaultListMessage(
msg = DefaultListMessage(
type="list",
data=raw_msg,
channel=self.list_sub.name,
)

await self.consume(message) # type: ignore[arg-type]
await self.consume(msg) # type: ignore[arg-type]

else:
await anyio.sleep(self.list_sub.polling_interval)
Expand Down
Loading
Loading