Skip to content

Commit

Permalink
Finish updating ruff rules
Browse files Browse the repository at this point in the history
- update ruff config
- update files for updated config
- ignores doc rules in CI for now (see #2)
  • Loading branch information
mikeshardmind committed Nov 23, 2024
1 parent e55bfed commit 6450b7d
Show file tree
Hide file tree
Showing 15 changed files with 132 additions and 120 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: chartboost/ruff-action@v1
- uses: astral-sh/ruff-action@v1
with:
args: 'format --check'
- uses: chartboost/ruff-action@v1
- uses: astral-sh/ruff-action@v1
with:
args: 'check'
args: 'check --ignore D --ignore DOC'

check:
runs-on: ubuntu-latest
Expand Down
14 changes: 0 additions & 14 deletions async_utils/_cpython_stuff.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@


class _HashedSeq(list[Any]):
"""This class guarantees that hash() will be called no more than once
per element. This is important because the lru_cache() will hash
the key multiple times on a cache miss."""

__slots__ = ("hashvalue",)

def __init__(
Expand All @@ -45,16 +41,6 @@ def make_key(
type: type[type] = type, # noqa: A002
len: Callable[[Sized], int] = len, # noqa: A002
) -> Hashable:
"""Make a cache key from optionally typed positional and keyword arguments
The key is constructed in a way that is flat as possible rather than
as a nested structure that would take more memory.
If there is only a single argument and its data type is known to cache
its hash value, then that argument is returned without a wrapper. This
saves space and improves lookup speed."""
# All of code below relies on kwds preserving the order input by the user.
# Formerly, we sorted() the kwds before looping. The new way is *much*
# faster; however, it means that f(x=1, y=2) will now be treated as a
# distinct call from f(y=2, x=1) which will be cached separately.
key: tuple[Any, ...] = args
if kwds:
key += kwd_mark
Expand Down
30 changes: 16 additions & 14 deletions async_utils/bg_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,20 @@ def stop(self) -> None:
self._loop.call_soon_threadsafe(self._loop.stop)

def schedule(self, coro: _FutureLike[_T]) -> Future[_T]:
"""Schedule a coroutine to run on the wrapped event loop"""
"""Schedule a coroutine to run on the wrapped event loop."""
return asyncio.run_coroutine_threadsafe(coro, self._loop)

async def run(self, coro: _FutureLike[_T]) -> _T:
"""Schedule a coroutine to run on the background loop,
awaiting it finishing."""

"""Schedule and await a coroutine to run on the background loop."""
future = asyncio.run_coroutine_threadsafe(coro, self._loop)
return await asyncio.wrap_future(future)


def run_forever(
loop: asyncio.AbstractEventLoop, use_eager_task_factory: bool, /
loop: asyncio.AbstractEventLoop,
/,
*,
use_eager_task_factory: bool = True,
) -> None:
asyncio.set_event_loop(loop)
if use_eager_task_factory:
Expand All @@ -69,13 +70,11 @@ def run_forever(
for task in tasks:
try:
if (exc := task.exception()) is not None:
loop.call_exception_handler(
{
"message": "Unhandled exception in task during shutdown.",
"exception": exc,
"task": task,
}
)
loop.call_exception_handler({
"message": "Unhandled exception in task during shutdown.",
"exception": exc,
"task": task,
})
except (asyncio.InvalidStateError, asyncio.CancelledError):
pass

Expand All @@ -87,11 +86,14 @@ def run_forever(
def threaded_loop(
*, use_eager_task_factory: bool = True
) -> Generator[LoopWrapper, None, None]:
"""Starts an event loop on a background thread,
"""Create and use a managed event loop in a backround thread.
Starts an event loop on a background thread,
and yields an object with scheduling methods for interacting with
the loop.
loop is scheduled for shutdown, and thread is joined at contextmanager exit"""
loop is scheduled for shutdown, and thread is joined at contextmanager exit
"""
loop = asyncio.new_event_loop()
thread = None
try:
Expand Down
12 changes: 5 additions & 7 deletions async_utils/bg_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@


class BGTasks:
"""An intentionally dumber task group"""
"""An intentionally dumber task group."""

def __init__(self, exit_timeout: float | None) -> None:
self._tasks: set[asyncio.Task[Any]] = set()
self._exit_timeout: float | None = exit_timeout
self._etime: float | None = exit_timeout

def create_task(
self,
coro: _CoroutineLike[_T],
*,
name: str | None = None,
context: Context | None = None,
) -> Any:
) -> asyncio.Task[_T]:
t = asyncio.create_task(coro)
self._tasks.add(t)
t.add_done_callback(self._tasks.discard)
Expand All @@ -48,11 +48,9 @@ def create_task(
async def __aenter__(self: Self) -> Self:
return self

async def __aexit__(self, *_dont_care: Any):
async def __aexit__(self, *_dont_care: object):
while tsks := self._tasks.copy():
_done, _pending = await asyncio.wait(
tsks, timeout=self._exit_timeout
)
_done, _pending = await asyncio.wait(tsks, timeout=self._etime)
for task in _pending:
task.cancel()
await asyncio.sleep(0)
20 changes: 12 additions & 8 deletions async_utils/corofunc_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,16 @@ def corocache(
"""Decorator to cache coroutine functions.
This is less powerful than the version in task_cache.py but may work better
for some cases where typing of libraries this interacts with is too restrictive.
for some cases where typing of libraries this interacts with is too
restrictive.
Note: This uses the args and kwargs of the original coroutine function as a
cache key. This includes instances (self) when wrapping methods.
Consider not wrapping instance methods, but what those methods call when feasible
in cases where this may matter.
Consider not wrapping instance methods, but what those methods call when
feasible in cases where this may matter.
The ordering of args and kwargs matters."""
The ordering of args and kwargs matters.
"""

def wrapper(coro: CoroLike[P, R]) -> CoroFunc[P, R]:
internal_cache: dict[Hashable, asyncio.Future[R]] = {}
Expand Down Expand Up @@ -88,16 +90,18 @@ def lrucorocache(
"""Decorator to cache coroutine functions.
This is less powerful than the version in task_cache.py but may work better
for some cases where typing of libraries this interacts with is too restrictive.
for some cases where typing of libraries this interacts with is too
restrictive.
Note: This uses the args and kwargs of the original coroutine function as a
cache key. This includes instances (self) when wrapping methods.
Consider not wrapping instance methods, but what those methods call when feasible
in cases where this may matter.
Consider not wrapping instance methods, but what those methods call when
feasible in cases where this may matter.
The ordering of args and kwargs matters.
cached results are evicted by LRU and ttl."""
Cached results are evicted by LRU and ttl.
"""

def wrapper(coro: CoroLike[P, R]) -> CoroFunc[P, R]:
internal_cache: LRU[Hashable, asyncio.Future[R]] = LRU(maxsize)
Expand Down
10 changes: 5 additions & 5 deletions async_utils/gen_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@


class _PeekableQueue[T](asyncio.Queue[T]):
"""This is for internal use only, tested on both 3.12 and 3.13
This will be tested for 3.14 prior to 3.14's release."""
# This is for internal use only, tested on both 3.12 and 3.13
# This will be tested for 3.14 prior to 3.14's release.

_get_loop: Callable[[], asyncio.AbstractEventLoop] # pyright: ignore[reportUninitializedInstanceVariable]
_getters: deque[asyncio.Future[None]] # pyright: ignore[reportUninitializedInstanceVariable]
Expand Down Expand Up @@ -71,8 +71,7 @@ def sync_to_async_gen(
*args: P.args,
**kwargs: P.kwargs,
) -> AsyncGenerator[YieldType]:
"""Asynchronously iterate over a synchronous generator run in
background thread.
"""Asynchronously iterate over a synchronous generator.
The generator function and it's arguments must be threadsafe and will be
iterated lazily. Generators which perform cpu intensive work while holding
Expand All @@ -84,7 +83,8 @@ def sync_to_async_gen(
If your generator is actually a synchronous coroutine, that's super cool,
but rewrite is as a native coroutine or use it directly then, you don't need
what this function does."""
what this function does.
"""
# Provides backpressure, ensuring the underlying sync generator in a thread
# is lazy If the user doesn't want laziness, then using this method makes
# little sense, they could trivially exhaust the generator in a thread with
Expand Down
6 changes: 4 additions & 2 deletions async_utils/lockout.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@


class Lockout:
"""Lock out an async resource for an amount of time
"""Lock out an async resource for an amount of time.
Resources may be locked out multiple times.
Expand Down Expand Up @@ -79,7 +79,9 @@ async def __aexit__(self, *_dont_care: object) -> None:


class FIFOLockout:
"""A FIFO preserving version of Lockout. This has slightly more
"""A FIFO preserving version of Lockout.
This has slightly more
overhead than the base Lockout class, which is not guaranteed to
preserve FIFO, though happens to in the case of not being locked.
Expand Down
27 changes: 15 additions & 12 deletions async_utils/priority_sem.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import contextvars
import heapq
import threading
from collections.abc import Callable
from collections.abc import Callable, Generator
from contextlib import contextmanager
from typing import Any, NamedTuple

Expand Down Expand Up @@ -55,7 +55,8 @@ def __lt__(self, other: Any) -> bool:


@contextmanager
def priority_context(priority: int):
def priority_context(priority: int) -> Generator[None, None, None]:
"""Set the priority for all PrioritySemaphore use in this context."""
token = _priority.set(priority)
try:
yield None
Expand All @@ -67,7 +68,8 @@ def priority_context(priority: int):


class PrioritySemaphore:
"""
"""A Semaphore with priority-based aquisition ordering.
Provides a semaphore with similar semantics as asyncio.Semaphore,
but using an underlying priority. priority is shared within a context
manager's logical scope, but the context can be nested safely.
Expand All @@ -76,11 +78,11 @@ class PrioritySemaphore:
context manager use:
sem = PrioritySemaphore(1)
>>> sem = PrioritySemaphore(1)
>>> with priority_ctx(10):
async with sem:
...
with priority_ctx(10):
async with sem:
...
"""

_loop: asyncio.AbstractEventLoop | None = None
Expand All @@ -93,12 +95,14 @@ def _get_loop(self) -> asyncio.AbstractEventLoop:
if self._loop is None:
self._loop = loop
if loop is not self._loop:
raise RuntimeError(f"{self!r} is bound to a different event loop")
msg = f"{self!r} is bound to a different event loop"
raise RuntimeError(msg)
return loop

def __init__(self, value: int = 1):
if value < 0:
raise ValueError("Semaphore initial value must be >= 0")
msg = "Semaphore initial value must be >= 0"
raise ValueError(msg)
self._waiters: list[PriorityWaiter] | None = None
self._value: int = value

Expand All @@ -120,9 +124,8 @@ def locked(self) -> bool:
async def __aenter__(self):
prio = _priority.get()
await self.acquire(prio)
return

async def __aexit__(self, *dont_care: Any):
async def __aexit__(self, *dont_care: object):
self.release()

async def acquire(self, priority: int = _default) -> bool:
Expand Down Expand Up @@ -174,6 +177,6 @@ def _maybe_wake(self) -> None:
heapq.heappush(self._waiters, next_waiter)
break

def release(self):
def release(self) -> None:
self._value += 1
self._maybe_wake()
7 changes: 5 additions & 2 deletions async_utils/ratelimiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@


class RateLimiter:
"""This is an asyncio specific ratelimit implementation which does not
"""Asyncio-specific internal application ratelimiter.
This is an asyncio specific ratelimit implementation which does not
account for various networking effects / responses and
should only be used for internal limiting."""
should only be used for internal limiting.
"""

def __init__(self, rate_limit: int, period: float, granularity: float):
self.rate_limit: int = rate_limit
Expand Down
21 changes: 10 additions & 11 deletions async_utils/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def __aenter__(self):

return self

async def __aexit__(self, *_dont_care: Any):
async def __aexit__(self, *_dont_care: object):
self.__closed = True

def __aiter__(self):
Expand Down Expand Up @@ -108,23 +108,22 @@ async def create_task(
await self.__tqueue.put(t)
return t.cancel_token

async def cancel_task(self, cancel_token: CancelationToken, /) -> bool:
"""Returns if the task with that CancelationToken. Cancelling an
already cancelled task is allowed and has no additional effect."""
async def cancel_task(self, cancel_token: CancelationToken, /) -> None:
"""Cancel a task.
Canceling an already canceled task is not an error
"""
async with self.__l:
try:
task = self.__tasks[cancel_token]
task.canceled = True
except KeyError:
pass
else:
return True
return False

def close(self):
"""Closes the scheduler without waiting"""
def close(self) -> None:
"""Closes the scheduler without waiting."""
self.__closed = True

async def join(self):
"""Waits for the scheduler's internal queue to be empty"""
async def join(self) -> None:
"""Waits for the scheduler's internal queue to be empty."""
await self.__tqueue.join()
Loading

0 comments on commit 6450b7d

Please sign in to comment.