Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
blink1073 committed Feb 27, 2024
1 parent e375798 commit f1bc9bb
Showing 1 changed file with 3 additions and 98 deletions.
101 changes: 3 additions & 98 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import uuid
import warnings
from datetime import datetime
from queue import Queue
from signal import SIGINT, SIGTERM, Signals
from functools import partial
from signal import SIGINT, SIGTERM, Signals, signal

from .control import CONTROL_THREAD_NAME

Expand Down Expand Up @@ -333,7 +333,7 @@ async def process_control(self, msg):
if self.control_stream:
self.control_stream.flush(zmq.POLLOUT)

def should_handle(self, stream, msg, idents):
async def should_handle(self, stream, msg, idents):
"""Check whether a shell-channel message should be handled
Allows subclasses to prevent handling of certain messages (e.g. aborted requests).
Expand All @@ -350,9 +350,6 @@ def pre_handler_hook(self):
"""Hook to execute before calling message handler"""
# ensure default_int_handler during handler call

def post_handler_hook(self):
"""Hook to execute after calling message handler"""

async def enter_eventloop(self):
"""enter eventloop"""
self.log.info("Entering eventloop %s", self.eventloop)
Expand Down Expand Up @@ -490,102 +487,10 @@ async def control_main(self):
await to_thread.run_sync(self.control_stop.wait)
tg.cancel_scope.cancel()

async def process_control(self):
try:
while True:
await self.process_control_message()
except BaseException as e:
if self.control_stop.is_set():
return
raise e

def post_handler_hook(self):
"""Hook to execute after calling message handler"""
signal(SIGINT, self.saved_sigint_handler)

def enter_eventloop(self):
"""enter eventloop"""
self.log.info("Entering eventloop %s", self.eventloop)
# record handle, so we can check when this changes
eventloop = self.eventloop
if eventloop is None:
self.log.info("Exiting as there is no eventloop")
return

async def advance_eventloop():
# check if eventloop changed:
if self.eventloop is not eventloop:
self.log.info("exiting eventloop %s", eventloop)
return
if self.msg_queue.qsize():
self.log.debug("Delaying eventloop due to waiting messages")
# still messages to process, make the eventloop wait
schedule_next()
return
self.log.debug("Advancing eventloop %s", eventloop)
try:
eventloop(self)
except KeyboardInterrupt:
# Ctrl-C shouldn't crash the kernel
self.log.error("KeyboardInterrupt caught in kernel")
if self.eventloop is eventloop:
# schedule advance again
schedule_next()

def schedule_next():
"""Schedule the next advance of the eventloop"""
# call_later allows the io_loop to process other events if needed.
# Going through schedule_dispatch ensures all other dispatches on msg_queue
# are processed before we enter the eventloop, even if the previous dispatch was
# already consumed from the queue by process_one and the queue is
# technically empty.
self.log.debug("Scheduling eventloop advance")
self.io_loop.call_later(0.001, partial(self.schedule_dispatch, advance_eventloop))

async def process_control_message(self, msg=None):
assert self.control_socket is not None
assert self.session is not None

msg = msg or await self.control_socket.recv_multipart()
idents, msg = self.session.feed_identities(msg, copy=True)
try:
msg = self.session.deserialize(msg, content=True, copy=True)
except Exception:
self.log.error("Invalid Control Message", exc_info=True) # noqa: G201
return

self.log.debug("Control received: %s", msg)

# Set the parent message for side effects.
self.set_parent(idents, msg, channel="control")
self._publish_status("busy", "control")

header = msg["header"]
msg_type = header["msg_type"]

handler = self.control_handlers.get(msg_type, None)
if handler is None:
self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
else:
try:
t, dispatch, args = self.msg_queue.get_nowait()
except (asyncio.QueueEmpty, QueueEmpty):
return

if self.control_thread is None and self.control_stream is not None:
# If there isn't a separate control thread then this main thread handles both shell
# and control messages. Before processing a shell message we need to flush all control
# messages and allow them all to be processed.
await asyncio.sleep(0)
self.control_stream.flush()

socket = self.control_stream.socket
while socket.poll(1):
await asyncio.sleep(0)
self.control_stream.flush()

await dispatch(*args)

async def dispatch_queue(self):
"""Coroutine to preserve order of message handling
Expand Down

0 comments on commit f1bc9bb

Please sign in to comment.