Update package versions #1702

Merged · 6 commits · Aug 20, 2024
faststream/app.py (3 changes: 2 additions & 1 deletion)

@@ -166,7 +166,8 @@ async def run(
         async with anyio.create_task_group() as tg:
             tg.start_soon(self._startup, log_level, run_extra_options)

-            while not self.should_exit:
+            # TODO: mv it to event trigger after nats-py fixing
+            while not self.should_exit:  # noqa: ASYNC110
                 await anyio.sleep(sleep_time)

             await self._shutdown(log_level)
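Note: the suppressed rule is ruff's ASYNC110 ("busy-wait on a condition by sleeping in a loop"), and the TODO says the loop should become an event trigger once the upstream nats-py issue is fixed. A minimal sketch of what that event-based shutdown could look like with anyio.Event (an illustration of the TODO, not code from this PR):

import anyio


async def main() -> None:
    should_exit = anyio.Event()

    async def app() -> None:
        # Wakes immediately when exit is requested: no polling loop, no ASYNC110.
        await should_exit.wait()
        print("shutting down")

    async with anyio.create_task_group() as tg:
        tg.start_soon(app)
        await anyio.sleep(0.1)  # simulate some runtime before exit is requested
        should_exit.set()       # replaces flipping a should_exit boolean


anyio.run(main)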
faststream/confluent/broker/broker.py (18 changes: 13 additions & 5 deletions)

@@ -15,7 +15,7 @@
     Union,
 )

-from anyio import move_on_after
+import anyio
 from typing_extensions import Annotated, Doc, override

 from faststream.__about__ import SERVICE_NAME

@@ -527,11 +527,19 @@ async def publish_batch(

     @override
     async def ping(self, timeout: Optional[float]) -> bool:
-        with move_on_after(timeout) as cancel_scope:
-            if cancel_scope.cancel_called:
-                return False
+        sleep_time = (timeout or 10) / 10

+        with anyio.move_on_after(timeout) as cancel_scope:
             if self._producer is None:
                 return False

-            return await self._producer._producer.ping(timeout=timeout)
+            while True:
+                if cancel_scope.cancel_called:
+                    return False
+
+                if await self._producer._producer.ping(timeout=timeout):
+                    return True
+
+                await anyio.sleep(sleep_time)
+
+        return False
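The same ping() rewrite is repeated for the Kafka, NATS, RabbitMQ and Redis brokers below: instead of a single probe guarded by move_on_after, the method now retries the broker-specific check every (timeout or 10) / 10 seconds until it succeeds or the timeout scope cancels (the Redis variant additionally swallows ConnectionError between attempts). A generic sketch of the pattern, where probe stands in for the broker-specific check (an illustration, not FastStream's actual API):

from typing import Awaitable, Callable, Optional

import anyio


async def ping(probe: Callable[[], Awaitable[bool]], timeout: Optional[float]) -> bool:
    """Retry probe() until it succeeds or timeout elapses."""
    sleep_time = (timeout or 10) / 10

    with anyio.move_on_after(timeout) as cancel_scope:
        while True:
            if cancel_scope.cancel_called:
                return False

            if await probe():
                return True

            await anyio.sleep(sleep_time)

    # Reached when move_on_after cancelled the scope mid-await.
    return False


async def main() -> None:
    attempts = iter([False, False, True])

    async def probe() -> bool:
        return next(attempts)

    print(await ping(probe, timeout=3.0))  # True, after two failed attempts


anyio.run(main)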
faststream/kafka/broker/broker.py (18 changes: 13 additions & 5 deletions)

@@ -16,9 +16,9 @@
 )

 import aiokafka
+import anyio
 from aiokafka.partitioner import DefaultPartitioner
 from aiokafka.producer.producer import _missing
-from anyio import move_on_after
 from typing_extensions import Annotated, Doc, override

 from faststream.__about__ import SERVICE_NAME

@@ -807,11 +807,19 @@ async def publish_batch(

     @override
     async def ping(self, timeout: Optional[float]) -> bool:
-        with move_on_after(timeout) as cancel_scope:
-            if cancel_scope.cancel_called:
-                return False
+        sleep_time = (timeout or 10) / 10

+        with anyio.move_on_after(timeout) as cancel_scope:
             if self._producer is None:
                 return False

-            return not self._producer._producer._closed
+            while True:
+                if cancel_scope.cancel_called:
+                    return False
+
+                if not self._producer._producer._closed:
+                    return True
+
+                await anyio.sleep(sleep_time)
+
+        return False
faststream/nats/broker/broker.py (18 changes: 13 additions & 5 deletions)

@@ -12,8 +12,8 @@
     Union,
 )

+import anyio
 import nats
-from anyio import move_on_after
 from nats.aio.client import (
     DEFAULT_CONNECT_TIMEOUT,
     DEFAULT_DRAIN_TIMEOUT,

@@ -918,11 +918,19 @@ async def new_inbox(self) -> str:

     @override
     async def ping(self, timeout: Optional[float]) -> bool:
-        with move_on_after(timeout) as cancel_scope:
-            if cancel_scope.cancel_called:
-                return False
+        sleep_time = (timeout or 10) / 10

+        with anyio.move_on_after(timeout) as cancel_scope:
             if self._connection is None:
                 return False

-            return self._connection.is_connected
+            while True:
+                if cancel_scope.cancel_called:
+                    return False
+
+                if self._connection.is_connected:
+                    return True
+
+                await anyio.sleep(sleep_time)
+
+        return False
faststream/rabbit/broker/broker.py (18 changes: 13 additions & 5 deletions)

@@ -11,8 +11,8 @@
 )
 from urllib.parse import urlparse

+import anyio
 from aio_pika import connect_robust
-from anyio import move_on_after
 from typing_extensions import Annotated, Doc, override

 from faststream.__about__ import SERVICE_NAME

@@ -693,11 +693,19 @@ async def declare_exchange(

     @override
     async def ping(self, timeout: Optional[float]) -> bool:
-        with move_on_after(timeout) as cancel_scope:
-            if cancel_scope.cancel_called:
-                return False
+        sleep_time = (timeout or 10) / 10

+        with anyio.move_on_after(timeout) as cancel_scope:
             if self._connection is None:
                 return False

-            return not self._connection.is_closed
+            while True:
+                if cancel_scope.cancel_called:
+                    return False
+
+                if not self._connection.is_closed:
+                    return True
+
+                await anyio.sleep(sleep_time)
+
+        return False
faststream/redis/broker/broker.py (20 changes: 14 additions & 6 deletions)

@@ -481,15 +481,23 @@ async def publish_batch(

     @override
     async def ping(self, timeout: Optional[float]) -> bool:
-        with move_on_after(timeout) as cancel_scope:
-            if cancel_scope.cancel_called:
-                return False
+        sleep_time = (timeout or 10) / 10

+        with move_on_after(timeout) as cancel_scope:
             if self._connection is None:
                 return False

             while True:
+                if cancel_scope.cancel_called:
+                    return False
+
                 try:
-                    return await self._connection.ping()
-                except ConnectionError:  # noqa: PERF203
-                    await anyio.sleep(0.1)
+                    if await self._connection.ping():
+                        return True
+
+                except ConnectionError:
+                    pass
+
+                await anyio.sleep(sleep_time)
+
+        return False
pyproject.toml (32 changes: 17 additions & 15 deletions)

@@ -79,20 +79,20 @@ otel = ["opentelemetry-sdk>=1.24.0,<2.0.0"]
 optionals = ["faststream[rabbit,kafka,confluent,nats,redis,otel]"]

 devdocs = [
-    "mkdocs-material==9.5.31",
+    "mkdocs-material==9.5.32",
     "mkdocs-static-i18n==1.2.3",
     "mdx-include==1.4.2",
     "mkdocstrings[python]==0.25.2",
     "mkdocs-literate-nav==0.6.1",
     "mkdocs-git-revision-date-localized-plugin==1.2.6",
-    "mike==2.1.2",  # versioning
+    "mike==2.1.3",  # versioning
     "mkdocs-minify-plugin==0.8.0",
     "mkdocs-macros-plugin==1.0.5",  # includes with variables
     "mkdocs-glightbox==0.4.0",  # img zoom
     "pillow",  # required for mkdocs-glightbo
     "cairosvg",  # required for mkdocs-glightbo
     "requests",  # using in CI, do not pin it
-    "griffe-typingdoc==0.2.5",  # Annotated[..., Doc("")] support
+    "griffe-typingdoc==0.2.6",  # Annotated[..., Doc("")] support
 ]

 types = [

@@ -111,7 +111,7 @@ types = [

 lint = [
     "faststream[types]",
-    "ruff==0.5.7",
+    "ruff==0.6.1",
     "bandit==1.7.9",
     "semgrep==1.84.1",
     "codespell==2.3.0",

@@ -121,7 +121,7 @@ test-core = [
     "coverage[toml]==7.6.1",
     "pytest==8.3.2",
     "pytest-asyncio==0.23.8",
-    "dirty-equals==0.7.1.post0",
+    "dirty-equals==0.8.0",
     "typing-extensions>=4.8.0,<4.12.1; python_version < '3.9'",  # to fix dirty-equals
 ]

@@ -217,23 +217,25 @@ select = [
 ]

 ignore = [
-    "E501",  # line too long, handled by formatter later
-    "C901",  # too complex
+    "ASYNC109",  # own timeout implementation
+
+    "E501",  # line too long, handled by formatter later
+    "C901",  # too complex

     # todo pep8-naming
-    "N817",  # CamelCase `*` imported as acronym `*`
-    "N815",  # Variable `*` in class scope should not be mixedCase
-    "N803",  # Argument name `expandMessageExamples` should be lowercase
+    "N817",  # CamelCase `*` imported as acronym `*`
+    "N815",  # Variable `*` in class scope should not be mixedCase
+    "N803",  # Argument name `expandMessageExamples` should be lowercase

     # todo pydocstyle
-    "D100",  # missing docstring in public module
+    "D100",  # missing docstring in public module
     "D101",
     "D102",
     "D103",
-    "D104",  # missing docstring in public package
-    "D105",  # missing docstring in magic methods
-    "D106",  # missing docstring in public nested class
-    "D107",  # missing docstring in __init__
+    "D104",  # missing docstring in public package
+    "D105",  # missing docstring in magic methods
+    "D106",  # missing docstring in public nested class
+    "D107",  # missing docstring in __init__
 ]

 [tool.ruff.lint.per-file-ignores]
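Besides the documentation and test dependency bumps, the ruff upgrade from 0.5.7 to 0.6.1 is what drives the lint changes elsewhere in this PR: ASYNC109 ("async function with its own timeout parameter") is now ignored project-wide because broker.ping(timeout=...) implements its own timeout handling, and ASYNC110 is suppressed inline in faststream/app.py; presumably these started firing because newer ruff applies the flake8-async rules to anyio/asyncio code as well. The re-listed ignore entries (E501, C901, N8xx, D1xx) look unchanged apart from comment alignment. For reference, the shape ASYNC109 objects to (an illustration, not code from this repository):

from typing import Optional

import anyio


async def ping(timeout: Optional[float]) -> bool:  # ASYNC109 flags the timeout parameter
    # The rule prefers letting the caller wrap the call in a cancel scope,
    # but FastStream keeps the explicit argument as part of its public API.
    with anyio.move_on_after(timeout):
        await anyio.sleep(0.01)  # stand-in for a real connectivity probe
        return True
    return False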
tests/a_docs/confluent/ack/test_errors.py (6 changes: 3 additions & 3 deletions)

@@ -7,9 +7,9 @@
 from tests.tools import spy_decorator


-@pytest.mark.asyncio()
-@pytest.mark.confluent()
-@pytest.mark.slow()
+@pytest.mark.asyncio
+@pytest.mark.confluent
+@pytest.mark.slow
 async def test_ack_exc():
     from docs.docs_src.confluent.ack.errors import app, broker, handle

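The remaining test files only change decorator style: @pytest.mark.asyncio() becomes @pytest.mark.asyncio, and @pytest.fixture() becomes @pytest.fixture. Both spellings register the same mark or fixture, so this is cosmetic; it presumably follows the flake8-pytest-style defaults (PT001/PT023) that newer ruff prefers, which favor the bare form. A quick illustration (not from this repository):

import pytest


@pytest.mark.slow
def test_bare_marker() -> None:
    assert True


@pytest.mark.slow()  # registers exactly the same "slow" mark, just noisier
def test_parenthesized_marker() -> None:
    assert True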
tests/a_docs/confluent/additional_config/test_app.py (2 changes: 1 addition & 1 deletion)

@@ -8,7 +8,7 @@
 from faststream.confluent import TestKafkaBroker


-@pytest.mark.asyncio()
+@pytest.mark.asyncio
 async def test_base_app():
     async with TestKafkaBroker(broker):
         await broker.publish(HelloWorld(msg="First Hello"), "hello_world")
tests/a_docs/confluent/basic/test_basic.py (2 changes: 1 addition & 1 deletion)

@@ -3,7 +3,7 @@
 from faststream.confluent import TestKafkaBroker


-@pytest.mark.asyncio()
+@pytest.mark.asyncio
 async def test_basic():
     from docs.docs_src.confluent.basic.basic import broker, on_input_data

tests/a_docs/confluent/basic/test_cmd_run.py (4 changes: 2 additions & 2 deletions)

@@ -8,12 +8,12 @@
 from faststream.cli.main import cli


-@pytest.fixture()
+@pytest.fixture
 def confluent_basic_project():
     return "docs.docs_src.confluent.basic.basic:app"


-@pytest.mark.confluent()
+@pytest.mark.confluent
 def test_run_cmd(
     runner: CliRunner,
     mock: Mock,
(file path not shown in the captured view)

@@ -8,7 +8,7 @@
 from faststream.confluent import TestKafkaBroker


-@pytest.mark.asyncio()
+@pytest.mark.asyncio
 async def test_me():
     async with TestKafkaBroker(broker):
         await broker.publish_batch(
tests/a_docs/confluent/consumes_basics/test_app.py (2 changes: 1 addition & 1 deletion)

@@ -8,7 +8,7 @@
 from faststream.confluent import TestKafkaBroker


-@pytest.mark.asyncio()
+@pytest.mark.asyncio
 async def test_base_app():
     async with TestKafkaBroker(broker):
         await broker.publish(HelloWorld(msg="First Hello"), "hello_world")
tests/a_docs/confluent/publish_batch/test_app.py (4 changes: 2 additions & 2 deletions)

@@ -10,7 +10,7 @@
 from faststream.confluent import TestKafkaBroker


-@pytest.mark.asyncio()
+@pytest.mark.asyncio
 async def test_batch_publish_decorator():
     async with TestKafkaBroker(broker):
         await broker.publish(Data(data=2.0), "input_data_1")

@@ -21,7 +21,7 @@ async def test_batch_publish_decorator():
         )


-@pytest.mark.asyncio()
+@pytest.mark.asyncio
 async def test_batch_publish_call():
     async with TestKafkaBroker(broker):
         await broker.publish(Data(data=2.0), "input_data_2")
tests/a_docs/confluent/publish_batch/test_issues.py (2 changes: 1 addition & 1 deletion)

@@ -18,7 +18,7 @@ async def handle(msg: str) -> List[int]:
 app = FastStream(broker)


-@pytest.mark.asyncio()
+@pytest.mark.asyncio
 async def test_base_app():
     async with TestKafkaBroker(broker):
         await broker.publish("", "test")
tests/a_docs/confluent/publish_example/test_app.py (2 changes: 1 addition & 1 deletion)

@@ -9,7 +9,7 @@
 from faststream.confluent import TestKafkaBroker


-@pytest.mark.asyncio()
+@pytest.mark.asyncio
 async def test_base_app():
     async with TestKafkaBroker(broker):
         await broker.publish(Data(data=0.2), "input_data")
tests/a_docs/confluent/publish_with_partition_key/test_app.py (4 changes: 2 additions & 2 deletions)

@@ -9,7 +9,7 @@
 from faststream.confluent import TestKafkaBroker


-@pytest.mark.asyncio()
+@pytest.mark.asyncio
 async def test_app():
     async with TestKafkaBroker(broker):
         await broker.publish(Data(data=0.2), "input_data", key=b"my_key")

@@ -19,7 +19,7 @@ async def test_app():


 @pytest.mark.skip("we are not checking the key")
-@pytest.mark.asyncio()
+@pytest.mark.asyncio
 async def test_keys():
     async with TestKafkaBroker(broker):
         # we should be able to publish a message with the key