Skip to content

Commit

Permalink
simplify keep_alive
Browse files Browse the repository at this point in the history
  • Loading branch information
Christian-B committed May 14, 2024
1 parent cdfc5e4 commit 0f8dfcf
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 98 deletions.
109 changes: 34 additions & 75 deletions spinnman/spalloc/spalloc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@
"""
Implementation of the client for the Spalloc web service.
"""

from contextlib import contextmanager
import time
from logging import getLogger
from multiprocessing import Process, Queue

import queue
import struct
import threading
from time import sleep
from typing import (Any, ContextManager, Callable, Dict, FrozenSet, Iterable,
Iterator, List, Mapping, Optional, Tuple, cast)
from typing import (Any, Callable, Dict, FrozenSet, Iterable, List, Mapping,
Optional, Tuple, cast)
from urllib.parse import urlparse, urlunparse, ParseResult

from packaging.version import Version
Expand Down Expand Up @@ -69,6 +68,7 @@
_msg = struct.Struct("<II")
_msg_to = struct.Struct("<IIIII")

KEEP_ALIVE_PERIOND = 30

def fix_url(url: Any) -> str:
"""
Expand Down Expand Up @@ -199,7 +199,7 @@ def _create(self, create: Mapping[str, JsonValue],
def create_job(
self, num_boards: int = 1,
machine_name: Optional[str] = None,
keepalive: int = 45) -> SpallocJob:
keepalive = KEEP_ALIVE_PERIOND) -> SpallocJob:
return self._create({
"num-boards": int(num_boards),
"keepalive-interval": f"PT{int(keepalive)}S"
Expand All @@ -209,7 +209,7 @@ def create_job(
def create_job_rect(
self, width: int, height: int,
machine_name: Optional[str] = None,
keepalive: int = 45) -> SpallocJob:
keepalive = KEEP_ALIVE_PERIOND) -> SpallocJob:
return self._create({
"dimensions": {
"width": int(width),
Expand All @@ -224,7 +224,7 @@ def create_job_board(
physical: Optional[Tuple[int, int, int]] = None,
ip_address: Optional[str] = None,
machine_name: Optional[str] = None,
keepalive: int = 45) -> SpallocJob:
keepalive = KEEP_ALIVE_PERIOND) -> SpallocJob:
board: JsonObject
if triad:
x, y, z = triad
Expand All @@ -248,7 +248,7 @@ def create_job_rect_at_board(
triad: Optional[Tuple[int, int, int]] = None,
physical: Optional[Tuple[int, int, int]] = None,
ip_address: Optional[str] = None,
machine_name: Optional[str] = None, keepalive: int = 45,
machine_name: Optional[str] = None, keepalive = KEEP_ALIVE_PERIOND,
max_dead_boards: int = 0) -> SpallocJob:
board: JsonObject
if triad:
Expand Down Expand Up @@ -285,27 +285,6 @@ class _ProxyServiceError(IOError):
"""


def _spalloc_keepalive(url, interval, term_queue, cookies, headers):
    """
    Actual keepalive task implementation. Don't use directly.

    Runs in a separate process: repeatedly PUTs "alive" to the job's
    keepalive URL, then waits up to *interval* seconds for a termination
    message on *term_queue* before sending the next one.

    :param url: The job's keepalive endpoint URL.
    :param interval: Seconds to wait between keepalive messages.
    :param term_queue: Queue used to signal this task to stop; any
        message received (or the queue being closed) ends the loop.
    :param cookies: Session cookies for authenticating the PUT request.
    :param headers: HTTP headers for the request; the Content-Type is
        overwritten below.
    """
    # The server expects a plain-text body for keepalive pings.
    headers["Content-Type"] = "text/plain; charset=UTF-8"
    while True:
        # NOTE(review): this PUT is outside the try, so a network error
        # here propagates and ends the keepalive process — presumably
        # acceptable for a daemon process, but confirm.
        requests.put(url, data="alive", cookies=cookies, headers=headers,
                     allow_redirects=False, timeout=10)
        try:
            # Block for up to `interval` seconds; receiving anything
            # means the parent asked us to quit.
            term_queue.get(True, interval)
            break
        except queue.Empty:
            # Timed out with no termination request: send another ping.
            continue
        # On ValueError or OSError, just terminate the keepalive process
        # They happen when the term_queue is directly closed
        except ValueError:
            break
        except OSError:
            break


class _SpallocMachine(SessionAware, SpallocMachine):
"""
Represents a Spalloc-controlled machine.
Expand Down Expand Up @@ -507,7 +486,7 @@ class _SpallocJob(SessionAware, SpallocJob):
Don't make this yourself. Use :py:class:`SpallocClient` instead.
"""
__slots__ = ("__machine_url", "__chip_url",
"_keepalive_url", "__keepalive_handle", "__proxy_handle",
"_keepalive_url", "__proxy_handle",
"__proxy_thread", "__proxy_ping")

def __init__(self, session: Session, job_handle: str):
Expand All @@ -520,10 +499,11 @@ def __init__(self, session: Session, job_handle: str):
self.__machine_url = self._url + "machine"
self.__chip_url = self._url + "chip"
self._keepalive_url = self._url + "keepalive"
self.__keepalive_handle: Optional[Queue] = None
self.__proxy_handle: Optional[WebSocket] = None
self.__proxy_thread: Optional[_ProxyReceiver] = None
self.__proxy_ping: Optional[_ProxyPing] = None
keep_alive = threading.Thread(target=self.__start_keepalive, daemon=True)
keep_alive.start()

@overrides(SpallocJob.get_session_credentials_for_db)
def get_session_credentials_for_db(self) -> Mapping[Tuple[str, str], str]:
Expand Down Expand Up @@ -651,9 +631,7 @@ def wait_until_ready(self, timeout: Optional[int] = None,

@overrides(SpallocJob.destroy)
def destroy(self, reason: str = "finished"):
if self.__keepalive_handle:
self.__keepalive_handle.close()
self.__keepalive_handle = None
self._keepalive_url = None
if self.__proxy_handle is not None:
if self.__proxy_thread:
self.__proxy_thread.close()
Expand All @@ -663,38 +641,29 @@ def destroy(self, reason: str = "finished"):
self._delete(self._url, reason=str(reason))
logger.info("deleted job at {}", self._url)

@overrides(SpallocJob.keepalive)
def keepalive(self) -> None:
self._put(self._keepalive_url, "alive")

@overrides(SpallocJob.launch_keepalive_task, extend_doc=True)
def launch_keepalive_task(
self, period: float = 30) -> ContextManager[Process]:
def __keepalive(self) -> bool:
"""
.. note::
Tricky! *Cannot* be done with a thread, as the main thread is known
to do significant amounts of CPU-intensive work.
"""
if self.__keepalive_handle is not None:
raise SpallocException("cannot keep job alive from two tasks")
q: Queue = Queue(1)
p = Process(target=_spalloc_keepalive, args=(
self._keepalive_url, 0 + period, q,
*self._session_credentials), daemon=True)
p.start()
self.__keepalive_handle = q
return self.__closer(q, p)

@contextmanager
def __closer(self, q: Queue, p: Process) -> Iterator[Process]:
Signal the server that we want the job to stay alive for a while longer.
:return: True if the job has not been destroyed
:rtype: bool
"""
if self._keepalive_url is None:
print("False")
return False
cookies, headers = self._session_credentials
headers["Content-Type"] = "text/plain; charset=UTF-8"
logger.debug(self._keepalive_url)
requests.put(self._keepalive_url, data="alive", cookies=cookies,
headers=headers, allow_redirects=False, timeout=10)
return True

def __start_keepalive(self) -> None:
try:
yield p
finally:
q.put("quit")
# Give it a second, and if it still isn't dead, kill it
p.join(1)
if p.is_alive():
p.kill()
while self.__keepalive():
time.sleep(KEEP_ALIVE_PERIOND / 2)
except Exception as ex: # pylint: disable=broad-except
logger.exception(ex)

@overrides(SpallocJob.where_is_machine)
def where_is_machine(self, x: int, y: int) -> Optional[
Expand All @@ -705,16 +674,6 @@ def where_is_machine(self, x: int, y: int) -> Optional[
return cast(Tuple[int, int, int], tuple(
r.json()["physical-board-coordinates"]))

@property
def _keepalive_handle(self) -> Optional[Queue]:
return self.__keepalive_handle

@_keepalive_handle.setter
def _keepalive_handle(self, handle: Queue):
if self.__keepalive_handle is not None:
raise SpallocException("cannot keep job alive from two tasks")
self.__keepalive_handle = handle

@overrides(SpallocJob.create_transceiver)
def create_transceiver(self) -> Transceiver:
if self.get_state() != SpallocState.READY:
Expand Down
23 changes: 0 additions & 23 deletions spinnman/spalloc/spalloc_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,29 +182,6 @@ def destroy(self, reason: str = "finished"):
"""
raise NotImplementedError()

@abstractmethod
def keepalive(self) -> None:
"""
Signal the job that we want it to stay alive for a while longer.
"""
raise NotImplementedError()

@abstractmethod
def launch_keepalive_task(
self, period: int = 30) -> AbstractContextManager:
"""
Starts a periodic task to keep a job alive.
:param SpallocJob job:
The job to keep alive
:param int period:
How often to send a keepalive message (in seconds)
:return:
Some kind of closeable task handle; closing it terminates the task.
Destroying the job will also terminate the task.
"""
raise NotImplementedError()

@abstractmethod
def where_is_machine(self, x: int, y: int) -> Optional[
Tuple[int, int, int]]:
Expand Down

0 comments on commit 0f8dfcf

Please sign in to comment.