From 0f8dfcf6e177ce9c1baa5c9a70a3aeec04f33e90 Mon Sep 17 00:00:00 2001 From: "Christian Y. Brenninkmeijer" Date: Tue, 14 May 2024 10:03:03 +0100 Subject: [PATCH] simplify keep_alive --- spinnman/spalloc/spalloc_client.py | 109 +++++++++-------------------- spinnman/spalloc/spalloc_job.py | 23 ------ 2 files changed, 34 insertions(+), 98 deletions(-) diff --git a/spinnman/spalloc/spalloc_client.py b/spinnman/spalloc/spalloc_client.py index 8ed750490..3503987cb 100644 --- a/spinnman/spalloc/spalloc_client.py +++ b/spinnman/spalloc/spalloc_client.py @@ -14,16 +14,15 @@ """ Implementation of the client for the Spalloc web service. """ - -from contextlib import contextmanager +import time from logging import getLogger -from multiprocessing import Process, Queue + import queue import struct import threading from time import sleep -from typing import (Any, ContextManager, Callable, Dict, FrozenSet, Iterable, - Iterator, List, Mapping, Optional, Tuple, cast) +from typing import (Any, Callable, Dict, FrozenSet, Iterable, List, Mapping, + Optional, Tuple, cast) from urllib.parse import urlparse, urlunparse, ParseResult from packaging.version import Version @@ -69,6 +68,7 @@ _msg = struct.Struct(" str: """ @@ -199,7 +199,7 @@ def _create(self, create: Mapping[str, JsonValue], def create_job( self, num_boards: int = 1, machine_name: Optional[str] = None, - keepalive: int = 45) -> SpallocJob: + keepalive = KEEP_ALIVE_PERIOND) -> SpallocJob: return self._create({ "num-boards": int(num_boards), "keepalive-interval": f"PT{int(keepalive)}S" @@ -209,7 +209,7 @@ def create_job( def create_job_rect( self, width: int, height: int, machine_name: Optional[str] = None, - keepalive: int = 45) -> SpallocJob: + keepalive = KEEP_ALIVE_PERIOND) -> SpallocJob: return self._create({ "dimensions": { "width": int(width), @@ -224,7 +224,7 @@ def create_job_board( physical: Optional[Tuple[int, int, int]] = None, ip_address: Optional[str] = None, machine_name: Optional[str] = None, - keepalive: int = 
45) -> SpallocJob: + keepalive = KEEP_ALIVE_PERIOND) -> SpallocJob: board: JsonObject if triad: x, y, z = triad @@ -248,7 +248,7 @@ def create_job_rect_at_board( triad: Optional[Tuple[int, int, int]] = None, physical: Optional[Tuple[int, int, int]] = None, ip_address: Optional[str] = None, - machine_name: Optional[str] = None, keepalive: int = 45, + machine_name: Optional[str] = None, keepalive = KEEP_ALIVE_PERIOND, max_dead_boards: int = 0) -> SpallocJob: board: JsonObject if triad: @@ -285,27 +285,6 @@ class _ProxyServiceError(IOError): """ -def _spalloc_keepalive(url, interval, term_queue, cookies, headers): - """ - Actual keepalive task implementation. Don't use directly. - """ - headers["Content-Type"] = "text/plain; charset=UTF-8" - while True: - requests.put(url, data="alive", cookies=cookies, headers=headers, - allow_redirects=False, timeout=10) - try: - term_queue.get(True, interval) - break - except queue.Empty: - continue - # On ValueError or OSError, just terminate the keepalive process - # They happen when the term_queue is directly closed - except ValueError: - break - except OSError: - break - - class _SpallocMachine(SessionAware, SpallocMachine): """ Represents a Spalloc-controlled machine. @@ -507,7 +486,7 @@ class _SpallocJob(SessionAware, SpallocJob): Don't make this yourself. Use :py:class:`SpallocClient` instead. 
""" __slots__ = ("__machine_url", "__chip_url", - "_keepalive_url", "__keepalive_handle", "__proxy_handle", + "_keepalive_url", "__proxy_handle", "__proxy_thread", "__proxy_ping") def __init__(self, session: Session, job_handle: str): @@ -520,10 +499,11 @@ def __init__(self, session: Session, job_handle: str): self.__machine_url = self._url + "machine" self.__chip_url = self._url + "chip" self._keepalive_url = self._url + "keepalive" - self.__keepalive_handle: Optional[Queue] = None self.__proxy_handle: Optional[WebSocket] = None self.__proxy_thread: Optional[_ProxyReceiver] = None self.__proxy_ping: Optional[_ProxyPing] = None + keep_alive = threading.Thread(target=self.__start_keepalive, daemon=True) + keep_alive.start() @overrides(SpallocJob.get_session_credentials_for_db) def get_session_credentials_for_db(self) -> Mapping[Tuple[str, str], str]: @@ -651,9 +631,7 @@ def wait_until_ready(self, timeout: Optional[int] = None, @overrides(SpallocJob.destroy) def destroy(self, reason: str = "finished"): - if self.__keepalive_handle: - self.__keepalive_handle.close() - self.__keepalive_handle = None + self._keepalive_url = None if self.__proxy_handle is not None: if self.__proxy_thread: self.__proxy_thread.close() @@ -663,38 +641,29 @@ def destroy(self, reason: str = "finished"): self._delete(self._url, reason=str(reason)) logger.info("deleted job at {}", self._url) - @overrides(SpallocJob.keepalive) - def keepalive(self) -> None: - self._put(self._keepalive_url, "alive") - - @overrides(SpallocJob.launch_keepalive_task, extend_doc=True) - def launch_keepalive_task( - self, period: float = 30) -> ContextManager[Process]: + def __keepalive(self) -> bool: """ - .. note:: - Tricky! *Cannot* be done with a thread, as the main thread is known - to do significant amounts of CPU-intensive work. 
- """ - if self.__keepalive_handle is not None: - raise SpallocException("cannot keep job alive from two tasks") - q: Queue = Queue(1) - p = Process(target=_spalloc_keepalive, args=( - self._keepalive_url, 0 + period, q, - *self._session_credentials), daemon=True) - p.start() - self.__keepalive_handle = q - return self.__closer(q, p) - - @contextmanager - def __closer(self, q: Queue, p: Process) -> Iterator[Process]: + Signal that we want it to stay alive for a while longer. + + :return: True if the job has not been destroyed + :rtype: bool + """ + if self._keepalive_url is None: + logger.debug("job destroyed; stopping keepalive") + return False + cookies, headers = self._session_credentials + headers["Content-Type"] = "text/plain; charset=UTF-8" + logger.debug(self._keepalive_url) + requests.put(self._keepalive_url, data="alive", cookies=cookies, + headers=headers, allow_redirects=False, timeout=10) + return True + + def __start_keepalive(self) -> None: try: - yield p - finally: - q.put("quit") - # Give it a second, and if it still isn't dead, kill it - p.join(1) - if p.is_alive(): - p.kill() + while self.__keepalive(): + time.sleep(KEEP_ALIVE_PERIOND / 2) + except Exception as ex: # pylint: disable=broad-except + logger.exception(ex) @overrides(SpallocJob.where_is_machine) def where_is_machine(self, x: int, y: int) -> Optional[ @@ -705,16 +674,6 @@ def where_is_machine(self, x: int, y: int) -> Optional[ Tuple[int, int, int]]: r = self._get(self.__chip_url, x=int(x), y=int(y)) if r.status_code == 204: return None return cast(Tuple[int, int, int], tuple( r.json()["physical-board-coordinates"])) - @property - def _keepalive_handle(self) -> Optional[Queue]: - return self.__keepalive_handle - - @_keepalive_handle.setter - def _keepalive_handle(self, handle: Queue): - if self.__keepalive_handle is not None: - raise SpallocException("cannot keep job alive from two tasks") - self.__keepalive_handle = handle - @overrides(SpallocJob.create_transceiver) def create_transceiver(self) -> Transceiver: if self.get_state() != SpallocState.READY: diff --git a/spinnman/spalloc/spalloc_job.py 
b/spinnman/spalloc/spalloc_job.py index 14f5ba429..d7b9a907d 100644 --- a/spinnman/spalloc/spalloc_job.py +++ b/spinnman/spalloc/spalloc_job.py @@ -182,29 +182,6 @@ def destroy(self, reason: str = "finished"): """ raise NotImplementedError() - @abstractmethod - def keepalive(self) -> None: - """ - Signal the job that we want it to stay alive for a while longer. - """ - raise NotImplementedError() - - @abstractmethod - def launch_keepalive_task( - self, period: int = 30) -> AbstractContextManager: - """ - Starts a periodic task to keep a job alive. - - :param SpallocJob job: - The job to keep alive - :param int period: - How often to send a keepalive message (in seconds) - :return: - Some kind of closeable task handle; closing it terminates the task. - Destroying the job will also terminate the task. - """ - raise NotImplementedError() - @abstractmethod def where_is_machine(self, x: int, y: int) -> Optional[ Tuple[int, int, int]]: