Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: 迁移至结构化并发框架 AnyIO #3053

Merged
merged 5 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,029 changes: 1,110 additions & 919 deletions envs/pydantic-v1/poetry.lock

Large diffs are not rendered by default.

2,132 changes: 1,162 additions & 970 deletions envs/pydantic-v2/poetry.lock

Large diffs are not rendered by default.

1,394 changes: 832 additions & 562 deletions envs/test/poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions envs/test/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ packages = [{ include = "nonebot-test.py" }]

[tool.poetry.dependencies]
python = "^3.9"
nonebug = "^0.3.7"
trio = "^0.27.0"
nonebug = "^0.4.1"
wsproto = "^1.2.0"
pytest-cov = "^5.0.0"
pytest-xdist = "^3.0.2"
pytest-asyncio = "^0.23.2"
werkzeug = ">=2.3.6,<4.0.0"
coverage-conditional-plugin = "^0.9.0"

Expand Down
49 changes: 35 additions & 14 deletions nonebot/dependencies/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@
"""

import abc
import asyncio
import inspect
from functools import partial
from dataclasses import field, dataclass
from collections.abc import Iterable, Awaitable
from typing import Any, Generic, TypeVar, Callable, Optional, cast

import anyio
from exceptiongroup import BaseExceptionGroup, catch

from nonebot.log import logger
from nonebot.typing import _DependentCallable
from nonebot.exception import SkippedException
from nonebot.utils import run_sync, is_coroutine_callable
from nonebot.compat import FieldInfo, ModelField, PydanticUndefined
from nonebot.utils import run_sync, is_coroutine_callable, flatten_exception_group

from .utils import check_field_type, get_typed_signature

Expand Down Expand Up @@ -82,7 +85,16 @@ def __repr__(self) -> str:
)

async def __call__(self, **kwargs: Any) -> R:
try:
exception: Optional[BaseExceptionGroup[SkippedException]] = None

def _handle_skipped(exc_group: BaseExceptionGroup[SkippedException]):
nonlocal exception
exception = exc_group
# raise one of the exceptions instead
excs = list(flatten_exception_group(exc_group))
logger.trace(f"{self} skipped due to {excs}")

with catch({SkippedException: _handle_skipped}):
# do pre-check
await self.check(**kwargs)

Expand All @@ -94,9 +106,8 @@ async def __call__(self, **kwargs: Any) -> R:
return await cast(Callable[..., Awaitable[R]], self.call)(**values)
else:
return await run_sync(cast(Callable[..., R], self.call))(**values)
except SkippedException as e:
logger.trace(f"{self} skipped due to {e}")
raise

raise exception

@staticmethod
def parse_params(
Expand Down Expand Up @@ -164,10 +175,13 @@ def parse(
return cls(call, params, parameterless_params)

async def check(self, **params: Any) -> None:
await asyncio.gather(*(param._check(**params) for param in self.parameterless))
await asyncio.gather(
*(cast(Param, param.field_info)._check(**params) for param in self.params)
)
async with anyio.create_task_group() as tg:
for param in self.parameterless:
tg.start_soon(partial(param._check, **params))

async with anyio.create_task_group() as tg:
for param in self.params:
tg.start_soon(partial(cast(Param, param.field_info)._check, **params))

async def _solve_field(self, field: ModelField, params: dict[str, Any]) -> Any:
param = cast(Param, field.field_info)
Expand All @@ -183,10 +197,17 @@ async def solve(self, **params: Any) -> dict[str, Any]:
await param._solve(**params)

# solve param values
values = await asyncio.gather(
*(self._solve_field(field, params) for field in self.params)
)
return {field.name: value for field, value in zip(self.params, values)}
result: dict[str, Any] = {}

async def _solve_field(field: ModelField, params: dict[str, Any]) -> None:
value = await self._solve_field(field, params)
result[field.name] = value

async with anyio.create_task_group() as tg:
for field in self.params:
tg.start_soon(_solve_field, field, params)

return result


__autodoc__ = {"CustomConfig": False}
143 changes: 80 additions & 63 deletions nonebot/drivers/none.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@
"""

import signal
import asyncio
import threading
from typing import Optional
from typing_extensions import override

import anyio
from anyio.abc import TaskGroup
from exceptiongroup import BaseExceptionGroup, catch

from nonebot.log import logger
from nonebot.consts import WINDOWS
from nonebot.config import Env, Config
from nonebot.drivers import Driver as BaseDriver
from nonebot.utils import flatten_exception_group

HANDLED_SIGNALS = (
signal.SIGINT, # Unix signal 2. Sent by Ctrl+C.
Expand All @@ -33,8 +37,8 @@
def __init__(self, env: Env, config: Config):
super().__init__(env, config)

self.should_exit: asyncio.Event = asyncio.Event()
self.force_exit: bool = False
self.should_exit: anyio.Event = anyio.Event()
self.force_exit: anyio.Event = anyio.Event()

@property
@override
Expand All @@ -52,84 +56,97 @@
def run(self, *args, **kwargs):
"""启动 none driver"""
super().run(*args, **kwargs)
loop = asyncio.get_event_loop()
loop.run_until_complete(self._serve())
anyio.run(self._serve)

async def _serve(self):
self._install_signal_handlers()
await self._startup()
if self.should_exit.is_set():
return
await self._main_loop()
await self._shutdown()
async with anyio.create_task_group() as driver_tg:
driver_tg.start_soon(self._handle_signals)
driver_tg.start_soon(self._listen_force_exit, driver_tg)
driver_tg.start_soon(self._handle_lifespan, driver_tg)

Check warning on line 65 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L64-L65

Added lines #L64 - L65 were not covered by tests

async def _startup(self):
async def _handle_signals(self):

Check warning on line 67 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L67

Added line #L67 was not covered by tests
try:
await self._lifespan.startup()
except Exception as e:
logger.opt(colors=True, exception=e).error(
with anyio.open_signal_receiver(*HANDLED_SIGNALS) as signal_receiver:
async for sig in signal_receiver:
self.exit(force=self.should_exit.is_set())
except NotImplementedError:
# Windows
for sig in HANDLED_SIGNALS:

Check warning on line 74 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L70-L74

Added lines #L70 - L74 were not covered by tests
signal.signal(sig, self._handle_legacy_signal)

# backport for Windows signal handling

Check warning on line 77 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L76-L77

Added lines #L76 - L77 were not covered by tests
def _handle_legacy_signal(self, sig, frame):
self.exit(force=self.should_exit.is_set())

async def _handle_lifespan(self, tg: TaskGroup):

Check warning on line 81 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L81

Added line #L81 was not covered by tests
try:
await self._startup()

if self.should_exit.is_set():

Check warning on line 85 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L84-L85

Added lines #L84 - L85 were not covered by tests
return

await self._listen_exit()

Check warning on line 88 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L87-L88

Added lines #L87 - L88 were not covered by tests

await self._shutdown()

Check warning on line 90 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L90

Added line #L90 was not covered by tests
finally:
tg.cancel_scope.cancel()

Check warning on line 92 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L92

Added line #L92 was not covered by tests

async def _startup(self):

Check warning on line 94 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L94

Added line #L94 was not covered by tests
def handle_exception(exc_group: BaseExceptionGroup[Exception]) -> None:
self.should_exit.set()

for exc in flatten_exception_group(exc_group):

Check warning on line 98 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L97-L98

Added lines #L97 - L98 were not covered by tests
logger.opt(colors=True, exception=exc).error(
"<r><bg #f8bbd0>Error occurred while running startup hook."
"</bg #f8bbd0></r>"

Check warning on line 101 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L100-L101

Added lines #L100 - L101 were not covered by tests
)
logger.error(
"<r><bg #f8bbd0>Application startup failed. "
"Exiting.</bg #f8bbd0></r>"
)
self.should_exit.set()
return

logger.info("Application startup completed.")
with catch({Exception: handle_exception}):
await self._lifespan.startup()

if not self.should_exit.is_set():

Check warning on line 111 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L110-L111

Added lines #L110 - L111 were not covered by tests
logger.info("Application startup completed.")

async def _main_loop(self):
async def _listen_exit(self, tg: Optional[TaskGroup] = None):

Check warning on line 114 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L114

Added line #L114 was not covered by tests
await self.should_exit.wait()

if tg is not None:

Check warning on line 117 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L117

Added line #L117 was not covered by tests
tg.cancel_scope.cancel()

Check warning on line 119 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L119

Added line #L119 was not covered by tests
async def _shutdown(self):
logger.info("Shutting down")
logger.info("Waiting for application shutdown. (CTRL+C to force quit)")

logger.info("Waiting for application shutdown.")

try:
await self._lifespan.shutdown()
except Exception as e:
logger.opt(colors=True, exception=e).error(
"<r><bg #f8bbd0>Error when running shutdown function. "
"Ignored!</bg #f8bbd0></r>"
)

for task in asyncio.all_tasks():
if task is not asyncio.current_task() and not task.done():
task.cancel()
await asyncio.sleep(0.1)

tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
if tasks and not self.force_exit:
logger.info("Waiting for tasks to finish. (CTRL+C to force quit)")
while tasks and not self.force_exit:
await asyncio.sleep(0.1)
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]

for task in tasks:
task.cancel()
error_occurred: bool = False

Check warning on line 124 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L124

Added line #L124 was not covered by tests

await asyncio.gather(*tasks, return_exceptions=True)
def handle_exception(exc_group: BaseExceptionGroup[Exception]) -> None:

Check warning on line 126 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L126

Added line #L126 was not covered by tests
nonlocal error_occurred

logger.info("Application shutdown complete.")
loop = asyncio.get_event_loop()
loop.stop()
error_occurred = True

def _install_signal_handlers(self) -> None:
if threading.current_thread() is not threading.main_thread():
# Signals can only be listened to from the main thread.
return
for exc in flatten_exception_group(exc_group):

Check warning on line 131 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L131

Added line #L131 was not covered by tests
logger.opt(colors=True, exception=exc).error(
"<r><bg #f8bbd0>Error occurred while running shutdown hook."
"</bg #f8bbd0></r>"

Check warning on line 134 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L133-L134

Added lines #L133 - L134 were not covered by tests
)
logger.error(
"<r><bg #f8bbd0>Application shutdown failed. "
"Exiting.</bg #f8bbd0></r>"

Check warning on line 138 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L138

Added line #L138 was not covered by tests
)

loop = asyncio.get_event_loop()
with catch({Exception: handle_exception}):
await self._lifespan.shutdown()

try:
for sig in HANDLED_SIGNALS:
loop.add_signal_handler(sig, self._handle_exit, sig, None)
except NotImplementedError:
# Windows
for sig in HANDLED_SIGNALS:
signal.signal(sig, self._handle_exit)
if not error_occurred:

Check warning on line 144 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L144

Added line #L144 was not covered by tests
logger.info("Application shutdown complete.")

def _handle_exit(self, sig, frame):
self.exit(force=self.should_exit.is_set())
async def _listen_force_exit(self, tg: TaskGroup):

Check warning on line 147 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L147

Added line #L147 was not covered by tests
await self.force_exit.wait()
tg.cancel_scope.cancel()

def exit(self, force: bool = False):
"""退出 none driver
Expand All @@ -140,4 +157,4 @@
if not self.should_exit.is_set():
self.should_exit.set()
if force:
self.force_exit = True
self.force_exit.set()

Check warning on line 160 in nonebot/drivers/none.py

View check run for this annotation

Codecov / codecov/patch

nonebot/drivers/none.py#L160

Added line #L160 was not covered by tests
Loading