From 4794f3940bc1776b9a9b037b1dac22a747ff0efe Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 26 Dec 2023 09:56:41 -0600 Subject: [PATCH 01/12] debug event loop --- jupyter_core/utils/__init__.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/jupyter_core/utils/__init__.py b/jupyter_core/utils/__init__.py index a82cac1..cf07775 100644 --- a/jupyter_core/utils/__init__.py +++ b/jupyter_core/utils/__init__.py @@ -158,7 +158,7 @@ def wrapped(*args: Any, **kwargs: Any) -> Any: except RuntimeError: pass - loop = get_event_loop() + loop = get_event_loop(allow_stopped=True) return loop.run_until_complete(inner) wrapped.__doc__ = coro.__doc__ @@ -186,7 +186,7 @@ async def ensure_async(obj: Awaitable[T] | T) -> T: return cast(T, obj) -def get_event_loop() -> asyncio.AbstractEventLoop: +def get_event_loop(allow_stopped=False) -> asyncio.AbstractEventLoop: # Get the loop for this thread. # In Python 3.12, a deprecation warning is raised, which # may later turn into a RuntimeError. We handle both @@ -194,8 +194,10 @@ def get_event_loop() -> asyncio.AbstractEventLoop: with warnings.catch_warnings(): warnings.simplefilter("ignore", DeprecationWarning) try: - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() except RuntimeError: + if not allow_stopped: + raise if sys.platform == "win32": loop = asyncio.WindowsSelectorEventLoopPolicy().new_event_loop() else: From 123c02fb0a5c61d13d3af4e72658d4129116a276 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 26 Dec 2023 10:42:11 -0600 Subject: [PATCH 02/12] debug --- jupyter_core/utils/__init__.py | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/jupyter_core/utils/__init__.py b/jupyter_core/utils/__init__.py index cf07775..3e7b4b5 100644 --- a/jupyter_core/utils/__init__.py +++ b/jupyter_core/utils/__init__.py @@ -158,7 +158,7 @@ def wrapped(*args: Any, **kwargs: Any) -> Any: except RuntimeError: pass - loop = get_event_loop(allow_stopped=True) + loop = get_event_loop() return loop.run_until_complete(inner) wrapped.__doc__ = coro.__doc__ @@ -186,21 +186,14 @@ async def ensure_async(obj: Awaitable[T] | T) -> T: return cast(T, obj) -def get_event_loop(allow_stopped=False) -> asyncio.AbstractEventLoop: - # Get the loop for this thread. - # In Python 3.12, a deprecation warning is raised, which - # may later turn into a RuntimeError. We handle both - # cases. - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - try: - loop = asyncio.get_running_loop() - except RuntimeError: - if not allow_stopped: - raise - if sys.platform == "win32": - loop = asyncio.WindowsSelectorEventLoopPolicy().new_event_loop() - else: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) +def get_event_loop() -> asyncio.AbstractEventLoop: + # Get the loop for this thread, or create a new one. + try: + loop = asyncio.get_running_loop() + except RuntimeError: + if sys.platform == "win32": + loop = asyncio.WindowsSelectorEventLoopPolicy().new_event_loop() + else: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) return loop From 277a668bb8e0d258680f28adddfd4990fe2da139 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 26 Dec 2023 10:56:33 -0600 Subject: [PATCH 03/12] use contextvar --- jupyter_core/utils/__init__.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/jupyter_core/utils/__init__.py b/jupyter_core/utils/__init__.py index 3e7b4b5..6ad0f09 100644 --- a/jupyter_core/utils/__init__.py +++ b/jupyter_core/utils/__init__.py @@ -9,6 +9,7 @@ import sys import threading import warnings +from contextvars import ContextVar from pathlib import Path from types import FrameType from typing import Any, Awaitable, Callable, TypeVar, cast @@ -125,7 +126,8 @@ def run(self, coro: Any) -> Any: return fut.result(None) -_runner_map: dict[str, _TaskRunner] = {} +_runner: ContextVar[_TaskRunner | None] = ContextVar("_runner", default=None) +_loop: ContextVar[asyncio.AbstractEventLoop | None] = ContextVar("_loop", default=None) def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]: @@ -146,15 +148,16 @@ def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]: raise AssertionError def wrapped(*args: Any, **kwargs: Any) -> Any: - name = threading.current_thread().name inner = coro(*args, **kwargs) try: # If a loop is currently running in this thread, # use a task runner. asyncio.get_running_loop() - if name not in _runner_map: - _runner_map[name] = _TaskRunner() - return _runner_map[name].run(inner) + runner = _runner.get() + if not runner: + runner = _TaskRunner() + _runner.set(runner) + return runner.run(inner) except RuntimeError: pass @@ -188,6 +191,9 @@ async def ensure_async(obj: Awaitable[T] | T) -> T: def get_event_loop() -> asyncio.AbstractEventLoop: # Get the loop for this thread, or create a new one. + loop = _loop.get() + if loop: + return loop try: loop = asyncio.get_running_loop() except RuntimeError: @@ -196,4 +202,5 @@ def get_event_loop() -> asyncio.AbstractEventLoop: else: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) + _loop.set(loop) return loop From 8690da033dac3511a396a2c86bc56d57e9a4fef9 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 26 Dec 2023 16:31:55 -0600 Subject: [PATCH 04/12] fix handling of loop --- jupyter_core/utils/__init__.py | 2 +- tests/test_utils.py | 12 +++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/jupyter_core/utils/__init__.py b/jupyter_core/utils/__init__.py index 6ad0f09..4e7d9ba 100644 --- a/jupyter_core/utils/__init__.py +++ b/jupyter_core/utils/__init__.py @@ -192,7 +192,7 @@ async def ensure_async(obj: Awaitable[T] | T) -> T: def get_event_loop() -> asyncio.AbstractEventLoop: # Get the loop for this thread, or create a new one. loop = _loop.get() - if loop: + if loop and not loop.is_closed(): return loop try: loop = asyncio.get_running_loop() diff --git a/tests/test_utils.py b/tests/test_utils.py index ed5d9f0..c2f4ef4 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -10,7 +10,13 @@ import pytest -from jupyter_core.utils import deprecation, ensure_async, ensure_dir_exists, run_sync +from jupyter_core.utils import ( + deprecation, + ensure_async, + ensure_dir_exists, + get_event_loop, + run_sync, +) def test_ensure_dir_exists(): @@ -42,11 +48,11 @@ async def foo(): foo_sync = run_sync(foo) assert foo_sync() == 1 assert foo_sync() == 1 - asyncio.get_event_loop().close() + get_event_loop().close() asyncio.set_event_loop(None) assert foo_sync() == 1 - asyncio.get_event_loop().close() + get_event_loop().close() asyncio.run(foo()) From b648f834f4f03dbda0864cb40fee10358fe6aeb4 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 26 Dec 2023 16:39:35 -0600 Subject: [PATCH 05/12] fix task runner --- jupyter_core/utils/__init__.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/jupyter_core/utils/__init__.py b/jupyter_core/utils/__init__.py index 4e7d9ba..e7a67b1 100644 --- a/jupyter_core/utils/__init__.py +++ b/jupyter_core/utils/__init__.py @@ -126,7 +126,7 @@ def run(self, coro: Any) -> Any: return fut.result(None) -_runner: ContextVar[_TaskRunner | None] = ContextVar("_runner", default=None) +_runner_map: dict[str, _TaskRunner] = {} _loop: ContextVar[asyncio.AbstractEventLoop | None] = ContextVar("_loop", default=None) @@ -148,16 +148,15 @@ def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]: raise AssertionError def wrapped(*args: Any, **kwargs: Any) -> Any: + name = threading.current_thread().name inner = coro(*args, **kwargs) try: # If a loop is currently running in this thread, # use a task runner. asyncio.get_running_loop() - runner = _runner.get() - if not runner: - runner = _TaskRunner() - _runner.set(runner) - return runner.run(inner) + if name not in _runner_map: + _runner_map[name] = _TaskRunner() + return _runner_map[name].run(inner) except RuntimeError: pass @@ -189,7 +188,7 @@ async def ensure_async(obj: Awaitable[T] | T) -> T: return cast(T, obj) -def get_event_loop() -> asyncio.AbstractEventLoop: +def get_event_loop(prefer_selector_loop=False) -> asyncio.AbstractEventLoop: # Get the loop for this thread, or create a new one. loop = _loop.get() if loop and not loop.is_closed(): @@ -197,7 +196,7 @@ def get_event_loop() -> asyncio.AbstractEventLoop: try: loop = asyncio.get_running_loop() except RuntimeError: - if sys.platform == "win32": + if sys.platform == "win32" and prefer_selector_loop: loop = asyncio.WindowsSelectorEventLoopPolicy().new_event_loop() else: loop = asyncio.new_event_loop() From b582976661507882510a319fd275ad6575e97832 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 26 Dec 2023 16:50:29 -0600 Subject: [PATCH 06/12] Cleanup --- jupyter_core/utils/__init__.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/jupyter_core/utils/__init__.py b/jupyter_core/utils/__init__.py index e7a67b1..992c896 100644 --- a/jupyter_core/utils/__init__.py +++ b/jupyter_core/utils/__init__.py @@ -126,7 +126,7 @@ def run(self, coro: Any) -> Any: return fut.result(None) -_runner_map: dict[str, _TaskRunner] = {} +_runner: ContextVar[_TaskRunner | None] = ContextVar("_runner", default=None) _loop: ContextVar[asyncio.AbstractEventLoop | None] = ContextVar("_loop", default=None) @@ -148,15 +148,16 @@ def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]: raise AssertionError def wrapped(*args: Any, **kwargs: Any) -> Any: - name = threading.current_thread().name inner = coro(*args, **kwargs) try: # If a loop is currently running in this thread, # use a task runner. asyncio.get_running_loop() - if name not in _runner_map: - _runner_map[name] = _TaskRunner() - return _runner_map[name].run(inner) + runner = _runner.get() + if not runner: + runner = _TaskRunner() + _runner.set(runner) + return runner.run(inner) except RuntimeError: pass @@ -188,7 +189,7 @@ async def ensure_async(obj: Awaitable[T] | T) -> T: return cast(T, obj) -def get_event_loop(prefer_selector_loop=False) -> asyncio.AbstractEventLoop: +def get_event_loop(prefer_selector_loop: bool = False) -> asyncio.AbstractEventLoop: # Get the loop for this thread, or create a new one. loop = _loop.get() if loop and not loop.is_closed(): From 1a2992a7875101697037a5addb76bff2bb96b8c9 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 26 Dec 2023 16:58:26 -0600 Subject: [PATCH 07/12] fix event loop policy handling --- jupyter_core/application.py | 5 ++++- tests/test_application.py | 10 ++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/jupyter_core/application.py b/jupyter_core/application.py index cdc1de0..11bc920 100644 --- a/jupyter_core/application.py +++ b/jupyter_core/application.py @@ -76,6 +76,9 @@ class JupyterApp(Application): aliases = base_aliases flags = base_flags + # Set to True for tornado-based apps. + _prefer_selector_loop = False + def _log_level_default(self) -> int: return logging.INFO @@ -302,7 +305,7 @@ def launch_instance(cls, argv: t.Any = None, **kwargs: t.Any) -> None: If a global instance already exists, this reinitializes and starts it """ - loop = get_event_loop() + loop = get_event_loop(cls._prefer_selector_loop) coro = cls._async_launch_instance(argv, **kwargs) loop.run_until_complete(coro) diff --git a/tests/test_application.py b/tests/test_application.py index 6935c8c..c3d59f6 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -139,3 +139,13 @@ def test_async_app(): AsyncApp.launch_instance([]) app = AsyncApp.instance() assert app.value == 10 + + +class AsyncTornadoApp(AsyncApp): + _prefer_selector_loop = True + + +def test_async_tornado_app(): + AsyncApp.launch_instance([]) + app = AsyncApp.instance() + assert app.value == 10 From 75848d8f9b99de60b58ff7b6a801a0edd52ad6df Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 26 Dec 2023 17:07:36 -0600 Subject: [PATCH 08/12] use thread local data --- jupyter_core/utils/__init__.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/jupyter_core/utils/__init__.py b/jupyter_core/utils/__init__.py index 992c896..ad1d451 100644 --- a/jupyter_core/utils/__init__.py +++ b/jupyter_core/utils/__init__.py @@ -9,7 +9,6 @@ import sys import threading import warnings -from contextvars import ContextVar from pathlib import Path from types import FrameType from typing import Any, Awaitable, Callable, TypeVar, cast @@ -126,8 +125,9 @@ def run(self, coro: Any) -> Any: return fut.result(None) -_runner: ContextVar[_TaskRunner | None] = ContextVar("_runner", default=None) -_loop: ContextVar[asyncio.AbstractEventLoop | None] = ContextVar("_loop", default=None) +_thread_data = threading.local() +_thread_data.loop = None +_thread_data.runner = None def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]: @@ -153,11 +153,9 @@ def wrapped(*args: Any, **kwargs: Any) -> Any: # If a loop is currently running in this thread, # use a task runner. asyncio.get_running_loop() - runner = _runner.get() - if not runner: - runner = _TaskRunner() - _runner.set(runner) - return runner.run(inner) + if not _thread_data.runner: + _thread_data.runner = _TaskRunner() + return _thread_data.runner.run(inner) except RuntimeError: pass @@ -191,7 +189,7 @@ async def ensure_async(obj: Awaitable[T] | T) -> T: def get_event_loop(prefer_selector_loop: bool = False) -> asyncio.AbstractEventLoop: # Get the loop for this thread, or create a new one. - loop = _loop.get() + loop = _thread_data.loop if loop and not loop.is_closed(): return loop try: @@ -202,5 +200,5 @@ def get_event_loop(prefer_selector_loop: bool = False) -> asyncio.AbstractEventL else: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - _loop.set(loop) + _thread_data.loop = loop return loop From 9237a5b126657b0c8b93a91025e9e6820691f572 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 26 Dec 2023 17:09:25 -0600 Subject: [PATCH 09/12] fix typing --- jupyter_core/utils/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jupyter_core/utils/__init__.py b/jupyter_core/utils/__init__.py index ad1d451..68ecf27 100644 --- a/jupyter_core/utils/__init__.py +++ b/jupyter_core/utils/__init__.py @@ -191,7 +191,7 @@ def get_event_loop(prefer_selector_loop: bool = False) -> asyncio.AbstractEventL # Get the loop for this thread, or create a new one. loop = _thread_data.loop if loop and not loop.is_closed(): - return loop + return loop # type:ignore[no-any-return] try: loop = asyncio.get_running_loop() except RuntimeError: @@ -201,4 +201,4 @@ def get_event_loop(prefer_selector_loop: bool = False) -> asyncio.AbstractEventL loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) _thread_data.loop = loop - return loop + return loop # type:ignore[no-any-return] From 55e0a5fd76b560bcea722973e4b5582620e10ef3 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 26 Dec 2023 19:29:23 -0600 Subject: [PATCH 10/12] fix task runner --- jupyter_core/utils/__init__.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/jupyter_core/utils/__init__.py b/jupyter_core/utils/__init__.py index 68ecf27..0c3985c 100644 --- a/jupyter_core/utils/__init__.py +++ b/jupyter_core/utils/__init__.py @@ -125,9 +125,9 @@ def run(self, coro: Any) -> Any: return fut.result(None) +_runner_map: dict[str, _TaskRunner] = {} _thread_data = threading.local() _thread_data.loop = None -_thread_data.runner = None def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]: @@ -148,14 +148,15 @@ def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]: raise AssertionError def wrapped(*args: Any, **kwargs: Any) -> Any: + name = threading.current_thread().name inner = coro(*args, **kwargs) try: # If a loop is currently running in this thread, # use a task runner. asyncio.get_running_loop() - if not _thread_data.runner: - _thread_data.runner = _TaskRunner() - return _thread_data.runner.run(inner) + if name not in _runner_map: + _runner_map[name] = _TaskRunner() + return _runner_map[name].run(inner) except RuntimeError: pass From 60cb295cff265935b902b0eb1fba60e02076dba5 Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 26 Dec 2023 19:51:12 -0600 Subject: [PATCH 11/12] use contextvar --- jupyter_core/utils/__init__.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/jupyter_core/utils/__init__.py b/jupyter_core/utils/__init__.py index 0c3985c..450dc69 100644 --- a/jupyter_core/utils/__init__.py +++ b/jupyter_core/utils/__init__.py @@ -9,6 +9,7 @@ import sys import threading import warnings +from contextvars import ContextVar from pathlib import Path from types import FrameType from typing import Any, Awaitable, Callable, TypeVar, cast @@ -126,8 +127,7 @@ def run(self, coro: Any) -> Any: _runner_map: dict[str, _TaskRunner] = {} -_thread_data = threading.local() -_thread_data.loop = None +_loop: ContextVar[asyncio.AbstractEventLoop | None] = ContextVar("_loop", default=None) def run_sync(coro: Callable[..., Awaitable[T]]) -> Callable[..., T]: @@ -190,8 +190,8 @@ async def ensure_async(obj: Awaitable[T] | T) -> T: def get_event_loop(prefer_selector_loop: bool = False) -> asyncio.AbstractEventLoop: # Get the loop for this thread, or create a new one. - loop = _thread_data.loop - if loop and not loop.is_closed(): + loop = _loop.get() + if loop is not None and not loop.is_closed(): return loop # type:ignore[no-any-return] try: loop = asyncio.get_running_loop() @@ -201,5 +201,5 @@ def get_event_loop(prefer_selector_loop: bool = False) -> asyncio.AbstractEventL else: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - _thread_data.loop = loop + _loop.set(loop) return loop # type:ignore[no-any-return] From 2d7bf47f0a053e4339f7d120ad2c219aa4f918fd Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Tue, 26 Dec 2023 19:59:49 -0600 Subject: [PATCH 12/12] typing --- jupyter_core/utils/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jupyter_core/utils/__init__.py b/jupyter_core/utils/__init__.py index 450dc69..7782628 100644 --- a/jupyter_core/utils/__init__.py +++ b/jupyter_core/utils/__init__.py @@ -192,7 +192,7 @@ def get_event_loop(prefer_selector_loop: bool = False) -> asyncio.AbstractEventL # Get the loop for this thread, or create a new one. loop = _loop.get() if loop is not None and not loop.is_closed(): - return loop # type:ignore[no-any-return] + return loop try: loop = asyncio.get_running_loop() except RuntimeError: @@ -202,4 +202,4 @@ def get_event_loop(prefer_selector_loop: bool = False) -> asyncio.AbstractEventL loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) _loop.set(loop) - return loop # type:ignore[no-any-return] + return loop