Skip to content

Commit

Permalink
More type annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
ianthomas23 committed Sep 13, 2024
1 parent a050784 commit 9669fcd
Showing 1 changed file with 19 additions and 15 deletions.
34 changes: 19 additions & 15 deletions ipykernel/subshell_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class SubshellManager:
against multiple subshells attempting to send at the same time.
"""

def __init__(self, context: zmq.asyncio.Context, shell_socket):
def __init__(self, context: zmq.asyncio.Context, shell_socket: zmq.asyncio.Socket):
assert current_thread() == main_thread()

self._context: zmq.asyncio.Context = context
Expand All @@ -58,7 +58,7 @@ def __init__(self, context: zmq.asyncio.Context, shell_socket):
# Used by _create_subshell to tell listen_from_subshells to spawn a new task.
self._send_stream, self._receive_stream = create_memory_object_stream[str]()

def close(self):
def close(self) -> None:
"""Stop all subshells and close all resources."""
assert current_thread().name == "Shell channel"

Expand All @@ -82,20 +82,22 @@ def close(self):
break
self._stop_subshell(subshell)

def get_control_other_socket(self):
def get_control_other_socket(self) -> zmq.asyncio.Socket:
return self._control_other_socket

def get_other_socket(self, subshell_id: str | None):
def get_other_socket(self, subshell_id: str | None) -> zmq.asyncio.Socket:
"""Return the other inproc pair socket for a subshell.
This socket is accessed from the subshell thread.
"""
if subshell_id is None:
return self._parent_other_socket
with self._lock_cache:
return self._cache[subshell_id].thread._pair_socket
socket = self._cache[subshell_id].thread._pair_socket
assert socket is not None
return socket

def get_shell_channel_socket(self, subshell_id: str | None):
def get_shell_channel_socket(self, subshell_id: str | None) -> zmq.asyncio.Socket:
"""Return the shell channel inproc pair socket for a subshell.
This socket is accessed from the shell channel thread.
Expand All @@ -113,19 +115,19 @@ def list_subshell(self) -> list[str]:
with self._lock_cache:
return list(self._cache)

async def listen_from_control(self, subshell_task):
async def listen_from_control(self, subshell_task) -> None:
"""Listen for messages on the control inproc socket, handle those messages and
return replies on the same socket. Runs in the shell channel thread.
"""
assert current_thread().name == "Shell channel"

socket = self._control_shell_channel_socket
while True:
request = await socket.recv_json()
request = await socket.recv_json() # type: ignore[misc]
reply = await self._process_control_request(request, subshell_task)
await socket.send_json(reply)
await socket.send_json(reply) # type: ignore[func-returns-value]

async def listen_from_subshells(self):
async def listen_from_subshells(self) -> None:
"""Listen for reply messages on inproc sockets of all subshells and resend
those messages to the client via the shell_socket.
Expand Down Expand Up @@ -154,7 +156,9 @@ def subshell_id_from_thread_id(self, thread_id) -> str | None:
msg = f"Thread id '{thread_id} does not correspond to a subshell of this kernel"
raise RuntimeError(msg)

def _create_inproc_pair_socket(self, name: str | None, shell_channel_end: bool):
def _create_inproc_pair_socket(
self, name: str | None, shell_channel_end: bool
) -> zmq.asyncio.Socket:
"""Create and return a single ZMQ inproc pair socket."""
address = self._get_inproc_socket_address(name)
socket = self._context.socket(zmq.PAIR)
Expand Down Expand Up @@ -199,7 +203,7 @@ def _delete_subshell(self, subshell_id: str) -> None:

self._stop_subshell(subshell)

def _get_inproc_socket_address(self, name: str | None):
def _get_inproc_socket_address(self, name: str | None) -> str:
full_name = f"subshell-{name}" if name else "subshell"
return f"inproc://{full_name}"

Expand All @@ -215,7 +219,7 @@ def _is_subshell(self, subshell_id: str | None) -> bool:
with self._lock_cache:
return subshell_id in self._cache

async def _listen_for_subshell_reply(self, subshell_id: str | None):
async def _listen_for_subshell_reply(self, subshell_id: str | None) -> None:
"""Listen for reply messages on specified subshell inproc socket and
resend to the client via the shell_socket.
Expand All @@ -236,7 +240,7 @@ async def _listen_for_subshell_reply(self, subshell_id: str | None):
return
raise e

async def _process_control_request(self, request, subshell_task):
async def _process_control_request(self, request, subshell_task) -> dict[str, t.Any]:
"""Process a control request message received on the control inproc
socket and return the reply. Runs in the shell channel thread.
"""
Expand All @@ -263,7 +267,7 @@ async def _process_control_request(self, request, subshell_task):
}
return reply

def _stop_subshell(self, subshell: Subshell):
def _stop_subshell(self, subshell: Subshell) -> None:
"""Stop a subshell thread and close all of its resources."""
assert current_thread().name == "Shell channel"

Expand Down

0 comments on commit 9669fcd

Please sign in to comment.