Skip to content

Commit

Permalink
Merge branch 'main' into anyio
Browse files Browse the repository at this point in the history
  • Loading branch information
blink1073 authored Feb 25, 2024
2 parents c244595 + de2221c commit 95fb56c
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 21 deletions.
69 changes: 50 additions & 19 deletions ipykernel/eventloops.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,19 @@ def _notify_stream_qt(kernel):
def enum_helper(name):
return operator.attrgetter(name.rpartition(".")[0])(sys.modules[QtCore.__package__])

def exit_loop():
"""fall back to main loop"""
kernel._qt_notifier.setEnabled(False)
kernel.app.qt_event_loop.quit()

def process_stream_events():
"""fall back to main loop when there's a socket event"""
# call flush to ensure that the stream doesn't lose events
# due to our consuming of the edge-triggered FD
# flush returns the number of events consumed.
# if there were any, wake it up
if kernel.shell_stream.flush(limit=1):
kernel._qt_notifier.setEnabled(False)
kernel.app.qt_event_loop.quit()
exit_loop()

if not hasattr(kernel, "_qt_notifier"):
fd = kernel.shell_stream.getsockopt(zmq.FD)
Expand All @@ -101,18 +105,31 @@ def process_stream_events():
else:
kernel._qt_notifier.setEnabled(True)

# allow for scheduling exits from the loop in case a timeout needs to
# be set from the kernel level
def _schedule_exit(delay):
"""schedule fall back to main loop in [delay] seconds"""
# The signatures of QtCore.QTimer.singleShot are inconsistent between PySide and PyQt
# if setting the TimerType, so we create a timer explicitly and store it
# to avoid a memory leak.
# PreciseTimer is needed so we exit after _at least_ the specified delay, not within 5% of it
if not hasattr(kernel, "_qt_timer"):
kernel._qt_timer = QtCore.QTimer(kernel.app)
kernel._qt_timer.setSingleShot(True)
kernel._qt_timer.setTimerType(enum_helper("QtCore.Qt.TimerType").PreciseTimer)
kernel._qt_timer.timeout.connect(exit_loop)
kernel._qt_timer.start(int(1000 * delay))

loop_qt._schedule_exit = _schedule_exit

# there may already be unprocessed events waiting.
# these events will not wake zmq's edge-triggered FD
# since edge-triggered notification only occurs on new i/o activity.
# process all the waiting events immediately
# so we start in a clean state ensuring that any new i/o events will notify.
# schedule first call on the eventloop as soon as it's running,
# so we don't block here processing events
if not hasattr(kernel, "_qt_timer"):
kernel._qt_timer = QtCore.QTimer(kernel.app)
kernel._qt_timer.setSingleShot(True)
kernel._qt_timer.timeout.connect(process_stream_events)
kernel._qt_timer.start(0)
QtCore.QTimer.singleShot(0, process_stream_events)


@register_integration("qt", "qt5", "qt6")
Expand Down Expand Up @@ -229,23 +246,33 @@ def __init__(self, app):
self.app = app
self.app.withdraw()

def process_stream_events(stream, *a, **kw):
def exit_loop():
"""fall back to main loop"""
app.tk.deletefilehandler(kernel.shell_stream.getsockopt(zmq.FD))
app.quit()
app.destroy()
del kernel.app_wrapper

def process_stream_events(*a, **kw):
"""fall back to main loop when there's a socket event"""
if stream.flush(limit=1):
app.tk.deletefilehandler(stream.getsockopt(zmq.FD))
app.quit()
app.destroy()
del kernel.app_wrapper
if kernel.shell_stream.flush(limit=1):
exit_loop()

# allow for scheduling exits from the loop in case a timeout needs to
# be set from the kernel level
def _schedule_exit(delay):
"""schedule fall back to main loop in [delay] seconds"""
app.after(int(1000 * delay), exit_loop)

loop_tk._schedule_exit = _schedule_exit

# For Tkinter, we create a Tk object and call its withdraw method.
kernel.app_wrapper = BasicAppWrapper(app)

notifier = partial(process_stream_events, kernel.shell_stream)
# seems to be needed for tk
notifier.__name__ = "notifier" # type:ignore[attr-defined]
app.tk.createfilehandler(kernel.shell_stream.getsockopt(zmq.FD), READABLE, notifier)
app.tk.createfilehandler(
kernel.shell_stream.getsockopt(zmq.FD), READABLE, process_stream_events
)
# schedule initial call after start
app.after(0, notifier)
app.after(0, process_stream_events)

app.mainloop()

Expand Down Expand Up @@ -559,6 +586,10 @@ def enable_gui(gui, kernel=None):
# User wants to turn off integration; clear any evidence if Qt was the last one.
if hasattr(kernel, "app"):
delattr(kernel, "app")
if hasattr(kernel, "_qt_notifier"):
delattr(kernel, "_qt_notifier")
if hasattr(kernel, "_qt_timer"):
delattr(kernel, "_qt_timer")
else:
if gui.startswith("qt"):
# Prepare the kernel here so any exceptions are displayed in the client.
Expand Down
77 changes: 77 additions & 0 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,48 @@ async def process_control(self):
if self.control_stop.is_set():
return
raise e
def post_handler_hook(self):
"""Hook to execute after calling message handler"""
signal(SIGINT, self.saved_sigint_handler)

def enter_eventloop(self):
"""enter eventloop"""
self.log.info("Entering eventloop %s", self.eventloop)
# record handle, so we can check when this changes
eventloop = self.eventloop
if eventloop is None:
self.log.info("Exiting as there is no eventloop")
return

async def advance_eventloop():
# check if eventloop changed:
if self.eventloop is not eventloop:
self.log.info("exiting eventloop %s", eventloop)
return
if self.msg_queue.qsize():
self.log.debug("Delaying eventloop due to waiting messages")
# still messages to process, make the eventloop wait
schedule_next()
return
self.log.debug("Advancing eventloop %s", eventloop)
try:
eventloop(self)
except KeyboardInterrupt:
# Ctrl-C shouldn't crash the kernel
self.log.error("KeyboardInterrupt caught in kernel")
if self.eventloop is eventloop:
# schedule advance again
schedule_next()

def schedule_next():
"""Schedule the next advance of the eventloop"""
# call_later allows the io_loop to process other events if needed.
# Going through schedule_dispatch ensures all other dispatches on msg_queue
# are processed before we enter the eventloop, even if the previous dispatch was
# already consumed from the queue by process_one and the queue is
# technically empty.
self.log.debug("Scheduling eventloop advance")
self.io_loop.call_later(0.001, partial(self.schedule_dispatch, advance_eventloop))

async def process_control_message(self, msg=None):
assert self.control_socket is not None
Expand Down Expand Up @@ -1108,6 +1150,41 @@ def _topic(self, topic):
return (f"{base}.{topic}").encode()

_aborting = Bool(False)

def _abort_queues(self):
# while this flag is true,
# execute requests will be aborted
self._aborting = True
self.log.info("Aborting queue")

# flush streams, so all currently waiting messages
# are added to the queue
if self.shell_stream:
self.shell_stream.flush()

# Callback to signal that we are done aborting
# dispatch functions _must_ be async
async def stop_aborting():
self.log.info("Finishing abort")
self._aborting = False

# put the stop-aborting event on the message queue
# so that all messages already waiting in the queue are aborted
# before we reset the flag
schedule_stop_aborting = partial(self.schedule_dispatch, stop_aborting)

if self.stop_on_error_timeout:
# if we have a delay, give messages this long to arrive on the queue
# before we stop aborting requests
self.io_loop.call_later(self.stop_on_error_timeout, schedule_stop_aborting)
# If we have an eventloop, it may interfere with the call_later above.
# If the loop has a _schedule_exit method, we call that so the loop exits
# after stop_on_error_timeout, returning to the main io_loop and letting
# the call_later fire.
if self.eventloop is not None and hasattr(self.eventloop, "_schedule_exit"):
self.eventloop._schedule_exit(self.stop_on_error_timeout + 0.01)
else:
schedule_stop_aborting()

async def _send_abort_reply(self, socket, msg, idents):
"""Send a reply to an aborted request"""
Expand Down
2 changes: 1 addition & 1 deletion ipykernel/zmqshell.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ def init_magics(self):
"""Initialize magics."""
super().init_magics()
self.register_magics(KernelMagics)
self.magics_manager.register_alias("ed", "edit") # type:ignore[union-attr]
self.magics_manager.register_alias("ed", "edit")

def init_virtualenv(self):
"""Initialize virtual environment."""
Expand Down
8 changes: 7 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ classifiers = [
"Programming Language :: Python",
"Programming Language :: Python :: 3",
]
urls = {Homepage = "https://ipython.org"}
requires-python = ">=3.8"
dependencies = [
"debugpy>=1.8.1",
Expand All @@ -37,6 +36,13 @@ dependencies = [
"anyio>=4.0.0",
]

[project.urls]
Homepage = "https://ipython.org"
Documentation = "https://ipykernel.readthedocs.io"
Funding = "https://numfocus.org/donate"
Source = "https://github.com/ipython/ipykernel"
Tracker = "https://github.com/ipython/ipykernel/issues"

[project.optional-dependencies]
docs = [
"sphinx",
Expand Down

0 comments on commit 95fb56c

Please sign in to comment.