diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index df2e4fa9..031f784e 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -17,8 +17,8 @@ import uuid import warnings from datetime import datetime -from queue import Queue -from signal import SIGINT, SIGTERM, Signals +from functools import partial +from signal import SIGINT, SIGTERM, Signals, signal from .control import CONTROL_THREAD_NAME @@ -333,7 +333,7 @@ async def process_control(self, msg): if self.control_stream: self.control_stream.flush(zmq.POLLOUT) - def should_handle(self, stream, msg, idents): + async def should_handle(self, stream, msg, idents): """Check whether a shell-channel message should be handled Allows subclasses to prevent handling of certain messages (e.g. aborted requests). @@ -350,9 +350,6 @@ def pre_handler_hook(self): """Hook to execute before calling message handler""" # ensure default_int_handler during handler call - def post_handler_hook(self): - """Hook to execute after calling message handler""" - async def enter_eventloop(self): """enter eventloop""" self.log.info("Entering eventloop %s", self.eventloop) @@ -490,102 +487,10 @@ async def control_main(self): await to_thread.run_sync(self.control_stop.wait) tg.cancel_scope.cancel() - async def process_control(self): - try: - while True: - await self.process_control_message() - except BaseException as e: - if self.control_stop.is_set(): - return - raise e - def post_handler_hook(self): """Hook to execute after calling message handler""" signal(SIGINT, self.saved_sigint_handler) - def enter_eventloop(self): - """enter eventloop""" - self.log.info("Entering eventloop %s", self.eventloop) - # record handle, so we can check when this changes - eventloop = self.eventloop - if eventloop is None: - self.log.info("Exiting as there is no eventloop") - return - - async def advance_eventloop(): - # check if eventloop changed: - if self.eventloop is not eventloop: - self.log.info("exiting eventloop %s", eventloop) - return - if self.msg_queue.qsize(): - self.log.debug("Delaying eventloop due to waiting messages") - # still messages to process, make the eventloop wait - schedule_next() - return - self.log.debug("Advancing eventloop %s", eventloop) - try: - eventloop(self) - except KeyboardInterrupt: - # Ctrl-C shouldn't crash the kernel - self.log.error("KeyboardInterrupt caught in kernel") - if self.eventloop is eventloop: - # schedule advance again - schedule_next() - - def schedule_next(): - """Schedule the next advance of the eventloop""" - # call_later allows the io_loop to process other events if needed. - # Going through schedule_dispatch ensures all other dispatches on msg_queue - # are processed before we enter the eventloop, even if the previous dispatch was - # already consumed from the queue by process_one and the queue is - # technically empty. - self.log.debug("Scheduling eventloop advance") - self.io_loop.call_later(0.001, partial(self.schedule_dispatch, advance_eventloop)) - - async def process_control_message(self, msg=None): - assert self.control_socket is not None - assert self.session is not None - - msg = msg or await self.control_socket.recv_multipart() - idents, msg = self.session.feed_identities(msg, copy=True) - try: - msg = self.session.deserialize(msg, content=True, copy=True) - except Exception: - self.log.error("Invalid Control Message", exc_info=True) # noqa: G201 - return - - self.log.debug("Control received: %s", msg) - - # Set the parent message for side effects. - self.set_parent(idents, msg, channel="control") - self._publish_status("busy", "control") - - header = msg["header"] - msg_type = header["msg_type"] - - handler = self.control_handlers.get(msg_type, None) - if handler is None: - self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type) - else: - try: - t, dispatch, args = self.msg_queue.get_nowait() - except (asyncio.QueueEmpty, QueueEmpty): - return - - if self.control_thread is None and self.control_stream is not None: - # If there isn't a separate control thread then this main thread handles both shell - # and control messages. Before processing a shell message we need to flush all control - # messages and allow them all to be processed. - await asyncio.sleep(0) - self.control_stream.flush() - - socket = self.control_stream.socket - while socket.poll(1): - await asyncio.sleep(0) - self.control_stream.flush() - - await dispatch(*args) - async def dispatch_queue(self): """Coroutine to preserve order of message handling