Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix signature inspection on debounced/throttled, update typing and wrapped #228

Merged
merged 5 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ exclude = [
"src/psygnal/containers",
"src/psygnal/qt.py",
"src/psygnal/_pyinstaller_util",
"src/psygnal/_throttler.py",
]

[tool.cibuildwheel]
Expand Down
74 changes: 49 additions & 25 deletions src/psygnal/_throttler.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,53 @@
from __future__ import annotations

from threading import Timer
from typing import Any, Callable
from typing import TYPE_CHECKING, Any, Callable, Generic, TypeVar

from typing_extensions import Literal
if TYPE_CHECKING:
import inspect

Kind = Literal["throttler", "debouncer"]
EmissionPolicy = Literal["trailing", "leading"]
from typing_extensions import Literal, ParamSpec

Kind = Literal["throttler", "debouncer"]
EmissionPolicy = Literal["trailing", "leading"]

class _ThrottlerBase:
P = ParamSpec("P")
else:
# just so that we don't have to depend on a new version of typing_extensions
# at runtime
P = TypeVar("P")


class _ThrottlerBase(Generic[P]):
_timer: Timer

def __init__(
self,
func: Callable[..., Any],
func: Callable[P, Any],
interval: int = 100,
policy: EmissionPolicy = "leading",
) -> None:
self._func = func
self._interval: int = interval
self.__wrapped__: Callable[P, Any] = func
self._interval: float = interval / 1000
self._policy: EmissionPolicy = policy
self._has_pending: bool = False
self._timer: Timer = Timer(0, lambda: None)
self._timer.start()
self._args: tuple[Any, ...] = ()
self._kwargs: dict[str, Any] = {}

# this mimics what functools.wraps does, but avoids __dict__ usage and other
# things that won't work with mypyc... HOWEVER, most of these dynamic
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By won't work, I guess you mean that mypyc will just ignore these? But won't error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct. __doc__ will be None on mypyc... (i.e. it won't update the doc on the instance itself)

# assignments won't work in mypyc anyway (they just do nothing.)
self.__module__: str = getattr(func, "__module__", "")
self.__name__: str = getattr(func, "__name__", "")
self.__qualname__: str = getattr(func, "__qualname__", "")
self.__doc__: str | None = getattr(func, "__doc__", None)
self.__annotations__: dict[str, Any] = getattr(func, "__annotations__", {})

def _actually_call(self) -> None:
self._has_pending = False
self._func(*self._args, **self._kwargs)
self.__wrapped__(*self._args, **self._kwargs)
self._start_timer()

def _call_if_has_pending(self) -> None:
Expand All @@ -38,7 +56,7 @@ def _call_if_has_pending(self) -> None:

def _start_timer(self) -> None:
self._timer.cancel()
self._timer = Timer(self._interval / 1000, self._call_if_has_pending)
self._timer = Timer(self._interval, self._call_if_has_pending)
self._timer.start()

def cancel(self) -> None:
Expand All @@ -50,16 +68,22 @@ def flush(self) -> None:
"""Force a call if there is one pending."""
self._call_if_has_pending()

def __call__(self, *args: Any, **kwargs: Any) -> None:
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> None:
raise NotImplementedError("Subclasses must implement this method.")

@property
def __signature__(self) -> inspect.Signature:
import inspect

return inspect.signature(self.__wrapped__)


class Throttler(_ThrottlerBase):
class Throttler(_ThrottlerBase, Generic[P]):
"""Class that prevents calling `func` more than once per `interval`.

Parameters
----------
func : Callable[..., Any]
func : Callable[P, Any]
a function to wrap
interval : int, optional
the minimum interval in ms that must pass before the function is called again,
Expand All @@ -73,13 +97,13 @@ class Throttler(_ThrottlerBase):

def __init__(
self,
func: Callable[..., Any],
func: Callable[P, Any],
interval: int = 100,
policy: EmissionPolicy = "leading",
) -> None:
super().__init__(func, interval, policy)

def __call__(self, *args: Any, **kwargs: Any) -> None:
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> None:
"""Call underlying function."""
self._has_pending = True
self._args = args
Expand All @@ -92,12 +116,12 @@ def __call__(self, *args: Any, **kwargs: Any) -> None:
self._start_timer()


class Debouncer(_ThrottlerBase):
class Debouncer(_ThrottlerBase, Generic[P]):
"""Class that waits at least `interval` before calling `func`.

Parameters
----------
func : Callable[..., Any]
func : Callable[P, Any]
a function to wrap
interval : int, optional
the minimum interval in ms that must pass before the function is called again,
Expand All @@ -111,13 +135,13 @@ class Debouncer(_ThrottlerBase):

def __init__(
self,
func: Callable[..., Any],
func: Callable[P, Any],
interval: int = 100,
policy: EmissionPolicy = "trailing",
) -> None:
super().__init__(func, interval, policy)

def __call__(self, *args: Any, **kwargs: Any) -> None:
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> None:
"""Call underlying function."""
self._has_pending = True
self._args = args
Expand All @@ -129,10 +153,10 @@ def __call__(self, *args: Any, **kwargs: Any) -> None:


def throttled(
func: Callable[..., Any] | None = None,
func: Callable[P, Any] | None = None,
timeout: int = 100,
leading: bool = True,
) -> Callable[..., None] | Callable[[Callable[..., Any]], Callable[..., None]]:
) -> Throttler[P] | Callable[[Callable[P, Any]], Throttler[P]]:
"""Create a throttled function that invokes func at most once per timeout.

The throttled function comes with a `cancel` method to cancel delayed func
Expand Down Expand Up @@ -174,18 +198,18 @@ def on_change(val: int)
```
"""

def deco(func: Callable[..., Any]) -> Callable[..., None]:
def deco(func: Callable[P, Any]) -> Throttler[P]:
policy: EmissionPolicy = "leading" if leading else "trailing"
return Throttler(func, timeout, policy)

return deco(func) if func is not None else deco


def debounced(
func: Callable[..., Any] | None = None,
func: Callable[P, Any] | None = None,
timeout: int = 100,
leading: bool = False,
) -> Callable[..., None] | Callable[[Callable[..., Any]], Callable[..., None]]:
) -> Debouncer[P] | Callable[[Callable[P, Any]], Debouncer[P]]:
"""Create a debounced function that delays invoking `func`.

`func` will not be invoked until `timeout` ms have elapsed since the last time
Expand Down Expand Up @@ -230,7 +254,7 @@ def on_change(val: int)
```
"""

def deco(func: Callable[..., Any]) -> Callable[..., None]:
def deco(func: Callable[P, Any]) -> Debouncer[P]:
policy: EmissionPolicy = "leading" if leading else "trailing"
return Debouncer(func, timeout, policy)

Expand Down
32 changes: 31 additions & 1 deletion tests/test_throttler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import time
from inspect import Parameter, signature
from typing import Callable
from unittest.mock import Mock

from psygnal import debounced, throttled
import pytest

from psygnal import SignalInstance, _compiled, debounced, throttled


def test_debounced() -> None:
Expand Down Expand Up @@ -79,3 +83,29 @@ def test_flush() -> None:
f1.flush()
time.sleep(0.2)
mock1.assert_called_once()


@pytest.mark.parametrize("deco", [debounced, throttled])
def test_throttled_debounced_signature(deco: Callable) -> None:
mock = Mock()

@deco(timeout=0, leading=True)
def f1(x: int) -> None:
"""Doc."""
mock(x)

# make sure we can still inspect the signature
assert signature(f1).parameters["x"] == Parameter(
"x", Parameter.POSITIONAL_OR_KEYWORD, annotation=int
)

# make sure these are connectable
sig = SignalInstance((int, int, int))
sig.connect(f1)
sig.emit(1, 2, 3)
mock.assert_called_once_with(1)

if not _compiled:
# unfortunately, dynamic assignment of __doc__ and stuff isn't possible in mypyc
assert f1.__doc__ == "Doc."
assert f1.__name__ == "f1"
Loading